diff --git a/docs/pages/product/caching/refreshing-pre-aggregations.mdx b/docs/pages/product/caching/refreshing-pre-aggregations.mdx index 1e947f009331b..c6207695e4d5d 100644 --- a/docs/pages/product/caching/refreshing-pre-aggregations.mdx +++ b/docs/pages/product/caching/refreshing-pre-aggregations.mdx @@ -16,6 +16,7 @@ behavior: - `CUBEJS_REFRESH_WORKER_CONCURRENCY` (see also `CUBEJS_CONCURRENCY`) - `CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID` - `CUBEJS_DROP_PRE_AGG_WITHOUT_TOUCH` +- `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME` ## Troubleshooting diff --git a/docs/pages/product/configuration/reference/environment-variables.mdx b/docs/pages/product/configuration/reference/environment-variables.mdx index be11ea3eb7ea8..ba110d1317945 100644 --- a/docs/pages/product/configuration/reference/environment-variables.mdx +++ b/docs/pages/product/configuration/reference/environment-variables.mdx @@ -1155,6 +1155,17 @@ This can be overridden for individual pre-aggregations using the | --------------- | ---------------------- | --------------------- | | `true`, `false` | `true` | `true` | +## `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME` + +The maximum time, in seconds, for exponential backoff for retries when pre-aggregation +builds fail. When a pre-aggregation refresh fails, retries will be executed with +exponentially increasing delays, but the delay will not exceed the value specified by +this variable. + +| Possible Values | Default in Development | Default in Production | +| ------------------------- | ---------------------- | --------------------- | +| A valid number in seconds | `600` | `600` | + ## `CUBEJS_REFRESH_WORKER` If `true`, this instance of Cube will **only** refresh pre-aggregations. diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index 327a5b14d3b2b..4781b0e3688d5 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -751,6 +751,13 @@ const variables: Record any> = { .default(60 * 60 * 24) .asIntPositive(), + /** + * Maximum time for exponential backoff for pre-aggs (in seconds) + */ + preAggBackoffMaxTime: (): number => get('CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME') + .default(10 * 60) + .asIntPositive(), + /** * Expire time for touch records */ diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index 43604fc040234..cbf90c511e279 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -251,6 +251,8 @@ export class PreAggregations { private readonly touchTablePersistTime: number; + private readonly preAggBackoffMaxTime: number; + public readonly dropPreAggregationsWithoutTouch: boolean; private readonly usedTablePersistTime: number; @@ -277,6 +279,7 @@ export class PreAggregations { this.externalDriverFactory = options.externalDriverFactory; this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30; this.touchTablePersistTime = options.touchTablePersistTime || getEnv('touchPreAggregationTimeout'); + this.preAggBackoffMaxTime = options.preAggBackoffMaxTime || getEnv('preAggBackoffMaxTime'); this.dropPreAggregationsWithoutTouch = options.dropPreAggregationsWithoutTouch || getEnv('dropPreAggregationsWithoutTouch'); this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout'); this.externalRefresh = options.externalRefresh; @@ -317,6 +320,11 @@ export class PreAggregations { return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName); } + protected preAggBackoffRedisKey(tableName: string): string { + // TODO add dataSource? + return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_BACKOFF', tableName); + } + protected refreshEndReachedKey() { // TODO add dataSource? return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', ''); @@ -372,6 +380,36 @@ export class PreAggregations { .map(k => k.replace(this.tablesTouchRedisKey(''), '')); } + public async updatePreAggBackoff(tableName: string, backoffData: { backoffMultiplier: number, nextTimestamp: Date }): Promise { + await this.queryCache.getCacheDriver().set( + this.preAggBackoffRedisKey(tableName), + JSON.stringify(backoffData), + this.preAggBackoffMaxTime + ); + } + + public async removePreAggBackoff(tableName: string): Promise { + await this.queryCache.getCacheDriver().remove(this.preAggBackoffRedisKey(tableName)); + } + + public getPreAggBackoffMaxTime(): number { + return this.preAggBackoffMaxTime; + } + + public async getPreAggBackoff(tableName: string): Promise<{ backoffMultiplier: number, nextTimestamp: Date } | null> { + const res = await this.queryCache.getCacheDriver().get(this.preAggBackoffRedisKey(tableName)); + + if (!res) { + return null; + } + + const parsed = JSON.parse(res); + return { + backoffMultiplier: parsed.backoffMultiplier, + nextTimestamp: new Date(parsed.nextTimestamp), + }; + } + public async updateRefreshEndReached() { return this.queryCache.getCacheDriver().set(this.refreshEndReachedKey(), new Date().getTime(), this.touchTablePersistTime); } diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index 50b0d9b8dc924..e94001b9a6bb4 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -615,7 +615,49 @@ export class RefreshScheduler { const currentQuery = await queryIterator.current(); if (currentQuery && queryIterator.partitionCounter() % concurrency === workerIndex) { const orchestratorApi = await this.serverCore.getOrchestratorApi(context); - await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource }); + const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations(); + const now = new Date(); + + const backoffChecks = await Promise.all( + currentQuery.preAggregations.map(p => preAggsInstance.getPreAggBackoff(p.tableName)) + ); + + // Skip execution if any pre-aggregation is still in backoff window + const shouldSkip = backoffChecks.some(backoffData => backoffData && now < backoffData.nextTimestamp); + + if (!shouldSkip) { + try { + await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource }); + } catch (e: any) { + // Check if this is a "Continue wait" error - these are normal queue signals + // For Continue wait errors, re-throw to handle them in the normal flow + if (e.error === 'Continue wait') { + throw e; + } + + // Real datasource error - apply exponential backoff + for (const p of currentQuery.preAggregations) { + let backoffData = await preAggsInstance.getPreAggBackoff(p.tableName); + + if (backoffData && backoffData.backoffMultiplier > 0) { + const newMultiplier = backoffData.backoffMultiplier * 2; + const delaySeconds = Math.min(newMultiplier, preAggsInstance.getPreAggBackoffMaxTime()); + + backoffData = { + backoffMultiplier: newMultiplier, + nextTimestamp: new Date(now.valueOf() + delaySeconds * 1000), + }; + } else { + backoffData = { + backoffMultiplier: 1, + nextTimestamp: new Date(now.valueOf() + 1000), + }; + } + + await preAggsInstance.updatePreAggBackoff(p.tableName, backoffData); + } + } + } } const hasNext = await queryIterator.advance(); if (!hasNext) { diff --git a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts index 22309182253cc..8a274e0256f63 100644 --- a/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts +++ b/packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts @@ -248,6 +248,12 @@ class MockDriver extends BaseDriver { private schema: any; + public shouldFailQuery: boolean = false; + + public failQueryPattern: RegExp | null = null; + + public queryAttempts: number = 0; + public constructor() { super(); } @@ -257,9 +263,22 @@ class MockDriver extends BaseDriver { public query(query) { this.executedQueries.push(query); + + // Track query attempts for backoff testing + if (this.failQueryPattern && query.match(this.failQueryPattern)) { + this.queryAttempts++; + } + let promise: any = Promise.resolve([query]); promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 150))); + // Simulate query failure for backoff testing + if (this.shouldFailQuery && this.failQueryPattern && query.match(this.failQueryPattern)) { + promise = promise.then(() => { + throw new Error('Simulated datasource error'); + }); + } + if (query.match(/min\(.*timestamp.*foo/)) { promise = promise.then(() => [{ min: '2020-12-27T00:00:00.000' }]); } @@ -331,7 +350,11 @@ class MockDriver extends BaseDriver { let testCounter = 1; -const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: { repository: SchemaFileRepository, useOriginalSqlPreAggregations?: boolean, skipAssertSecurityContext?: true }) => { +const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: { + repository: SchemaFileRepository, + useOriginalSqlPreAggregations?: boolean, + skipAssertSecurityContext?: true +}) => { const mockDriver = new MockDriver(); const externalDriver = new MockDriver(); @@ -362,7 +385,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS return externalDriver; }, orchestratorOptions: () => ({ - continueWaitTimeout: 0.1, + continueWaitTimeout: 1, queryCacheOptions: { queueOptions: () => ({ concurrency: 2, @@ -1112,4 +1135,101 @@ describe('Refresh Scheduler', () => { } } }); + + test('Exponential backoff', async () => { + process.env.CUBEJS_EXTERNAL_DEFAULT = 'false'; + process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true'; + process.env.CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME = '10'; // 10 seconds max backoff + + const { + refreshScheduler, mockDriver, serverCore + } = setupScheduler({ repository: repositoryWithPreAggregations }); + + const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' }; + + const orchestratorApi = await serverCore.getOrchestratorApi(ctx); + const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations(); + + // Target specific pre-aggregation: foo_first (all partitions) + // Scheduler processes multiple partitions: foo_first20201231, foo_first20201230, etc. + // Configure driver to fail only for foo_first table creation + mockDriver.shouldFailQuery = true; + mockDriver.failQueryPattern = /foo_first/; + + // Run refresh until it tries to create foo_first table and fails + const queryIteratorState = {}; + const maxIterations = 100; + for (let i = 0; i < maxIterations; i++) { + try { + await refreshScheduler.runScheduledRefresh(ctx, { + concurrency: 1, + workerIndices: [0], + timezones: ['UTC'], + queryIteratorState, + }); + } catch (e) { + // Expected to fail when hitting foo_first + } + + // Check if we started attempting to create foo_first table + if (mockDriver.queryAttempts > 0) { + break; + } + } + + const initialAttempts = mockDriver.queryAttempts; + expect(initialAttempts).toBeGreaterThan(0); + + // Wait for backoff to be set in storage (increased delay for async Redis writes) + await mockDriver.delay(1000); + + // Find which foo_first partition has backoff set + // Scheduler may process different partitions (20201231, 20201230, etc.) + const possiblePartitions = ['20201231', '20201230', '20201229', '20201228', '20201227']; + let backoffData: { backoffMultiplier: number, nextTimestamp: Date } | null = null; + let targetTableName: string | null = null; + + for (const partition of possiblePartitions) { + const tableName = `stb_pre_aggregations.foo_first${partition}`; + const data = await preAggsInstance.getPreAggBackoff(tableName); + if (data) { + backoffData = data; + targetTableName = tableName; + break; + } + } + + // Verify backoff was set for at least one foo_first table + expect(backoffData).not.toBeNull(); + expect(targetTableName).not.toBeNull(); + // Initial backoff multiplier is 1 second + expect(backoffData!.backoffMultiplier).toBeGreaterThanOrEqual(1); + + // Step 1: Immediate retry - should skip due to backoff (10-second window) + const beforeSkipAttempts = mockDriver.queryAttempts; + const immediateRetryCount = 5; + for (let i = 0; i < immediateRetryCount; i++) { + try { + await refreshScheduler.runScheduledRefresh(ctx, { + concurrency: 1, + workerIndices: [0], + timezones: ['UTC'], + queryIteratorState, + }); + } catch (e) { + // Expected to skip due to backoff + } + } + + // Query attempts should not increase significantly (skipped due to backoff) + // Allow some margin for other pre-aggregations processed by scheduler + expect(mockDriver.queryAttempts).toBeLessThanOrEqual(beforeSkipAttempts + 2); + + // Step 2: Verify backoff persists - pre-aggregation is still in backoff after 500ms + await mockDriver.delay(500); + const backoffDataStillActive = await preAggsInstance.getPreAggBackoff(targetTableName!); + expect(backoffDataStillActive).not.toBeNull(); + // backoffDataStillActive exists, which means backoff is still in place + // (nextTimestamp may be close to current time due to test execution delays) + }); });