1 change: 1 addition & 0 deletions docs/pages/product/caching/refreshing-pre-aggregations.mdx
@@ -16,6 +16,7 @@ behavior:
- `CUBEJS_REFRESH_WORKER_CONCURRENCY` (see also `CUBEJS_CONCURRENCY`)
- `CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_APP_ID`
- `CUBEJS_DROP_PRE_AGG_WITHOUT_TOUCH`
- `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME`

## Troubleshooting

@@ -1155,6 +1155,17 @@ This can be overridden for individual pre-aggregations using the
| --------------- | ---------------------- | --------------------- |
| `true`, `false` | `true` | `true` |

## `CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME`

The maximum time, in seconds, for the exponential backoff applied to retries when
pre-aggregation builds fail. When a pre-aggregation refresh fails, retries are executed
with exponentially increasing delays, but the delay never exceeds the value specified by
this variable.

| Possible Values | Default in Development | Default in Production |
| ------------------------- | ---------------------- | --------------------- |
| A valid number in seconds | `600` | `600` |

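For clarity, here is a minimal TypeScript sketch of how the retry delay grows and is capped by this variable. It mirrors the backoff logic added to `RefreshScheduler` later in this diff; the standalone `nextBackoff` helper and the `BackoffState` type are illustrative names only, not part of the change.

```typescript
// Illustrative sketch (not part of this change): how the retry delay is
// derived and capped. The real logic lives in RefreshScheduler below;
// `BackoffState` and `nextBackoff` are hypothetical names.
interface BackoffState {
  backoffMultiplier: number;
  nextTimestamp: Date;
}

function nextBackoff(
  prev: BackoffState | null,
  now: Date,
  maxTimeSeconds: number // CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME, 600 by default
): BackoffState {
  if (prev && prev.backoffMultiplier > 0) {
    // Double the multiplier on each consecutive failure...
    const multiplier = prev.backoffMultiplier * 2;
    // ...but never wait longer than the configured maximum.
    const delaySeconds = Math.min(multiplier, maxTimeSeconds);
    return {
      backoffMultiplier: multiplier,
      nextTimestamp: new Date(now.valueOf() + delaySeconds * 1000),
    };
  }
  // First failure: retry after roughly one second.
  return { backoffMultiplier: 1, nextTimestamp: new Date(now.valueOf() + 1000) };
}

// With the defaults, consecutive failures yield delays of
// 1 s, 2 s, 4 s, 8 s, ... capped at 600 s.
```
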
## `CUBEJS_REFRESH_WORKER`

If `true`, this instance of Cube will **only** refresh pre-aggregations.
7 changes: 7 additions & 0 deletions packages/cubejs-backend-shared/src/env.ts
@@ -751,6 +751,13 @@ const variables: Record<string, (...args: any) => any> = {
.default(60 * 60 * 24)
.asIntPositive(),

/**
* Maximum time for exponential backoff for pre-aggs (in seconds)
*/
preAggBackoffMaxTime: (): number => get('CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME')
.default(10 * 60)
.asIntPositive(),

/**
* Expire time for touch records
*/
@@ -251,6 +251,8 @@ export class PreAggregations {

private readonly touchTablePersistTime: number;

private readonly preAggBackoffMaxTime: number;

public readonly dropPreAggregationsWithoutTouch: boolean;

private readonly usedTablePersistTime: number;
@@ -277,6 +279,7 @@
this.externalDriverFactory = options.externalDriverFactory;
this.structureVersionPersistTime = options.structureVersionPersistTime || 60 * 60 * 24 * 30;
this.touchTablePersistTime = options.touchTablePersistTime || getEnv('touchPreAggregationTimeout');
this.preAggBackoffMaxTime = options.preAggBackoffMaxTime || getEnv('preAggBackoffMaxTime');
this.dropPreAggregationsWithoutTouch = options.dropPreAggregationsWithoutTouch || getEnv('dropPreAggregationsWithoutTouch');
this.usedTablePersistTime = options.usedTablePersistTime || getEnv('dbQueryTimeout');
this.externalRefresh = options.externalRefresh;
@@ -317,6 +320,11 @@ export class PreAggregations {
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_TABLES_TOUCH', tableName);
}

protected preAggBackoffRedisKey(tableName: string): string {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_BACKOFF', tableName);
}

protected refreshEndReachedKey() {
// TODO add dataSource?
return this.queryCache.getKey('SQL_PRE_AGGREGATIONS_REFRESH_END_REACHED', '');
@@ -372,6 +380,36 @@ export class PreAggregations {
.map(k => k.replace(this.tablesTouchRedisKey(''), ''));
}

public async updatePreAggBackoff(tableName: string, backoffData: { backoffMultiplier: number, nextTimestamp: Date }): Promise<void> {
await this.queryCache.getCacheDriver().set(
this.preAggBackoffRedisKey(tableName),
JSON.stringify(backoffData),
this.preAggBackoffMaxTime
);
}

public async removePreAggBackoff(tableName: string): Promise<void> {
await this.queryCache.getCacheDriver().remove(this.preAggBackoffRedisKey(tableName));
}

public getPreAggBackoffMaxTime(): number {
return this.preAggBackoffMaxTime;
}

public async getPreAggBackoff(tableName: string): Promise<{ backoffMultiplier: number, nextTimestamp: Date } | null> {
const res = await this.queryCache.getCacheDriver().get(this.preAggBackoffRedisKey(tableName));

if (!res) {
return null;
}

const parsed = JSON.parse(res);
return {
backoffMultiplier: parsed.backoffMultiplier,
nextTimestamp: new Date(parsed.nextTimestamp),
};
}

public async updateRefreshEndReached() {
return this.queryCache.getCacheDriver().set(this.refreshEndReachedKey(), new Date().getTime(), this.touchTablePersistTime);
}
44 changes: 43 additions & 1 deletion packages/cubejs-server-core/src/core/RefreshScheduler.ts
@@ -615,7 +615,49 @@ export class RefreshScheduler {
const currentQuery = await queryIterator.current();
if (currentQuery && queryIterator.partitionCounter() % concurrency === workerIndex) {
const orchestratorApi = await this.serverCore.getOrchestratorApi(context);
await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource });
const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations();
const now = new Date();

const backoffChecks = await Promise.all(
currentQuery.preAggregations.map(p => preAggsInstance.getPreAggBackoff(p.tableName))
);

// Skip execution if any pre-aggregation is still in backoff window
const shouldSkip = backoffChecks.some(backoffData => backoffData && now < backoffData.nextTimestamp);

if (!shouldSkip) {
try {
await orchestratorApi.executeQuery({ ...currentQuery, preAggregationsLoadCacheByDataSource });
} catch (e: any) {
// "Continue wait" errors are normal queue signals,
// so re-throw them to be handled in the normal flow
if (e.error === 'Continue wait') {
throw e;
}

// Real datasource error - apply exponential backoff
for (const p of currentQuery.preAggregations) {
let backoffData = await preAggsInstance.getPreAggBackoff(p.tableName);

if (backoffData && backoffData.backoffMultiplier > 0) {
const newMultiplier = backoffData.backoffMultiplier * 2;
const delaySeconds = Math.min(newMultiplier, preAggsInstance.getPreAggBackoffMaxTime());

backoffData = {
backoffMultiplier: newMultiplier,
nextTimestamp: new Date(now.valueOf() + delaySeconds * 1000),
};
} else {
backoffData = {
backoffMultiplier: 1,
nextTimestamp: new Date(now.valueOf() + 1000),
};
}

await preAggsInstance.updatePreAggBackoff(p.tableName, backoffData);
}
}
}
}
const hasNext = await queryIterator.advance();
if (!hasNext) {
124 changes: 122 additions & 2 deletions packages/cubejs-server-core/test/unit/RefreshScheduler.test.ts
@@ -248,6 +248,12 @@ class MockDriver extends BaseDriver {

private schema: any;

public shouldFailQuery: boolean = false;

public failQueryPattern: RegExp | null = null;

public queryAttempts: number = 0;

public constructor() {
super();
}
@@ -257,9 +263,22 @@

public query(query) {
this.executedQueries.push(query);

// Track query attempts for backoff testing
if (this.failQueryPattern && query.match(this.failQueryPattern)) {
this.queryAttempts++;
}

let promise: any = Promise.resolve([query]);
promise = promise.then((res) => new Promise(resolve => setTimeout(() => resolve(res), 150)));

// Simulate query failure for backoff testing
if (this.shouldFailQuery && this.failQueryPattern && query.match(this.failQueryPattern)) {
promise = promise.then(() => {
throw new Error('Simulated datasource error');
});
}

if (query.match(/min\(.*timestamp.*foo/)) {
promise = promise.then(() => [{ min: '2020-12-27T00:00:00.000' }]);
}
@@ -331,7 +350,11 @@ class MockDriver extends BaseDriver {

let testCounter = 1;

const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: { repository: SchemaFileRepository, useOriginalSqlPreAggregations?: boolean, skipAssertSecurityContext?: true }) => {
const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertSecurityContext }: {
repository: SchemaFileRepository,
useOriginalSqlPreAggregations?: boolean,
skipAssertSecurityContext?: true
}) => {
const mockDriver = new MockDriver();
const externalDriver = new MockDriver();

@@ -362,7 +385,7 @@ const setupScheduler = ({ repository, useOriginalSqlPreAggregations, skipAssertS
return externalDriver;
},
orchestratorOptions: () => ({
continueWaitTimeout: 0.1,
continueWaitTimeout: 1,
queryCacheOptions: {
queueOptions: () => ({
concurrency: 2,
@@ -1112,4 +1135,101 @@ describe('Refresh Scheduler', () => {
}
}
});

test('Exponential backoff', async () => {
process.env.CUBEJS_EXTERNAL_DEFAULT = 'false';
process.env.CUBEJS_SCHEDULED_REFRESH_DEFAULT = 'true';
process.env.CUBEJS_PRE_AGGREGATIONS_BACKOFF_MAX_TIME = '10'; // 10 seconds max backoff

const {
refreshScheduler, mockDriver, serverCore
} = setupScheduler({ repository: repositoryWithPreAggregations });

const ctx = { authInfo: { tenantId: 'tenant1' }, securityContext: { tenantId: 'tenant1' }, requestId: 'XXX' };

const orchestratorApi = await serverCore.getOrchestratorApi(ctx);
const preAggsInstance = orchestratorApi.getQueryOrchestrator().getPreAggregations();

// Target specific pre-aggregation: foo_first (all partitions)
// Scheduler processes multiple partitions: foo_first20201231, foo_first20201230, etc.
// Configure driver to fail only for foo_first table creation
mockDriver.shouldFailQuery = true;
mockDriver.failQueryPattern = /foo_first/;

// Run refresh until it tries to create foo_first table and fails
const queryIteratorState = {};
const maxIterations = 100;
for (let i = 0; i < maxIterations; i++) {
try {
await refreshScheduler.runScheduledRefresh(ctx, {
concurrency: 1,
workerIndices: [0],
timezones: ['UTC'],
queryIteratorState,
});
} catch (e) {
// Expected to fail when hitting foo_first
}

// Check if we started attempting to create foo_first table
if (mockDriver.queryAttempts > 0) {
break;
}
}

const initialAttempts = mockDriver.queryAttempts;
expect(initialAttempts).toBeGreaterThan(0);

// Wait for backoff to be set in storage (increased delay for async Redis writes)
await mockDriver.delay(1000);

// Find which foo_first partition has backoff set
// Scheduler may process different partitions (20201231, 20201230, etc.)
const possiblePartitions = ['20201231', '20201230', '20201229', '20201228', '20201227'];
let backoffData: { backoffMultiplier: number, nextTimestamp: Date } | null = null;
let targetTableName: string | null = null;

for (const partition of possiblePartitions) {
const tableName = `stb_pre_aggregations.foo_first${partition}`;
const data = await preAggsInstance.getPreAggBackoff(tableName);
if (data) {
backoffData = data;
targetTableName = tableName;
break;
}
}

// Verify backoff was set for at least one foo_first table
expect(backoffData).not.toBeNull();
expect(targetTableName).not.toBeNull();
// Initial backoff multiplier is 1 (corresponding to a one-second delay)
expect(backoffData!.backoffMultiplier).toBeGreaterThanOrEqual(1);

// Step 1: Immediate retry - should skip due to backoff (10-second window)
const beforeSkipAttempts = mockDriver.queryAttempts;
const immediateRetryCount = 5;
for (let i = 0; i < immediateRetryCount; i++) {
try {
await refreshScheduler.runScheduledRefresh(ctx, {
concurrency: 1,
workerIndices: [0],
timezones: ['UTC'],
queryIteratorState,
});
} catch (e) {
// Expected to skip due to backoff
}
}

// Query attempts should not increase significantly (skipped due to backoff)
// Allow some margin for other pre-aggregations processed by scheduler
expect(mockDriver.queryAttempts).toBeLessThanOrEqual(beforeSkipAttempts + 2);

// Step 2: Verify backoff persists - pre-aggregation is still in backoff after 500ms
await mockDriver.delay(500);
const backoffDataStillActive = await preAggsInstance.getPreAggBackoff(targetTableName!);
expect(backoffDataStillActive).not.toBeNull();
// backoffDataStillActive exists, which means backoff is still in place
// (nextTimestamp may be close to current time due to test execution delays)
});
});