Skip to content

Commit f9269b9

Browse files
authored
Merge pull request #1080 from rocket-admin/backend_bulk_rows_delete_rework
reworked bulk delete logic
2 parents a9fa79e + c0a19ba commit f9269b9

15 files changed

+264
-75
lines changed

backend/src/entities/table/use-cases/delete-rows-from-table.use.case.ts

Lines changed: 21 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import { HttpException, HttpStatus, Inject, Injectable } from '@nestjs/common';
2+
import { getDataAccessObject } from '@rocketadmin/shared-code/dist/src/data-access-layer/shared/create-data-access-object.js';
23
import AbstractUseCase from '../../../common/abstract-use.case.js';
34
import { IGlobalDatabaseContext } from '../../../common/application/global-database-context.interface.js';
45
import { BaseType } from '../../../common/data-injection.tokens.js';
5-
import { getDataAccessObject } from '@rocketadmin/shared-code/dist/src/data-access-layer/shared/create-data-access-object.js';
66
import { AmplitudeEventTypeEnum, LogOperationTypeEnum, OperationResultStatusEnum } from '../../../enums/index.js';
77
import { Messages } from '../../../exceptions/text/messages.js';
88
import { compareArrayElements, isConnectionTypeAgent } from '../../../helpers/index.js';
@@ -13,9 +13,6 @@ import { DeleteRowsFromTableDs } from '../application/data-structures/delete-row
1313
import { convertHexDataInPrimaryKeyUtil } from '../utils/convert-hex-data-in-primary-key.util.js';
1414
import { findObjectsWithProperties } from '../utils/find-objects-with-properties.js';
1515
import { IDeleteRowsFromTable } from './table-use-cases.interface.js';
16-
import { IDataAccessObject } from '@rocketadmin/shared-code/dist/src/data-access-layer/shared/interfaces/data-access-object.interface.js';
17-
import { IDataAccessObjectAgent } from '@rocketadmin/shared-code/dist/src/data-access-layer/shared/interfaces/data-access-object-agent.interface.js';
18-
import PQueue from 'p-queue';
1916

2017
type DeleteRowsFromTableResult = {
2118
operationStatusResult: OperationResultStatusEnum;
@@ -113,7 +110,7 @@ export class DeleteRowsFromTableUseCase
113110
);
114111
}
115112
});
116-
113+
// todo need improve
117114
let oldRowsData: Array<Record<string, unknown>>;
118115
try {
119116
oldRowsData = await Promise.all(
@@ -130,34 +127,28 @@ export class DeleteRowsFromTableUseCase
130127

131128
const deleteOperationsResults: Array<DeleteRowsFromTableResult> = [];
132129

133-
const queue = new PQueue({ concurrency: 5 });
134-
const deleteRowsResults: Array<DeleteRowsFromTableResult | void> = await Promise.all(
135-
primaryKeys.map((primaryKey) =>
136-
queue.add(() => this.deleteRowFromTable(dao, tableName, primaryKey, oldRowsData, userEmail)),
137-
),
138-
);
139-
140-
const deletionErrors: Array<string> = [];
141-
deleteRowsResults.forEach((result) => {
142-
if (result) {
143-
deleteOperationsResults.push(result);
144-
if (result.error) {
145-
deletionErrors.push(result.error);
146-
}
147-
}
148-
});
149-
150130
try {
151-
if (deletionErrors.length > 0) {
152-
throw new HttpException(
153-
{
154-
message: Messages.BULK_DELETE_FAILED_DELETE_ROWS(deletionErrors),
155-
},
156-
HttpStatus.INTERNAL_SERVER_ERROR,
157-
);
158-
}
131+
await dao.bulkDeleteRowsInTable(tableName, primaryKeys, userEmail);
132+
primaryKeys.forEach((primaryKey) => {
133+
deleteOperationsResults.push({
134+
operationStatusResult: OperationResultStatusEnum.successfully,
135+
row: primaryKey,
136+
old_data: findObjectsWithProperties(oldRowsData, primaryKey).at(0),
137+
error: null,
138+
affected_primary_key: primaryKey as unknown as string,
139+
});
140+
});
159141
return true;
160142
} catch (error) {
143+
primaryKeys.forEach((primaryKey) => {
144+
deleteOperationsResults.push({
145+
operationStatusResult: OperationResultStatusEnum.unsuccessfully,
146+
row: primaryKey,
147+
old_data: findObjectsWithProperties(oldRowsData, primaryKey).at(0),
148+
error: error.message,
149+
affected_primary_key: primaryKey as unknown as string,
150+
});
151+
});
161152
throw error;
162153
} finally {
163154
const createdLogs = await this.tableLogsService.createAndSaveNewLogsUtil(
@@ -175,34 +166,4 @@ export class DeleteRowsFromTableUseCase
175166
);
176167
}
177168
}
178-
179-
private async deleteRowFromTable(
180-
dataAccessObject: IDataAccessObject | IDataAccessObjectAgent,
181-
tableName: string,
182-
primaryKey: Record<string, unknown>,
183-
oldRowsData: Array<Record<string, unknown>>,
184-
userEmail: string,
185-
): Promise<DeleteRowsFromTableResult> {
186-
let operationResult = OperationResultStatusEnum.unknown;
187-
try {
188-
await dataAccessObject.deleteRowInTable(tableName, primaryKey, userEmail);
189-
operationResult = OperationResultStatusEnum.successfully;
190-
return {
191-
operationStatusResult: operationResult,
192-
row: primaryKey,
193-
old_data: findObjectsWithProperties(oldRowsData, primaryKey).at(0),
194-
error: null,
195-
affected_primary_key: primaryKey as unknown as string,
196-
};
197-
} catch (error) {
198-
operationResult = OperationResultStatusEnum.unsuccessfully;
199-
return {
200-
operationStatusResult: operationResult,
201-
row: primaryKey,
202-
old_data: findObjectsWithProperties(oldRowsData, primaryKey).at(0),
203-
error: error.message,
204-
affected_primary_key: primaryKey as unknown as string,
205-
};
206-
}
207-
}
208169
}

backend/test/ava-tests/saas-tests/table-postgres-e2e.test.ts

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3576,6 +3576,17 @@ test.serial(`${currentTest} should delete row in table and return result`, async
35763576
-1,
35773577
);
35783578
}
3579+
3580+
// check that deletion of rows was logged
3581+
3582+
const getTableLogs = await request(app.getHttpServer())
3583+
.get(`/logs/${createConnectionRO.id}`)
3584+
.set('Cookie', firstUserToken)
3585+
.set('Content-Type', 'application/json')
3586+
.set('Accept', 'application/json');
3587+
const getRowInTableRO = JSON.parse(getTableLogs.text);
3588+
const deleteRowsLogs = getRowInTableRO.logs.filter((log) => log.operationType === LogOperationTypeEnum.deleteRow);
3589+
t.is(deleteRowsLogs.length, primaryKeysForDeletion.length);
35793590
});
35803591

35813592
test.serial(`${currentTest} should test connection and return result`, async (t) => {
@@ -3594,22 +3605,25 @@ test.serial(`${currentTest} should test connection and return result`, async (t)
35943605
t.is(message, 'Successfully connected');
35953606
});
35963607

3597-
test.serial(`${currentTest} should test connection and return negative result when connection password is incorrect result`, async (t) => {
3598-
const connectionToTestDB = getTestData(mockFactory).connectionToPostgres;
3599-
const firstUserToken = (await registerUserAndReturnUserInfo(app)).token;
3608+
test.serial(
3609+
`${currentTest} should test connection and return negative result when connection password is incorrect result`,
3610+
async (t) => {
3611+
const connectionToTestDB = getTestData(mockFactory).connectionToPostgres;
3612+
const firstUserToken = (await registerUserAndReturnUserInfo(app)).token;
36003613

3601-
connectionToTestDB.password = '8764323452888';
3602-
const testConnectionResponse = await request(app.getHttpServer())
3603-
.post('/connection/test/')
3604-
.send(connectionToTestDB)
3605-
.set('Cookie', firstUserToken)
3606-
.set('Content-Type', 'application/json')
3607-
.set('Accept', 'application/json');
3614+
connectionToTestDB.password = '8764323452888';
3615+
const testConnectionResponse = await request(app.getHttpServer())
3616+
.post('/connection/test/')
3617+
.send(connectionToTestDB)
3618+
.set('Cookie', firstUserToken)
3619+
.set('Content-Type', 'application/json')
3620+
.set('Accept', 'application/json');
36083621

3609-
t.is(testConnectionResponse.status, 201);
3610-
const { result } = JSON.parse(testConnectionResponse.text);
3611-
t.is(result, false);
3612-
});
3622+
t.is(testConnectionResponse.status, 201);
3623+
const { result } = JSON.parse(testConnectionResponse.text);
3624+
t.is(result, false);
3625+
},
3626+
);
36133627

36143628
currentTest = 'GET table/csv/:slug';
36153629

rocketadmin-agent/src/command/command-executor.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,25 @@ export class CommandExecutor {
234234
Logger.createLogRecord(row, tableName, email, LogOperationTypeEnum.updateRow, operationStatusResult, null);
235235
}
236236
break;
237+
case OperationTypeEnum.bulkDeleteRowsInTable:
238+
try {
239+
operationStatusResult = OperationResultStatusEnum.successfully;
240+
return await dao.bulkDeleteRowsInTable(tableName, primaryKey);
241+
} catch (e) {
242+
operationStatusResult = OperationResultStatusEnum.unsuccessfully;
243+
console.log(Messages.FAIL_MESSAGE(e.message));
244+
return new Error(Messages.FAILED_TO_UPDATE_ROWS);
245+
} finally {
246+
Logger.createLogRecord(
247+
primaryKey,
248+
tableName,
249+
email,
250+
LogOperationTypeEnum.deleteRow,
251+
operationStatusResult,
252+
null,
253+
);
254+
}
255+
break;
237256
case OperationTypeEnum.executeRawQuery:
238257
try {
239258
return await dao.executeRawQuery(row, tableName);

rocketadmin-agent/src/enums/operation-type.enum.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export enum OperationTypeEnum {
1010
testConnect = 'testConnect',
1111
updateRowInTable = 'updateRowInTable',
1212
bulkUpdateRowsInTable = 'bulkUpdateRowsInTable',
13+
bulkDeleteRowsInTable = 'bulkDeleteRowsInTable',
1314
validateSettings = 'validateSettings',
1415
initialConnection = 'initialConnection',
1516
dataFromAgent = 'dataFromAgent',

shared-code/src/data-access-layer/data-access-objects/data-access-object-agent.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,40 @@ export class DataAccessObjectAgent implements IDataAccessObjectAgent {
458458
}
459459
}
460460

461+
public async bulkDeleteRowsInTable(
462+
tableName: string,
463+
primaryKeys: Array<Record<string, unknown>>,
464+
userEmail: string,
465+
): Promise<number> {
466+
const jwtAuthToken = this.generateJWT(this.connection.token);
467+
axios.defaults.headers.common['Authorization'] = `Bearer ${jwtAuthToken}`;
468+
469+
try {
470+
const { data: { commandResult } = {} } = await axios.post(this.serverAddress, {
471+
operationType: DataAccessObjectCommandsEnum.bulkDeleteRowsInTable,
472+
tableName,
473+
primaryKey: primaryKeys,
474+
email: userEmail,
475+
});
476+
477+
if (commandResult instanceof Error) {
478+
throw new Error(commandResult.message);
479+
}
480+
481+
if (!commandResult) {
482+
throw new Error(ERROR_MESSAGES.NO_DATA_RETURNED_FROM_AGENT);
483+
}
484+
485+
return commandResult;
486+
} catch (e) {
487+
if (axios.isAxiosError(e)) {
488+
this.checkIsErrorLocalAndThrowException(e);
489+
throw new Error(e.response?.data);
490+
}
491+
throw e;
492+
}
493+
}
494+
461495
public async validateSettings(
462496
settings: ValidateTableSettingsDS,
463497
tableName: string,
@@ -623,7 +657,11 @@ export class DataAccessObjectAgent implements IDataAccessObjectAgent {
623657
}
624658
}
625659

626-
public async executeRawQuery(query: string, tableName: string, userEmail: string): Promise<Array<Record<string, unknown>>> {
660+
public async executeRawQuery(
661+
query: string,
662+
tableName: string,
663+
userEmail: string,
664+
): Promise<Array<Record<string, unknown>>> {
627665
const jwtAuthToken = this.generateJWT(this.connection.token);
628666
axios.defaults.headers.common['Authorization'] = `Bearer ${jwtAuthToken}`;
629667

shared-code/src/data-access-layer/data-access-objects/data-access-object-dynamodb.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,41 @@ export class DataAccessObjectDynamoDB extends BasicDataAccessObject implements I
513513
return primaryKeys as any;
514514
}
515515

516+
public async bulkDeleteRowsInTable(tableName: string, primaryKeys: Array<Record<string, unknown>>): Promise<number> {
517+
const { documentClient } = this.getDynamoDb();
518+
519+
if (primaryKeys.length === 0) {
520+
return 0;
521+
}
522+
523+
const tableStructure = await this.getTableStructure(tableName);
524+
525+
const deletePromises = primaryKeys.map(async (primaryKey) => {
526+
for (const key in primaryKey) {
527+
const foundKeySchema = tableStructure.find((el) => el.column_name === key);
528+
if (foundKeySchema?.data_type === 'number') {
529+
const numericValue = Number(primaryKey[key]);
530+
if (!isNaN(numericValue)) {
531+
primaryKey[key] = numericValue;
532+
}
533+
}
534+
}
535+
536+
try {
537+
const params = {
538+
TableName: tableName,
539+
Key: marshall(primaryKey),
540+
};
541+
await documentClient.send(new DeleteItemCommand(params));
542+
} catch (e) {
543+
console.error(`Failed to delete item with key ${JSON.stringify(primaryKey)}: ${e.message}`);
544+
}
545+
});
546+
547+
await Promise.all(deletePromises);
548+
return primaryKeys.length;
549+
}
550+
516551
public async validateSettings(settings: ValidateTableSettingsDS, tableName: string): Promise<Array<string>> {
517552
const [tableStructure, primaryColumns] = await Promise.all([
518553
this.getTableStructure(tableName),

shared-code/src/data-access-layer/data-access-objects/data-access-object-ibmdb2.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,11 @@ WHERE
459459
return newValues;
460460
}
461461

462+
public async bulkDeleteRowsInTable(tableName: string, primaryKeys: Array<Record<string, unknown>>): Promise<number> {
463+
await Promise.allSettled(primaryKeys.map((key) => this.deleteRowInTable(tableName, key)));
464+
return primaryKeys.length;
465+
}
466+
462467
public async validateSettings(settings: ValidateTableSettingsDS, tableName: string): Promise<string[]> {
463468
const [tableStructure, primaryColumns] = await Promise.all([
464469
this.getTableStructure(tableName),

shared-code/src/data-access-layer/data-access-objects/data-access-object-mongodb.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,14 @@ export class DataAccessObjectMongo extends BasicDataAccessObject implements IDat
345345
return { _id: objectIds.map((objectId) => this.processMongoIdField(objectId)) };
346346
}
347347

348+
public async bulkDeleteRowsInTable(tableName: string, primaryKeys: Array<Record<string, unknown>>): Promise<number> {
349+
const db = await this.getConnectionToDatabase();
350+
const collection = db.collection(tableName);
351+
const objectIds = primaryKeys.map((primaryKey) => this.createObjectIdFromSting(primaryKey._id as string));
352+
await collection.deleteMany({ _id: { $in: objectIds } });
353+
return primaryKeys.length;
354+
}
355+
348356
public async validateSettings(settings: ValidateTableSettingsDS, tableName: string): Promise<string[]> {
349357
const [tableStructure, primaryColumns] = await Promise.all([
350358
this.getTableStructure(tableName),

shared-code/src/data-access-layer/data-access-objects/data-access-object-mssql.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,31 @@ WHERE TABLE_TYPE = 'VIEW'
394394
.update(newValues);
395395
}
396396

397+
public async bulkDeleteRowsInTable(tableName: string, primaryKeys: Array<Record<string, unknown>>): Promise<number> {
398+
const [knex, schemaName] = await Promise.all([this.configureKnex(), this.getSchemaName(tableName)]);
399+
const tableWithSchema = `${schemaName}.[${tableName}]`;
400+
401+
if (primaryKeys.length === 0) {
402+
return 0;
403+
}
404+
405+
await knex.transaction(async (trx) => {
406+
await trx(tableWithSchema)
407+
.delete()
408+
.modify((queryBuilder) => {
409+
primaryKeys.forEach((key) => {
410+
queryBuilder.orWhere((builder) => {
411+
Object.entries(key).forEach(([column, value]) => {
412+
builder.andWhere(column, value);
413+
});
414+
});
415+
});
416+
});
417+
});
418+
419+
return primaryKeys.length;
420+
}
421+
397422
public async validateSettings(settings: ValidateTableSettingsDS, tableName: string): Promise<string[]> {
398423
const [tableStructure, primaryColumns] = await Promise.all([
399424
this.getTableStructure(tableName),

shared-code/src/data-access-layer/data-access-objects/data-access-object-mysql.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,30 @@ export class DataAccessObjectMysql extends BasicDataAccessObject implements IDat
467467
.update(newValues);
468468
}
469469

470+
public async bulkDeleteRowsInTable(tableName: string, primaryKeys: Array<Record<string, unknown>>): Promise<number> {
471+
const knex = await this.configureKnex();
472+
473+
if (primaryKeys.length === 0) {
474+
return 0;
475+
}
476+
477+
await knex.transaction(async (trx) => {
478+
await trx(tableName)
479+
.delete()
480+
.modify((queryBuilder) => {
481+
primaryKeys.forEach((key) => {
482+
queryBuilder.orWhere((builder) => {
483+
Object.entries(key).forEach(([column, value]) => {
484+
builder.andWhere(column, value);
485+
});
486+
});
487+
});
488+
});
489+
});
490+
491+
return primaryKeys.length;
492+
}
493+
470494
public async validateSettings(settings: ValidateTableSettingsDS, tableName: string): Promise<string[]> {
471495
const [tableStructure, primaryColumns] = await Promise.all([
472496
this.getTableStructure(tableName),

0 commit comments

Comments
 (0)