Skip to content

Commit 8fd6dcc

Browse files
authored
feat: add datasource schema read methods (cube-js#9818)
1 parent c2363c4 commit 8fd6dcc

File tree

3 files changed

+622
-4
lines changed

3 files changed

+622
-4
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import { LocalCacheDriver } from './LocalCacheDriver';
1818
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1919
import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggregations';
2020
import { getCacheHash } from './utils';
21-
import { CacheAndQueryDriverType } from './QueryOrchestrator';
21+
import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator';
2222

2323
type QueryOptions = {
2424
external?: boolean;
@@ -563,8 +563,36 @@ export class QueryCache {
563563
): QueryQueue {
564564
const queue: any = new QueryQueue(redisPrefix, {
565565
queryHandlers: {
566+
// Queue handler for datasource metadata requests. Obtains a driver client,
// then dispatches on `req.operation` to the matching introspection call.
// The cancel-handle setter is accepted but never used by this handler.
metadata: async (req, _setCancelHandle) => {
  const driver = await clientFactory();
  const requestedOp = req.operation;
  const args = req.params || {};

  if (requestedOp === MetadataOperationType.GET_SCHEMAS) {
    queue.logger('Getting datasource schemas', { dataSource: req.dataSource, requestId: req.requestId });
    return driver.getSchemas();
  }

  if (requestedOp === MetadataOperationType.GET_TABLES_FOR_SCHEMAS) {
    // A missing `schemas` list is logged as a count of 0.
    const logPayload = {
      dataSource: req.dataSource,
      schemaCount: args.schemas?.length || 0,
      requestId: req.requestId
    };
    queue.logger('Getting tables for schemas', logPayload);
    return driver.getTablesForSpecificSchemas(args.schemas);
  }

  if (requestedOp === MetadataOperationType.GET_COLUMNS_FOR_TABLES) {
    // A missing `tables` list is logged as a count of 0.
    const logPayload = {
      dataSource: req.dataSource,
      tableCount: args.tables?.length || 0,
      requestId: req.requestId
    };
    queue.logger('Getting columns for tables', logPayload);
    return driver.getColumnsForSpecificTables(args.tables);
  }

  // Anything outside the three known operations is rejected.
  throw new Error(`Unknown metadata operation: ${requestedOp}`);
},
566593
query: async (req, setCancelHandle) => {
567594
const client = await clientFactory();
595+
568596
const resultPromise = executeFn(client, req);
569597
let handle;
570598
if (resultPromise.cancel) {
@@ -632,6 +660,12 @@ export class QueryCache {
632660
}));
633661
},
634662
cancelHandlers: {
663+
// Cancel handler for metadata requests: when `req.cancelHandler` names a
// handle stored in `queue.handles`, cancel it and then drop the entry.
metadata: async (req) => {
  const handleKey = req.cancelHandler;
  if (!handleKey) {
    return;
  }

  const activeHandle = queue.handles[handleKey];
  if (!activeHandle) {
    return;
  }

  await activeHandle.cancel();
  delete queue.handles[handleKey];
},
635669
query: async (req) => {
636670
if (req.cancelHandler && queue.handles[req.cancelHandler]) {
637671
await queue.handles[req.cancelHandler].cancel();

packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts

Lines changed: 130 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,14 @@ import * as stream from 'stream';
22
import R from 'ramda';
33
import { getEnv } from '@cubejs-backend/shared';
44
import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver';
5-
6-
import { QueryKey } from '@cubejs-backend/base-driver';
7-
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable } from './QueryCache';
5+
import {
6+
QuerySchemasResult,
7+
QueryTablesResult,
8+
QueryColumnsResult,
9+
QueryKey
10+
} from '@cubejs-backend/base-driver';
11+
12+
import { QueryCache, QueryBody, TempTable, PreAggTableToTempTable, QueryWithParams, CacheKey } from './QueryCache';
813
import { PreAggregations, PreAggregationDescription, getLastUpdatedAtTimestamp } from './PreAggregations';
914
import { DriverFactory, DriverFactoryByDataSource } from './DriverFactory';
1015
import { QueryStream } from './QueryStream';
@@ -17,6 +22,12 @@ export enum DriverType {
1722
Cache = 'cache',
1823
}
1924

25+
/**
 * Identifiers for the datasource metadata operations.
 * Each member's value is the same string as its name.
 */
export enum MetadataOperationType {
  /** List the schemas of a data source. */
  GET_SCHEMAS = 'GET_SCHEMAS',

  /** List the tables belonging to a given set of schemas. */
  GET_TABLES_FOR_SCHEMAS = 'GET_TABLES_FOR_SCHEMAS',

  /** List the columns belonging to a given set of tables. */
  GET_COLUMNS_FOR_TABLES = 'GET_COLUMNS_FOR_TABLES',
}
30+
2031
export interface QueryOrchestratorOptions {
2132
externalDriverFactory?: DriverFactory;
2233
cacheAndQueueDriver?: CacheAndQueryDriverType;
@@ -428,4 +439,120 @@ export class QueryOrchestrator {
428439
/**
 * Delegates to `preAggregations.updateRefreshEndReached()` and returns
 * whatever that call yields.
 */
public async updateRefreshEndReached() {
  const { preAggregations } = this;
  return preAggregations.updateRefreshEndReached();
}
442+
443+
/**
 * Builds the `QueryWithParams` tuple that represents a metadata request.
 *
 * @param operation Operation name; embedded in the query text after the `METADATA:` prefix.
 * @param params Operation arguments; serialized to a single JSON string parameter.
 * @returns A tuple of query text, a one-element params array and fixed query options.
 */
private createMetadataQuery(operation: string, params: Record<string, any>): QueryWithParams {
  const queryText = `METADATA:${operation}`;

  // TODO (@MikeNitsenko): Metadata queries need object params like [{ schema, table }]
  // but QueryWithParams expects string[]. This forces JSON.stringify workaround.
  const serializedParams = [JSON.stringify(params)];

  // Options are fixed here: non-external, with 24 * 60 * 60 as renewal threshold.
  const queryOptions = { external: false, renewalThreshold: 24 * 60 * 60 };

  return [queryText, serializedParams, queryOptions];
}
452+
453+
/**
 * Runs a metadata operation through `queryCache.cacheQueryResult`.
 *
 * @param operation Which metadata operation to run.
 * @param params Operation arguments, forwarded to `createMetadataQuery`.
 * @param dataSource Data source name; defaults to `'default'`.
 * @param options Optional settings:
 *   - `requestId`: forwarded to the cache call.
 *   - `forceRefresh`: when true, no renewal key is built and `forceNoCache` is set (default `false`).
 *   - `renewalThreshold`: defaults to `24 * 60 * 60`; multiplied by 1000 against `Date.now()`,
 *     so presumably expressed in seconds.
 *   - `expiration`: defaults to `7 * 24 * 60 * 60`; passed as-is to the cache call
 *     (unit presumably seconds, confirm against `cacheQueryResult`).
 * @returns The result of the cache call, typed as `T`.
 */
private async queryDataSourceMetadata<T>(
  operation: MetadataOperationType,
  params: Record<string, any>,
  dataSource: string = 'default',
  options: {
    requestId?: string;
    forceRefresh?: boolean;
    renewalThreshold?: number;
    expiration?: number;
  } = {}
): Promise<T> {
  const {
    requestId,
    forceRefresh = false,
    renewalThreshold = 24 * 60 * 60,
    expiration = 7 * 24 * 60 * 60,
  } = options;

  const metadataQuery = this.createMetadataQuery(operation, params);
  const cacheKey: CacheKey = [metadataQuery, dataSource];

  // The renewal key changes once per `renewalThreshold` window; it is left
  // undefined when a refresh is forced.
  let renewalKey: (string | number)[] | undefined;
  if (!forceRefresh) {
    const renewalWindow = Math.floor(Date.now() / (renewalThreshold * 1000));
    renewalKey = [`METADATA_RENEWAL:${operation}`, dataSource, renewalWindow];
  }

  const cacheOptions = {
    renewalThreshold,
    renewalKey,
    forceNoCache: forceRefresh,
    requestId,
    dataSource,
    useInMemory: true,
    waitForRenew: true,
  };

  return this.queryCache.cacheQueryResult(
    metadataQuery,
    [],
    cacheKey,
    expiration,
    cacheOptions
  );
}
496+
497+
/**
 * Query the data source for available schemas.
 *
 * @param dataSource Data source name; defaults to `'default'`.
 * @param options Forwarded unchanged to `queryDataSourceMetadata`.
 * @returns The schema list produced by the `GET_SCHEMAS` metadata operation.
 */
public async queryDataSourceSchemas(
  dataSource: string = 'default',
  options: {
    requestId?: string;
    forceRefresh?: boolean;
    renewalThreshold?: number;
    expiration?: number;
  } = {}
): Promise<QuerySchemasResult[]> {
  // This operation takes no arguments, so an empty params object is sent.
  const noParams = {};
  const operation = MetadataOperationType.GET_SCHEMAS;

  return this.queryDataSourceMetadata<QuerySchemasResult[]>(operation, noParams, dataSource, options);
}
516+
517+
/**
 * Query the data source for tables within the specified schemas.
 *
 * @param schemas Schemas whose tables should be listed.
 * @param dataSource Data source name; defaults to `'default'`.
 * @param options Forwarded unchanged to `queryDataSourceMetadata`.
 * @returns The table list produced by the `GET_TABLES_FOR_SCHEMAS` metadata operation.
 */
public async queryTablesForSchemas(
  schemas: QuerySchemasResult[],
  dataSource: string = 'default',
  options: {
    requestId?: string;
    forceRefresh?: boolean;
    renewalThreshold?: number;
    expiration?: number;
  } = {}
): Promise<QueryTablesResult[]> {
  const operation = MetadataOperationType.GET_TABLES_FOR_SCHEMAS;
  const operationParams = { schemas };

  return this.queryDataSourceMetadata<QueryTablesResult[]>(operation, operationParams, dataSource, options);
}
537+
538+
/**
 * Query the data source for columns within the specified tables.
 *
 * @param tables Tables whose columns should be listed.
 * @param dataSource Data source name; defaults to `'default'`.
 * @param options Forwarded unchanged to `queryDataSourceMetadata`.
 * @returns The column list produced by the `GET_COLUMNS_FOR_TABLES` metadata operation.
 */
public async queryColumnsForTables(
  tables: QueryTablesResult[],
  dataSource: string = 'default',
  options: {
    requestId?: string;
    forceRefresh?: boolean;
    renewalThreshold?: number;
    expiration?: number;
  } = {}
): Promise<QueryColumnsResult[]> {
  const operation = MetadataOperationType.GET_COLUMNS_FOR_TABLES;
  const operationParams = { tables };

  return this.queryDataSourceMetadata<QueryColumnsResult[]>(operation, operationParams, dataSource, options);
}
431558
}

0 commit comments

Comments
 (0)