Skip to content

Commit 34ffb05

Browse files
authored
refactor(query-orchestrator): Migrate QueryQueue to TypeScript (cube-js#6086)
1 parent 58cc4ac commit 34ffb05

File tree

15 files changed

+317
-344
lines changed

15 files changed

+317
-344
lines changed

packages/cubejs-backend-shared/src/type-helpers.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* This module export only type helpers for using it across Cube.js project
2+
* This module exports only type helpers for using it across the Cube project
33
*/
44

55
export type ResolveAwait<T> = T extends {
@@ -16,3 +16,6 @@ export type Required<T, K extends keyof T> = {
1616
};
1717

1818
export type Optional<T, K extends keyof T> = Pick<Partial<T>, K> & Omit<T, K>;
19+
20+
// <M extends Method<Class/Interface, M>>
21+
export type MethodName<T> = { [K in keyof T]: T[K] extends (...args: any[]) => any ? K : never }[keyof T];

packages/cubejs-base-driver/src/queue-driver.interface.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export type QueryDef = unknown;
1+
export type QueryDef = any;
22
// Primary key of Queue item
33
export type QueueId = string | number | bigint;
44
// This was used as a lock for Redis, deprecated.
@@ -38,10 +38,12 @@ export interface AddToQueueQuery {
3838
}
3939

4040
export interface AddToQueueOptions {
41-
stageQueryKey: string,
41+
// It's an ugly workaround for skip queue tasks
42+
queueId?: QueueId,
43+
stageQueryKey?: any,
4244
requestId: string,
45+
spanId?: string,
4346
orphanedTimeout?: number,
44-
queueId: QueueId,
4547
}
4648

4749
export interface QueueDriverOptions {
@@ -70,19 +72,19 @@ export interface QueueDriverConnectionInterface {
7072
* @param options
7173
*/
7274
addToQueue(keyScore: number, queryKey: QueryKey, orphanedTime: number, queryHandler: string, query: AddToQueueQuery, priority: number, options: AddToQueueOptions): Promise<AddToQueueResponse>;
73-
// Return query keys that were sorted by priority and time
75+
// Return query keys which was sorted by priority and time
7476
getToProcessQueries(): Promise<QueryKeysTuple[]>;
7577
getActiveQueries(): Promise<QueryKeysTuple[]>;
7678
getQueryDef(hash: QueryKeyHash, queueId: QueueId | null): Promise<QueryDef | null>;
77-
// Queries that were added to queue, but was not processed and not needed
79+
// Queries which was added to queue, but was not processed and not needed
7880
getOrphanedQueries(): Promise<QueryKeysTuple[]>;
79-
// Queries that were not completed with old heartbeat
81+
// Queries which was not completed with old heartbeat
8082
getStalledQueries(): Promise<QueryKeysTuple[]>;
8183
getQueryStageState(onlyKeys: boolean): Promise<QueryStageStateResponse>;
8284
updateHeartBeat(hash: QueryKeyHash, queueId: QueueId | null): Promise<void>;
8385
getNextProcessingId(): Promise<ProcessingId>;
8486
// Trying to acquire a lock for processing a queue item, this method can return null when
85-
// multiple nodes try to process the same query
87+
// multiple nodes tries to process the same query
8688
retrieveForProcessing(hash: QueryKeyHash, processingId: ProcessingId): Promise<RetrieveForProcessingResponse>;
8789
freeProcessingLock(hash: QueryKeyHash, processingId: ProcessingId, activated: unknown): Promise<void>;
8890
optimisticQueryUpdate(hash: QueryKeyHash, toUpdate: unknown, processingId: ProcessingId, queueId: QueueId | null): Promise<boolean>;

packages/cubejs-query-orchestrator/CLAUDE.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ yarn test
2626
# Run only unit tests
2727
yarn unit
2828

29-
# Run only integration tests
29+
# Run only integration tests
3030
yarn integration
3131

3232
# Run CubeStore integration tests specifically
@@ -46,16 +46,16 @@ yarn lint:fix
4646
The Query Orchestrator consists of several interconnected components:
4747

4848
1. **QueryOrchestrator** (`src/orchestrator/QueryOrchestrator.ts`): Main orchestration class that coordinates query execution and manages drivers
49-
2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers
50-
3. **QueryQueue** (`src/orchestrator/QueryQueue.js`): Manages query queuing and background processing
49+
2. **QueryCache** (`src/orchestrator/QueryCache.ts`): Handles query result caching with configurable cache drivers
50+
3. **QueryQueue** (`src/orchestrator/QueryQueue.ts`): Manages query queuing and background processing
5151
4. **PreAggregations** (`src/orchestrator/PreAggregations.ts`): Manages pre-aggregation building and loading
5252
5. **DriverFactory** (`src/orchestrator/DriverFactory.ts`): Creates and manages database driver instances
5353

5454
### Cache and Queue Driver Architecture
5555

5656
The orchestrator supports multiple backend drivers:
5757
- **Memory**: In-memory caching and queuing (development)
58-
- **CubeStore**: Distributed storage engine (production)
58+
- **CubeStore**: Distributed storage engine (production)
5959
- **Redis**: External Redis-based caching (legacy, being phased out)
6060

6161
Driver selection logic in `QueryOrchestrator.ts:detectQueueAndCacheDriver()`:
@@ -128,4 +128,4 @@ Key configuration options in `QueryOrchestratorOptions`:
128128
- Inherits linting rules from `@cubejs-backend/linter`
129129
- Jest configuration extends base repository config
130130
- Docker Compose setup for integration testing
131-
- Coverage reports generated in `coverage/` directory
131+
- Coverage reports generated in `coverage/` directory

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregationLoader.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,8 @@ export class PreAggregationLoader {
418418
const queue = await this.preAggregations.getQueue(this.preAggregation.dataSource);
419419
return queue.executeInQueue(
420420
'query',
421-
this.preAggregationQueryKey(invalidationKeys),
421+
// TODO: Sync types
422+
this.preAggregationQueryKey(invalidationKeys) as any,
422423
{
423424
preAggregation: this.preAggregation,
424425
preAggregationsTablesToTempTables: this.preAggregationsTablesToTempTables,

packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,9 @@ export class PreAggregations {
376376
tables = tables.filter(row => `${schema}.${row.table_name}` === table);
377377

378378
// fetching query result
379-
const { queueDriver } = this.queue[dataSource];
380-
const conn = await queueDriver.createConnection();
379+
const conn = await this.queue[dataSource].getQueueDriver().createConnection();
381380
const result = await conn.getResult(key);
382-
queueDriver.release(conn);
381+
this.queue[dataSource].getQueueDriver().release(conn);
383382

384383
// calculating status
385384
let status: string;

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import {
99
InlineTables,
1010
CacheDriverInterface,
1111
TableStructure,
12-
DriverInterface,
12+
DriverInterface, QueryKey,
1313
} from '@cubejs-backend/base-driver';
1414

15-
import { QueryQueue } from './QueryQueue';
15+
import { QueryQueue, QueryQueueOptions } from './QueryQueue';
1616
import { ContinueWaitError } from './ContinueWaitError';
1717
import { LocalCacheDriver } from './LocalCacheDriver';
1818
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
@@ -115,7 +115,7 @@ export interface QueryCacheOptions {
115115
}>;
116116
cubeStoreDriverFactory?: () => Promise<CubeStoreDriver>,
117117
continueWaitTimeout?: number;
118-
cacheAndQueueDriver?: CacheAndQueryDriverType;
118+
cacheAndQueueDriver: CacheAndQueryDriverType;
119119
maxInMemoryCacheEntries?: number;
120120
skipExternalCacheAndQueue?: boolean;
121121
}
@@ -133,7 +133,7 @@ export class QueryCache {
133133
protected readonly redisPrefix: string,
134134
protected readonly driverFactory: DriverFactoryByDataSource,
135135
protected readonly logger: any,
136-
public readonly options: QueryCacheOptions = {}
136+
public readonly options: QueryCacheOptions
137137
) {
138138
switch (options.cacheAndQueueDriver || 'memory') {
139139
case 'memory':
@@ -455,9 +455,9 @@ export class QueryCache {
455455
};
456456

457457
if (!persistent) {
458-
return queue.executeInQueue('query', cacheKey, _query, priority, opt);
458+
return queue.executeInQueue('query', cacheKey as QueryKey, _query, priority, opt);
459459
} else {
460-
return queue.executeInQueue('stream', cacheKey, {
460+
return queue.executeInQueue('stream', cacheKey as QueryKey, {
461461
..._query,
462462
aliasNameToMember,
463463
}, priority, opt);
@@ -563,7 +563,7 @@ export class QueryCache {
563563
redisPrefix: string,
564564
clientFactory: DriverFactory,
565565
executeFn: (client: BaseDriver, req: any) => any,
566-
options: Record<string, any> = {}
566+
options: Omit<QueryQueueOptions, 'queryHandlers' | 'cancelHandlers'>
567567
): QueryQueue {
568568
const queue: any = new QueryQueue(redisPrefix, {
569569
queryHandlers: {
@@ -583,57 +583,57 @@ export class QueryCache {
583583
}
584584
return result;
585585
},
586-
stream: async (req, target) => {
587-
queue.logger('Streaming SQL', { ...req });
588-
await (new Promise((resolve, reject) => {
589-
let logged = false;
590-
Promise
591-
.all([clientFactory()])
592-
.then(([client]) => (<DriverInterface>client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') }))
593-
.then((source) => {
594-
const cleanup = async (error) => {
595-
if (source.release) {
596-
const toRelease = source.release;
597-
delete source.release;
598-
await toRelease();
599-
}
600-
if (error && !target.destroyed) {
601-
target.destroy(error);
602-
}
603-
if (!logged && target.destroyed) {
604-
logged = true;
605-
if (error) {
606-
queue.logger('Streaming done with error', {
607-
query: req.query,
608-
query_values: req.values,
609-
error,
610-
});
611-
reject(error);
612-
} else {
613-
queue.logger('Streaming successfully completed', {
614-
requestId: req.requestId,
615-
});
616-
resolve(req.requestId);
617-
}
586+
},
587+
streamHandler: async (req, target) => {
588+
queue.logger('Streaming SQL', { ...req });
589+
await (new Promise((resolve, reject) => {
590+
let logged = false;
591+
Promise
592+
.all([clientFactory()])
593+
.then(([client]) => (<DriverInterface>client).stream(req.query, req.values, { highWaterMark: getEnv('dbQueryStreamHighWaterMark') }))
594+
.then((source) => {
595+
const cleanup = async (error) => {
596+
if (source.release) {
597+
const toRelease = source.release;
598+
delete source.release;
599+
await toRelease();
600+
}
601+
if (error && !target.destroyed) {
602+
target.destroy(error);
603+
}
604+
if (!logged && target.destroyed) {
605+
logged = true;
606+
if (error) {
607+
queue.logger('Streaming done with error', {
608+
query: req.query,
609+
query_values: req.values,
610+
error,
611+
});
612+
reject(error);
613+
} else {
614+
queue.logger('Streaming successfully completed', {
615+
requestId: req.requestId,
616+
});
617+
resolve(req.requestId);
618618
}
619-
};
620-
621-
source.rowStream.once('end', () => cleanup(undefined));
622-
source.rowStream.once('error', cleanup);
623-
source.rowStream.once('close', () => cleanup(undefined));
624-
625-
target.once('end', () => cleanup(undefined));
626-
target.once('error', cleanup);
627-
target.once('close', () => cleanup(undefined));
628-
629-
source.rowStream.pipe(target);
630-
})
631-
.catch((reason) => {
632-
target.emit('error', reason);
633-
resolve(reason);
634-
});
635-
}));
636-
},
619+
}
620+
};
621+
622+
source.rowStream.once('end', () => cleanup(undefined));
623+
source.rowStream.once('error', cleanup);
624+
source.rowStream.once('close', () => cleanup(undefined));
625+
626+
target.once('end', () => cleanup(undefined));
627+
target.once('error', cleanup);
628+
target.once('close', () => cleanup(undefined));
629+
630+
source.rowStream.pipe(target);
631+
})
632+
.catch((reason) => {
633+
target.emit('error', reason);
634+
resolve(reason);
635+
});
636+
}));
637637
},
638638
cancelHandlers: {
639639
query: async (req) => {

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import R from 'ramda';
33
import { getEnv } from '@cubejs-backend/shared';
44
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
55

6+
import { QueryKey } from '@cubejs-backend/base-driver';
67
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
78
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
89
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
@@ -306,7 +307,7 @@ export class QueryOrchestrator {
306307

307308
if (pendingPreAggregationIndex === -1) {
308309
const qcQueue = await this.queryCache.getQueue(queryBody.dataSource);
309-
return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody));
310+
return qcQueue.getQueryStage(QueryCache.queryCacheKey(queryBody) as QueryKey);
310311
}
311312

312313
const preAggregation = queryBody.preAggregations[pendingPreAggregationIndex];

0 commit comments

Comments
 (0)