Skip to content

Commit f2e1909

Browse files
committed
fix: importer..
1 parent 7f85b2a commit f2e1909

File tree

2 files changed

+70
-70
lines changed

2 files changed

+70
-70
lines changed

apps/worker/src/jobs/import.ts

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import {
22
backfillSessionsToProduction,
3+
cleanupSessionStartEndEvents,
34
cleanupStagingData,
45
createSessionsStartEndEvents,
56
db,
@@ -27,7 +28,7 @@ function yieldToEventLoop(): Promise<void> {
2728
});
2829
}
2930

30-
const PRODUCTION_STEPS = ['moving', 'backfilling_sessions'];
31+
const RESUMABLE_STEPS = ['creating_sessions', 'moving', 'backfilling_sessions'];
3132

3233
export async function importJob(job: Job<ImportQueuePayload>) {
3334
const { importId } = job.data.payload;
@@ -45,16 +46,16 @@ export async function importJob(job: Job<ImportQueuePayload>) {
4546

4647
try {
4748
const isRetry = record.currentStep !== null;
48-
const hasReachedProduction =
49-
isRetry && PRODUCTION_STEPS.includes(record.currentStep as string);
49+
const canResume =
50+
isRetry && RESUMABLE_STEPS.includes(record.currentStep as string);
5051

5152
// -------------------------------------------------------
5253
// STAGING PHASE: clean slate on failure, run from scratch
5354
// -------------------------------------------------------
54-
if (!hasReachedProduction) {
55+
if (!canResume) {
5556
if (isRetry) {
5657
jobLogger.info(
57-
'Retry detected before production phase — cleaning staging data'
58+
'Retry detected before resumable phase — cleaning staging data'
5859
);
5960
await cleanupStagingData(importId);
6061
}
@@ -183,8 +184,22 @@ export async function importJob(job: Job<ImportQueuePayload>) {
183184
await yieldToEventLoop();
184185
jobLogger.info('Session ID generation complete');
185186
}
187+
}
188+
189+
// -------------------------------------------------------
190+
// SESSION CREATION PHASE: resumable — a retry at this step first deletes any session_start/session_end events already generated, then recreates them
191+
// -------------------------------------------------------
192+
const skipSessionCreation =
193+
canResume && record.currentStep !== 'creating_sessions';
194+
195+
if (!skipSessionCreation) {
196+
if (canResume && record.currentStep === 'creating_sessions') {
197+
jobLogger.info(
198+
'Retry at creating_sessions — cleaning existing session_start/end events'
199+
);
200+
await cleanupSessionStartEndEvents(importId);
201+
}
186202

187-
// Phase 3: Create session_start / session_end events
188203
await updateImportStatus(jobLogger, job, importId, {
189204
step: 'creating_sessions',
190205
batch: 'all sessions',
@@ -201,13 +216,15 @@ export async function importJob(job: Job<ImportQueuePayload>) {
201216

202217
// Phase 3: Move staging events to production (per-day)
203218
const resumeMovingFrom =
204-
hasReachedProduction && record.currentStep === 'moving'
219+
canResume && record.currentStep === 'moving'
205220
? (record.currentBatch ?? undefined)
206221
: undefined;
207222

208223
// currentBatch is the last successfully completed day — resume from the next day to avoid re-inserting it
209224
const moveFromDate = (() => {
210-
if (!resumeMovingFrom) return undefined;
225+
if (!resumeMovingFrom) {
226+
return undefined;
227+
}
211228
const next = new Date(`${resumeMovingFrom}T12:00:00Z`);
212229
next.setUTCDate(next.getUTCDate() + 1);
213230
return next.toISOString().split('T')[0]!;

packages/db/src/services/import.service.ts

Lines changed: 45 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ import {
88
TABLE_NAMES,
99
} from '../clickhouse/client';
1010
import { db, type Prisma } from '../prisma-client';
11-
import type { IClickhouseProfile } from './profile.service';
1211
import type { IClickhouseEvent } from './event.service';
12+
import type { IClickhouseProfile } from './profile.service';
1313

1414
export interface ImportStageResult {
1515
importId: string;
@@ -172,38 +172,6 @@ export async function insertProfilesBatch(
172172
return { inserted: normalized.length };
173173
}
174174

175-
176-
177-
178-
179-
180-
181-
182-
183-
184-
185-
186-
187-
188-
189-
190-
191-
192-
193-
194-
195-
196-
197-
198-
199-
200-
201-
202-
203-
204-
205-
206-
207175
/**
208176
* Delete all staging data for an import. Used to get a clean slate on retry
209177
* when the failure happened before a resumable step (creating sessions, moving, or backfilling sessions).
@@ -222,6 +190,22 @@ export async function cleanupStagingData(importId: string): Promise<void> {
222190
});
223191
}
224192

193+
/**
 * Delete the generated `session_start` / `session_end` rows of one import
 * from the imports staging table, leaving every other event of that import
 * untouched.
 *
 * Issues an `ALTER TABLE … DELETE` mutation against the replicated name of
 * `TABLE_NAMES.events_imports`, filtered by `import_id` and by event name.
 *
 * @param importId - Identifier of the import whose session boundary events
 *   should be removed; passed to ClickHouse as a bound query parameter.
 * @returns A promise that resolves once the `ch.command` call has completed.
 */
export async function cleanupSessionStartEndEvents(
194+
importId: string
195+
): Promise<void> {
196+
const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
197+
await ch.command({
198+
query: `ALTER TABLE ${mutationTableName} DELETE WHERE import_id = {importId:String} AND name IN ('session_start', 'session_end')`,
199+
query_params: { importId },
200+
clickhouse_settings: {
201+
wait_end_of_query: 1,
202+
// NOTE(review): per ClickHouse docs, mutations_sync = 2 makes the query wait
// until the mutation has finished on all replicas — confirm this is the intent,
// since callers appear to rely on the rows being gone when this resolves.
mutations_sync: '2',
203+
send_progress_in_http_headers: 1,
204+
http_headers_progress_interval_ms: '50000',
205+
},
206+
});
207+
}
208+
225209
/**
226210
* Reconstruct sessions across ALL dates for the import.
227211
* Each session_id gets exactly one session_start and one session_end,
@@ -242,27 +226,16 @@ export async function createSessionsStartEndEvents(
242226
"name NOT IN ('session_start', 'session_end')",
243227
].join(' AND ');
244228

245-
while (true) {
246-
const idsResult = await ch.query({
247-
query: `
248-
SELECT DISTINCT session_id
249-
FROM ${TABLE_NAMES.events_imports}
250-
WHERE ${baseWhere}
251-
AND session_id > {lastSessionId:String}
252-
ORDER BY session_id
253-
LIMIT {limit:UInt32}
254-
`,
255-
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
256-
format: 'JSONEachRow',
257-
});
258-
259-
const idRows = (await idsResult.json()) as Array<{ session_id: string }>;
260-
if (idRows.length === 0) {
261-
break;
262-
}
263-
264-
const sessionIds = idRows.map((r) => r.session_id);
229+
const sessionBatchSubquery = `
230+
(SELECT DISTINCT session_id
231+
FROM ${TABLE_NAMES.events_imports}
232+
WHERE ${baseWhere}
233+
AND session_id > {lastSessionId:String}
234+
ORDER BY session_id
235+
LIMIT {limit:UInt32})
236+
`;
265237

238+
while (true) {
266239
const sessionEventsQuery = `
267240
SELECT
268241
device_id,
@@ -279,13 +252,13 @@ export async function createSessionsStartEndEvents(
279252
max(created_at) AS last_timestamp
280253
FROM ${TABLE_NAMES.events_imports}
281254
WHERE ${baseWhere}
282-
AND session_id IN ({sessionIds:Array(String)})
255+
AND session_id IN ${sessionBatchSubquery}
283256
GROUP BY session_id, device_id, project_id
284257
`;
285258

286259
const sessionEventsResult = await ch.query({
287260
query: sessionEventsQuery,
288-
query_params: { importId, sessionIds },
261+
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
289262
format: 'JSONEachRow',
290263
});
291264

@@ -438,8 +411,11 @@ export async function createSessionsStartEndEvents(
438411
await insertImportBatch(sessionEvents, importId);
439412
}
440413

441-
lastSessionId = idRows[idRows.length - 1]!.session_id;
442-
if (idRows.length < SESSION_BATCH_SIZE) {
414+
if (sessionData.length === 0) {
415+
break;
416+
}
417+
lastSessionId = sessionData.at(-1)!.session_id;
418+
if (sessionData.length < SESSION_BATCH_SIZE) {
443419
break;
444420
}
445421
}
@@ -500,6 +476,15 @@ export async function backfillSessionsToProduction(
500476
const SESSION_BATCH_SIZE = 5000;
501477
let lastSessionId = '';
502478

479+
const baseWhere = 'import_id = {importId:String} AND session_id > {lastSessionId:String}';
480+
const sessionBatchSubquery = `
481+
(SELECT DISTINCT session_id
482+
FROM ${TABLE_NAMES.events_imports}
483+
WHERE ${baseWhere}
484+
ORDER BY session_id
485+
LIMIT {limit:UInt32})
486+
`;
487+
503488
while (true) {
504489
const idsResult = await ch.query({
505490
query: `
@@ -519,8 +504,6 @@ export async function backfillSessionsToProduction(
519504
break;
520505
}
521506

522-
const sessionIds = idRows.map((r) => r.session_id);
523-
524507
const sessionsInsertQuery = `
525508
INSERT INTO ${TABLE_NAMES.sessions} (
526509
id, project_id, profile_id, device_id, created_at, ended_at,
@@ -577,21 +560,21 @@ export async function backfillSessionsToProduction(
577560
FROM ${TABLE_NAMES.events_imports} e
578561
WHERE
579562
e.import_id = {importId:String}
580-
AND e.session_id IN ({sessionIds:Array(String)})
563+
AND e.session_id IN ${sessionBatchSubquery}
581564
GROUP BY e.session_id
582565
`;
583566

584567
await ch.command({
585568
query: sessionsInsertQuery,
586-
query_params: { importId, sessionIds },
569+
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
587570
clickhouse_settings: {
588571
wait_end_of_query: 1,
589572
send_progress_in_http_headers: 1,
590573
http_headers_progress_interval_ms: '50000',
591574
},
592575
});
593576

594-
lastSessionId = idRows[idRows.length - 1]!.session_id;
577+
lastSessionId = idRows.at(-1)!.session_id;
595578
if (idRows.length < SESSION_BATCH_SIZE) {
596579
break;
597580
}

0 commit comments

Comments
 (0)