Skip to content

Commit 14757f4

Browse files
authored
Implement job queues for desktop dive (#1534)
* WIP create job queues * Show upcoming jobs in jobs tab * Allow concurrent jobs with async job queue * Send backend most recent settings * WIP add gpu and cpu job queues * Queue CPU jobs * Clean up PR * Fix linting/syntax issues * WIP show queued pipeline jobs * Update test of video that needs conversion * Show pending conversions on recents tab * Move job queues to subdirectory * Add component to show queued jobs * Use queues exclusively for desktop jobs * Remove upcoming jobs from jobs history component * Refactor job types * Watch queue directly for "awaiting conversion"
1 parent de73613 commit 14757f4

File tree

16 files changed

+696
-304
lines changed

16 files changed

+696
-304
lines changed

client/platform/desktop/backend/ipcService.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ import {
77
DesktopJobUpdate, RunPipeline, RunTraining, Settings, ExportDatasetArgs,
88
DesktopMediaImportResponse,
99
ExportTrainedPipeline,
10+
ConversionArgs,
11+
DesktopJob,
1012
} from 'platform/desktop/constants';
13+
import { convertMedia } from 'platform/desktop/backend/native/mediaJobs';
1114

1215
import linux from './native/linux';
1316
import win32 from './native/windows';
@@ -110,12 +113,21 @@ export default function register() {
110113
return ret;
111114
});
112115

113-
ipcMain.handle('finalize-import', async (event, args: DesktopMediaImportResponse) => {
116+
ipcMain.handle('finalize-import', async (event, args: DesktopMediaImportResponse) => common.finalizeMediaImport(settings.get(), args));
117+
118+
ipcMain.handle('convert', async (event, args: ConversionArgs) => {
114119
const updater = (update: DesktopJobUpdate) => {
115120
event.sender.send('job-update', update);
116121
};
117-
const ret = await common.finalizeMediaImport(settings.get(), args, updater);
118-
return ret;
122+
const currentSettings: Settings = settings.get();
123+
const job: DesktopJob = await convertMedia(
124+
currentSettings,
125+
args,
126+
updater,
127+
(jobKey, meta) => common.completeConversion(currentSettings, args.meta.id, jobKey, meta),
128+
true,
129+
);
130+
return job;
119131
});
120132

121133
ipcMain.handle('validate-settings', async (_, s: Settings) => {

client/platform/desktop/backend/native/common.spec.ts

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { Console } from 'console';
66

77
import {
88
AnnotationsCurrentVersion, DesktopJob,
9-
DesktopJobUpdate, JsonMeta, RunTraining, Settings,
9+
DesktopJobUpdate, JobType, JsonMeta, RunTraining, Settings,
1010
} from 'platform/desktop/constants';
1111
import { makeEmptyAnnotationFile } from 'platform/desktop/backend/serializers/dive';
1212

@@ -484,7 +484,8 @@ describe('native.common', () => {
484484
'/home/user/data/imageLists/success/image1.png',
485485
]);
486486
expect(payload.jsonMeta.name).toBe('success');
487-
const final = await common.finalizeMediaImport(settings, payload, updater);
487+
const res = await common.finalizeMediaImport(settings, payload);
488+
const final = res.meta;
488489
expect(final.originalImageFiles.length).toBe(4);
489490
expect(final.name).toBe('success');
490491
expect(final.imageListPath).toBe('/home/user/data/imageLists/success/image_list.txt');
@@ -497,7 +498,8 @@ describe('native.common', () => {
497498
);
498499
expect(payload.jsonMeta.originalBasePath).toBe('');
499500
payload.globPattern = '2018*';
500-
const final = await common.finalizeMediaImport(settings, payload, updater);
501+
const res = await common.finalizeMediaImport(settings, payload);
502+
const final = res.meta;
501503
const expectedImageFiles = [
502504
'/home/user/data/imageLists/successGlob/2018-image2.png',
503505
'/home/user/data/imageLists/successGlob/nested/2018-image1.png',
@@ -537,7 +539,8 @@ describe('native.common', () => {
537539
const payload = await common.beginMediaImport(
538540
'/home/user/data/imageLists/success/image_list.txt',
539541
);
540-
const final = await common.finalizeMediaImport(settings, payload, updater);
542+
const res = await common.finalizeMediaImport(settings, payload);
543+
const final = res.meta;
541544
const annotations = await common.loadDetections(settings, final.id);
542545
expect(Object.keys(annotations.tracks)).toHaveLength(0);
543546

@@ -562,7 +565,7 @@ describe('native.common', () => {
562565
const payload = await common.beginMediaImport('/home/user/data/imageSuccessWithAnnotations');
563566
payload.trackFileAbsPath = ''; //It returns null be default but users change it.
564567
payload.jsonMeta.fps = 12; // simulate user specify FPS action
565-
await common.finalizeMediaImport(settings, payload, updater);
568+
await common.finalizeMediaImport(settings, payload);
566569
const meta = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
567570
expect(meta.fps).toBe(12);
568571
});
@@ -571,20 +574,20 @@ describe('native.common', () => {
571574
const payload = await common.beginMediaImport('/home/user/data/imageSuccessWithAnnotations');
572575
payload.trackFileAbsPath = '/home/user/data/imageSuccessWithAnnotations/file1.csv';
573576
payload.jsonMeta.fps = 12; // simulate user specify FPS action
574-
await common.finalizeMediaImport(settings, payload, updater);
577+
await common.finalizeMediaImport(settings, payload);
575578
const meta = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
576579
expect(meta.fps).toBe(32);
577580
});
578581

579582
it('import with user selected FPS > originalFPS', async () => {
580583
const payload = await common.beginMediaImport('/home/user/data/videoSuccess/video1.mp4');
581584
payload.jsonMeta.fps = 50; // above 30
582-
await common.finalizeMediaImport(settings, payload, updater);
585+
await common.finalizeMediaImport(settings, payload);
583586
const meta1 = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
584587
expect(meta1.fps).toBe(30);
585588

586589
payload.jsonMeta.fps = -1; // above 30
587-
await common.finalizeMediaImport(settings, payload, updater);
590+
await common.finalizeMediaImport(settings, payload);
588591
const meta2 = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
589592
expect(meta2.fps).toBe(1);
590593
});
@@ -600,15 +603,15 @@ describe('native.common', () => {
600603

601604
it('importMedia empty json file success', async () => {
602605
const payload = await common.beginMediaImport('/home/user/data/annotationEmptySuccess/video1.mp4');
603-
await common.finalizeMediaImport(settings, payload, updater);
606+
await common.finalizeMediaImport(settings, payload);
604607
const annotations = await common.loadDetections(settings, payload.jsonMeta.id);
605608
expect(annotations).toEqual(makeEmptyAnnotationFile());
606609
});
607610

608611
it('importMedia include meta.json file ', async () => {
609612
const payload = await common.beginMediaImport('/home/user/data/metaJsonIncluded/video1.mp4');
610613
expect(payload.metaFileAbsPath).toBe('/home/user/data/metaJsonIncluded/meta.json');
611-
await common.finalizeMediaImport(settings, payload, updater);
614+
await common.finalizeMediaImport(settings, payload);
612615
const tracks = await common.loadDetections(settings, payload.jsonMeta.id);
613616
const meta = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
614617
expect(meta?.customTypeStyling?.other.color).toBe('blue');
@@ -618,7 +621,7 @@ describe('native.common', () => {
618621
it('Export meta.json file ', async () => {
619622
const payload = await common.beginMediaImport('/home/user/data/metaJsonIncluded/video1.mp4');
620623
expect(payload.metaFileAbsPath).toBe('/home/user/data/metaJsonIncluded/meta.json');
621-
await common.finalizeMediaImport(settings, payload, updater);
624+
await common.finalizeMediaImport(settings, payload);
622625
const tracks = await common.loadDetections(settings, payload.jsonMeta.id);
623626
const meta = await common.loadMetadata(settings, payload.jsonMeta.id, urlMapper);
624627
expect(meta?.customTypeStyling?.other.color).toBe('blue');
@@ -640,16 +643,15 @@ describe('native.common', () => {
640643
});
641644
it('import first CSV in list', async () => {
642645
const payload = await common.beginMediaImport('/home/user/data/multiCSV/video1.mp4');
643-
await common.finalizeMediaImport(settings, payload, updater);
646+
await common.finalizeMediaImport(settings, payload);
644647
const tracks = await common.loadDetections(settings, payload.jsonMeta.id);
645648
expect(tracks).toEqual(makeEmptyAnnotationFile());
646649
});
647650

648-
it('importMedia video, start conversion', async () => {
651+
it('importMedia video, has conversion file list', async () => {
649652
const payload = await common.beginMediaImport('/home/user/data/videoSuccess/video1.avi');
650-
await common.finalizeMediaImport(settings, payload, updater);
651-
expect(payload.jsonMeta.transcodingJobKey).toBe('jobKey');
652-
expect(payload.jsonMeta.type).toBe('video');
653+
const conversionArgs = await common.finalizeMediaImport(settings, payload);
654+
expect(conversionArgs.mediaList.length).toBeGreaterThan(0);
653655
});
654656

655657
it('check Dastset existence', async () => {
@@ -693,6 +695,7 @@ describe('native.common', () => {
693695

694696
it('processing good Trained Pipeline folder', async () => {
695697
const trainingArgs: RunTraining = {
698+
type: JobType.RunTraining,
696699
datasetIds: ['randomID'],
697700
pipelineName: 'trainedPipelineName',
698701
trainingConfig: 'trainingConfig',
@@ -713,6 +716,7 @@ describe('native.common', () => {
713716

714717
it('processing bad Trained Pipeline folders', async () => {
715718
const trainingArgs: RunTraining = {
719+
type: JobType.RunTraining,
716720
datasetIds: ['randomID'],
717721
pipelineName: 'trainedBadPipelineName',
718722
trainingConfig: 'trainingConfig',
@@ -728,6 +732,7 @@ describe('native.common', () => {
728732

729733
it('getPipelineList lists pipelines with Trained pipelines', async () => {
730734
const trainingArgs: RunTraining = {
735+
type: JobType.RunTraining,
731736
datasetIds: ['randomID'],
732737
pipelineName: 'trainedPipelineName',
733738
trainingConfig: 'trainingConfig',
@@ -771,7 +776,8 @@ describe('native.common', () => {
771776
'9.png',
772777
]);
773778
// eslint-disable-next-line no-await-in-loop
774-
const final = await common.finalizeMediaImport(settings, payload, updater);
779+
const res = await common.finalizeMediaImport(settings, payload);
780+
const final = res.meta;
775781
expect(final.attributes).toEqual(testData[num][2]);
776782
// eslint-disable-next-line no-await-in-loop
777783
const tracks = await common.loadDetections(settings, final.id);

client/platform/desktop/backend/native/common.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,16 @@ import * as dive from 'platform/desktop/backend/serializers/dive';
3030
import kpf from 'platform/desktop/backend/serializers/kpf';
3131
// TODO: Check to Refactor this
3232
// eslint-disable-next-line import/no-cycle
33-
import { checkMedia, convertMedia } from 'platform/desktop/backend/native/mediaJobs';
33+
import { checkMedia } from 'platform/desktop/backend/native/mediaJobs';
3434
import {
3535
websafeImageTypes, websafeVideoTypes, otherImageTypes, otherVideoTypes, MultiType, JsonMetaRegEx,
3636
} from 'dive-common/constants';
3737
import {
38-
JsonMeta, Settings, JsonMetaCurrentVersion, DesktopMetadata, DesktopJobUpdater,
38+
JsonMeta, Settings, JsonMetaCurrentVersion, DesktopMetadata,
3939
RunTraining, ExportDatasetArgs, DesktopMediaImportResponse,
40-
ExportConfigurationArgs, JobsFolderName, ProjectsFolderName, PipelinesFolderName,
40+
ExportConfigurationArgs, JobsFolderName, ProjectsFolderName,
41+
PipelinesFolderName, ConversionArgs,
42+
JobType,
4143
} from 'platform/desktop/constants';
4244
import {
4345
cleanString, filterByGlob, makeid, strNumericCompare,
@@ -1084,7 +1086,7 @@ async function _importTrackFile(
10841086
/**
10851087
* After media conversion we need to remove the transcodingKey to signify it is done
10861088
*/
1087-
async function completeConversion(settings: Settings, datasetId: string, transcodingJobKey: string, meta: JsonMeta) {
1089+
export async function completeConversion(settings: Settings, datasetId: string, transcodingJobKey: string, meta: JsonMeta) {
10881090
await getValidatedProjectDir(settings, datasetId);
10891091
if (meta.transcodingJobKey === transcodingJobKey) {
10901092
// eslint-disable-next-line no-param-reassign
@@ -1099,8 +1101,7 @@ async function completeConversion(settings: Settings, datasetId: string, transco
10991101
async function finalizeMediaImport(
11001102
settings: Settings,
11011103
args: DesktopMediaImportResponse,
1102-
updater: DesktopJobUpdater,
1103-
) {
1104+
): Promise<ConversionArgs> {
11041105
const { jsonMeta, globPattern } = args;
11051106
let { mediaConvertList } = args;
11061107
const { type: datasetType } = jsonMeta;
@@ -1133,10 +1134,10 @@ async function finalizeMediaImport(
11331134
}
11341135
}
11351136

1136-
//Now we will kick off any conversions that are necessary
1137-
let jobBase = null;
1137+
// Determine which files, if any, need to be queued for conversion. Consumers
1138+
// of this function are responsible for starting the conversion.
1139+
const srcDstList: [string, string][] = [];
11381140
if (mediaConvertList.length) {
1139-
const srcDstList: [string, string][] = [];
11401141
const extension = datasetType === 'video' ? '.mp4' : '.png';
11411142
let destAbsPath = '';
11421143
mediaConvertList.forEach((absPath) => {
@@ -1158,16 +1159,6 @@ async function finalizeMediaImport(
11581159
}
11591160
srcDstList.push([absPath, destAbsPath]);
11601161
});
1161-
jobBase = await convertMedia(
1162-
settings,
1163-
{
1164-
meta: jsonMeta,
1165-
mediaList: srcDstList,
1166-
},
1167-
updater,
1168-
(jobKey, meta) => completeConversion(settings, jsonMeta.id, jobKey, meta),
1169-
);
1170-
jsonMeta.transcodingJobKey = jobBase.key;
11711162
}
11721163

11731164
//We need to create datasets for each of the multiCam folders as well
@@ -1197,7 +1188,12 @@ async function finalizeMediaImport(
11971188
if (args.metaFileAbsPath) {
11981189
await dataFileImport(settings, jsonMeta.id, args.metaFileAbsPath);
11991190
}
1200-
return finalJsonMeta;
1191+
const conversionJobArgs: ConversionArgs = {
1192+
type: JobType.Conversion,
1193+
meta: finalJsonMeta,
1194+
mediaList: srcDstList,
1195+
};
1196+
return conversionJobArgs;
12011197
}
12021198

12031199
async function openLink(url: string) {

client/platform/desktop/backend/native/mediaJobs.ts

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ async function convertMedia(
169169
args: ConversionArgs,
170170
updater: DesktopJobUpdater,
171171
onComplete?: (jobKey: string, meta: JsonMeta) => void,
172+
setTranscodingKey = false,
172173
mediaIndex = 0,
173174
key = '',
174175
baseWorkDir = '',
@@ -205,7 +206,10 @@ async function convertMedia(
205206
exitCode: job.exitCode,
206207
startTime: new Date(),
207208
};
208-
209+
if (setTranscodingKey) {
210+
// eslint-disable-next-line no-param-reassign
211+
args.meta.transcodingJobKey = jobBase.key;
212+
}
209213
fs.writeFile(npath.join(jobWorkDir, DiveJobManifestName), JSON.stringify(jobBase, null, 2));
210214

211215
job.stdout.on('data', jobFileEchoMiddleware(jobBase, updater, joblog));
@@ -247,7 +251,7 @@ async function convertMedia(
247251
...jobBase,
248252
body: [`Conversion ${mediaIndex + 1} of ${args.mediaList.length} Complete`],
249253
});
250-
convertMedia(settings, args, updater, onComplete, mediaIndex + 1, jobKey, jobWorkDir);
254+
convertMedia(settings, args, updater, onComplete, setTranscodingKey, mediaIndex + 1, jobKey, jobWorkDir);
251255
}
252256
}
253257
});

client/platform/desktop/constants.ts

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -152,19 +152,33 @@ export interface NvidiaSmiReply {
152152
error: string;
153153
}
154154

155+
export enum JobType {
156+
Conversion,
157+
ExportTrainedPipeline,
158+
RunPipeline,
159+
RunTraining,
160+
}
161+
162+
export interface JobArgs {
163+
type: JobType;
164+
}
165+
155166
/** TODO promote to apispec */
156-
export interface RunPipeline {
167+
export interface RunPipeline extends JobArgs {
168+
type: JobType.RunPipeline;
157169
datasetId: string;
158170
pipeline: Pipe;
159171
}
160172

161-
export interface ExportTrainedPipeline {
173+
export interface ExportTrainedPipeline extends JobArgs {
174+
type: JobType.ExportTrainedPipeline;
162175
path: string;
163176
pipeline: Pipe;
164177
}
165178

166179
/** TODO promote to apispec */
167-
export interface RunTraining {
180+
export interface RunTraining extends JobArgs {
181+
type: JobType.RunTraining;
168182
// datasets to run training on
169183
datasetIds: string[];
170184
// new pipeline name to be created
@@ -184,11 +198,14 @@ export interface RunTraining {
184198
};
185199
}
186200

187-
export interface ConversionArgs {
201+
export interface ConversionArgs extends JobArgs {
202+
type: JobType.Conversion;
188203
meta: JsonMeta;
189204
mediaList: [string, string][];
190205
}
191206

207+
export type Job = ConversionArgs | RunPipeline | RunTraining | ExportTrainedPipeline;
208+
192209
export interface DesktopJob {
193210
// key unique identifier for this job
194211
key: string;

0 commit comments

Comments
 (0)