Skip to content

Commit 888e8b5

Browse files
authored
S3 assetstore import job (#1525)
* Add generateQuickVideos script to generate a large number of test videos. * Update crud_rpc.postprocess to return the list of job IDs started when multiple jobs are launched. * Add a single batch post-process job that spawns the other jobs.
1 parent 296654b commit 888e8b5

File tree

7 files changed

+572
-17
lines changed

7 files changed

+572
-17
lines changed

client/platform/web-girder/api/dataset.service.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ async function importAnnotationFile(parentId: string, path: string, file?: HTMLF
112112
});
113113
if (uploadResponse.status === 200) {
114114
const final = await postProcess(parentId, true, false, additive, additivePrepend, set);
115-
if (final.data.length > 1) {
116-
const warnings = final.data[1];
115+
if (final.data.warnings !== undefined) {
116+
const { warnings } = final.data;
117117
return warnings;
118118
}
119119

client/platform/web-girder/api/rpc.service.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import type { GirderModel } from '@girder/components/src';
33
import { Pipe } from 'dive-common/apispec';
44

55
function postProcess(folderId: string, skipJobs = false, skipTranscoding = false, additive = false, additivePrepend = '', set: string | undefined = undefined) {
6-
return girderRest.post<[GirderModel, string[]]>(`dive_rpc/postprocess/${folderId}`, null, {
6+
return girderRest.post<{folder: GirderModel, warnings: string[], job_ids: string[]}>(`dive_rpc/postprocess/${folderId}`, null, {
77
params: {
88
skipJobs, skipTranscoding, additive, additivePrepend, set,
99
},

client/platform/web-girder/views/Admin/AdminJobs.vue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export default defineComponent({
3535
const store = useStore();
3636
const { prompt } = usePrompt();
3737
const table: Ref<(GirderJob & {type: string})[]> = ref([]);
38-
const jobTypes: Ref<string[]> = ref(['convert', 'celery', 'large_image_tiff', 'pipelines', 'private', 'training']);
38+
const jobTypes: Ref<string[]> = ref(['convert', 'celery', 'large_image_tiff', 'pipelines', 'private', 'training', 'DIVE Batch Postprocess']);
3939
const jobStatusList: Ref<string[]> = ref(['Cancelled', 'Error', 'Inactive', 'Running', 'Cancelling', 'Success']);
4040
const filterStatus: Ref<string[]> = ref(['Running', 'Error', 'Inactive']);
4141
const filterTypes: Ref<string[]> = ref(jobTypes.value);
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
# /// script
2+
# requires-python = ">=3.8"
3+
# dependencies = [
4+
# "click",
5+
# "faker",
6+
# ]
7+
# ///
8+
import subprocess
9+
import shutil
10+
import random
11+
import json
12+
import math
13+
from pathlib import Path
14+
import click
15+
from faker import Faker
16+
17+
fake = Faker()  # shared word generator for folder, file, and label names

# Output video geometry and frame rate. Used both by the ffmpeg command in
# create_base_video and by the annotation generator when keeping shapes
# inside the frame — keep them in sync if changed.
FRAME_WIDTH = 1280
FRAME_HEIGHT = 720
FPS = 30
22+
23+
24+
def create_base_video(file_path: Path, duration: int):
    """Render one synthetic test video with ffmpeg (MP4 container, H.264).

    Uses the lavfi ``testsrc`` pattern generator, so no input footage is
    required. Raises subprocess.CalledProcessError if ffmpeg exits non-zero
    (check=True); requires ffmpeg on PATH.
    """
    source = f"testsrc=size={FRAME_WIDTH}x{FRAME_HEIGHT}:rate={FPS}"
    command = ["ffmpeg", "-y"]
    command += ["-f", "lavfi", "-i", source]
    command += ["-t", str(duration)]
    # yuv420p keeps the output playable in common browsers/players.
    command += ["-c:v", "libx264", "-pix_fmt", "yuv420p"]
    command.append(str(file_path))
    subprocess.run(command, check=True)
35+
36+
37+
def copy_video_to_new_location(base_video: Path, target_path: Path):
    """Duplicate the shared base video at target_path, preserving metadata."""
    shutil.copy2(base_video, target_path)
40+
41+
42+
# ---- Geometry + Annotations ----
43+
44+
def generate_star_points(cx, cy, r, spikes=5):
    """Return a closed polygon tracing a star centred on (cx, cy).

    Vertices alternate between the outer radius r and an inner radius r / 2;
    the first vertex is repeated at the end to close the ring.
    """
    step = math.pi / spikes
    radii = (r, r / 2)  # even index -> outer spike tip, odd -> inner notch
    points = [
        [cx + math.cos(k * step) * radii[k % 2],
         cy + math.sin(k * step) * radii[k % 2]]
        for k in range(2 * spikes)
    ]
    points.append(points[0])
    return points
54+
55+
56+
def generate_diamond_points(cx, cy, r):
    """Closed polygon for an axis-aligned diamond (rotated square) of radius r."""
    ring = [[cx, cy - r], [cx + r, cy], [cx, cy + r], [cx - r, cy]]
    ring.append([cx, cy - r])  # close the ring with a fresh copy of the start
    return ring
64+
65+
66+
def generate_circle_points(cx, cy, r, segments=12):
    """Approximate a circle of radius r as a closed (segments + 1)-point ring."""
    return [
        [cx + math.cos(2 * math.pi * i / segments) * r,
         cy + math.sin(2 * math.pi * i / segments) * r]
        for i in range(segments + 1)
    ]
74+
75+
76+
def generate_geometry(shape: str, cx: float, cy: float, size: float):
    """Build a polygon ring for ``shape`` plus a single-feature GeoJSON wrapper.

    Unrecognised shape names fall back to an axis-aligned square of
    half-width ``size``. Returns {'geojson': FeatureCollection, 'coords': ring}.
    """
    if shape == "star":
        ring = generate_star_points(cx, cy, size)
    elif shape == "diamond":
        ring = generate_diamond_points(cx, cy, size)
    elif shape == "circle":
        ring = generate_circle_points(cx, cy, size)
    else:
        # Square fallback, wound clockwise from the top-left, closed.
        half = size
        ring = [
            [cx - half, cy - half],
            [cx + half, cy - half],
            [cx + half, cy + half],
            [cx - half, cy + half],
            [cx - half, cy - half],
        ]

    feature = {
        "type": "Feature",
        "geometry": {
            "type": "Polygon",
            "coordinates": [ring],
        },
        "properties": {"key": ""},
    }
    return {
        'geojson': {
            "type": "FeatureCollection",
            "features": [feature],
        },
        'coords': ring,
    }
107+
108+
109+
def geometry_bounds(coords):
    """Return the axis-aligned bounding box [xmin, ymin, xmax, ymax] of coords."""
    return [
        min(pt[0] for pt in coords),
        min(pt[1] for pt in coords),
        max(pt[0] for pt in coords),
        max(pt[1] for pt in coords),
    ]
113+
114+
115+
def generate_annotation_json(num_frames: int, output_file: Path):
    """Write a version-2 annotation JSON with 3-5 animated polygon tracks.

    Each track picks a random shape, bounces it around inside a 50px margin,
    and pulses its size sinusoidally; every frame is emitted as a keyframe.
    Output schema: {'tracks': {id: track}, 'groups': {}, 'version': 2}.
    """
    track_count = random.randint(3, 5)
    tracks = {}

    for track_id in range(track_count):
        shape_type = random.choice(["circle", "rectangle", "diamond", "star"])
        first_frame = 0
        last_frame = num_frames - 1

        # Random start away from the edges, plus a fixed drift vector.
        x = random.randint(100, FRAME_WIDTH - 100)
        y = random.randint(100, FRAME_HEIGHT - 100)
        dx = random.choice([-5, 5])
        dy = random.choice([-3, 3])
        base_size = random.randint(40, 80)
        growth_rate = random.uniform(0.05, 0.15)

        features = []
        for frame in range(num_frames):
            x += dx
            y += dy
            # Bounce off a 50px margin: reverse direction and re-step.
            if not (50 <= x <= FRAME_WIDTH - 50):
                dx = -dx
                x += dx
            if not (50 <= y <= FRAME_HEIGHT - 50):
                dy = -dy
                y += dy

            # Sinusoidal pulse keeps size within [0.75, 1.25] * base_size.
            pulse = 0.5 * (1 + math.sin(growth_rate * frame))
            size = base_size * (0.75 + 0.5 * pulse)

            shape_data = generate_geometry(shape_type, x, y, size)
            features.append({
                "frame": frame,
                "bounds": geometry_bounds(shape_data['coords']),
                "keyframe": True,
                "geometry": shape_data['geojson'],
            })

        tracks[str(track_id)] = {
            "id": track_id,
            "meta": {"shape": shape_type},
            "attributes": {},
            # Single confidence pair: a random word label with score in [0, 1).
            "confidencePairs": [[fake.word(), float(random.randrange(0, 100) / 100)]],
            "begin": first_frame,
            "end": last_frame,
            "features": features,
        }

    with open(output_file, "w") as f:
        json.dump({"tracks": tracks, "groups": {}, "version": 2}, f, indent=2)
175+
176+
177+
# ---- Main orchestration ----
178+
179+
@click.command()
@click.option('--output', '-o', default='./sample', show_default=True,
              type=click.Path(file_okay=False), help="Base output directory")
@click.option('--folders', '-f', default=3, show_default=True,
              help="Number of top-level folders to create")
@click.option('--max-depth', '-d', default=2, show_default=True,
              help="Maximum subfolder depth")
@click.option('--videos', '-v', default=4, show_default=True,
              help="Number of videos per folder")
@click.option('--total', '-t', default=50, show_default=True,
              help="Total number of videos to create")
@click.option('--duration', default=10, show_default=True,
              help="Duration (in seconds) of each video")
def main(output, folders, max_depth, videos, total, duration):
    """Populate a folder tree with copies of one test video plus annotations.

    Renders the video once, then copies it under randomly named folders
    (depth-first, up to two subfolders per level) until ``total`` videos
    exist or every folder has been filled.
    """
    base_path = Path(output)
    base_path.mkdir(parents=True, exist_ok=True)

    # Encode once; every dataset entry is a cheap byte-for-byte copy.
    base_video = base_path / "base_video.mp4"
    click.echo(f"Creating base video of {duration}s...")
    create_base_video(base_video, duration)

    num_frames = duration * FPS
    count = 0
    for _ in range(folders):
        if count >= total:
            break
        top_level = base_path / fake.word()
        top_level.mkdir(parents=True, exist_ok=True)

        # Depth-first walk; each pending entry is (directory, depth).
        pending = [(top_level, 1)]
        while pending and count < total:
            directory, depth = pending.pop()

            for _ in range(videos):
                if count >= total:
                    break
                name = fake.word()
                video_path = directory / f"{name}.mp4"
                json_path = directory / f"{name}.json"

                # Pair every video copy with a fresh annotation file.
                copy_video_to_new_location(base_video, video_path)
                generate_annotation_json(num_frames, json_path)
                count += 1

            if depth < max_depth:
                for _ in range(2):  # up to 2 subfolders each
                    child = directory / fake.word()
                    child.mkdir(parents=True, exist_ok=True)
                    pending.append((child, depth + 1))

    click.echo(f"Done! Created {count} videos (all {duration}s long, with annotations).")


if __name__ == '__main__':
    main()

server/dive_server/crud_rpc.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -569,7 +569,7 @@ def postprocess(
569569
additive=False,
570570
additivePrepend='',
571571
set='',
572-
) -> Tuple[types.GirderModel, Optional[List[str]]]:
572+
) -> dict:
573573
"""
574574
Post-processing to be run after media/annotation import
575575
@@ -581,12 +581,17 @@ def postprocess(
581581
582582
In either case, the following may run synchronously:
583583
Conversion of CSV annotations into track JSON
584+
Returns:
585+
dict: Contains 'folder' (the processed folder) and 'job_ids' (list of created job IDs)
584586
"""
585587
job_is_private = user.get(constants.UserPrivateQueueEnabledMarker, False)
586588
isClone = dsFolder.get(constants.ForeignMediaIdMarker, None) is not None
587589
# add default confidence filter threshold to folder metadata
588590
dsFolder['meta'][constants.ConfidenceFiltersMarker] = {'default': 0.1}
589591

592+
# Track job IDs for batch processing
593+
created_job_ids = []
594+
590595
# Validate user-supplied metadata fields are present
591596
if fromMeta(dsFolder, constants.FPSMarker) is None:
592597
raise RestException(f'{constants.FPSMarker} missing from metadata')
@@ -625,7 +630,8 @@ def postprocess(
625630
newjob.job[constants.JOBCONST_DATASET_ID] = str(item["folderId"])
626631
newjob.job[constants.JOBCONST_CREATOR] = str(user['_id'])
627632
Job().save(newjob.job)
628-
return dsFolder
633+
created_job_ids.append(newjob.job['_id'])
634+
return {'folder': dsFolder, 'job_ids': created_job_ids}
629635

630636
# transcode VIDEO if necessary
631637
videoItems = Folder().childItems(
@@ -649,6 +655,7 @@ def postprocess(
649655
newjob.job[constants.JOBCONST_PRIVATE_QUEUE] = job_is_private
650656
newjob.job[constants.JOBCONST_DATASET_ID] = dsFolder["_id"]
651657
Job().save(newjob.job)
658+
created_job_ids.append(newjob.job['_id'])
652659

653660
# transcode IMAGERY if necessary
654661
imageItems = Folder().childItems(
@@ -676,7 +683,8 @@ def postprocess(
676683
newjob.job[constants.JOBCONST_PRIVATE_QUEUE] = job_is_private
677684
newjob.job[constants.JOBCONST_DATASET_ID] = dsFolder["_id"]
678685
Job().save(newjob.job)
679-
686+
created_job_ids.append(newjob.job['_id'])
687+
680688
elif imageItems.count() > 0:
681689
dsFolder["meta"][constants.DatasetMarker] = True
682690
elif largeImageItems.count() > 0:
@@ -685,7 +693,7 @@ def postprocess(
685693
Folder().save(dsFolder)
686694

687695
aggregate_warnings = process_items(dsFolder, user, additive, additivePrepend, set)
688-
return dsFolder, aggregate_warnings
696+
return {'folder': dsFolder, 'warnings': aggregate_warnings, 'job_ids': created_job_ids}
689697

690698

691699
def convert_large_image(

0 commit comments

Comments
 (0)