Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/cubejs-docker/latest-debian-jdk.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ RUN apt-get update \
# We are copying root yarn.lock file to the context folder during the Publish GH
# action. So, a process will use the root lock file here.
RUN yarn install --prod \
# Remove DuckDB sources to reduce image size
&& rm -rf /cube/node_modules/duckdb/src \
&& yarn cache clean

FROM node:22.22.0-bookworm-slim
Expand Down
2 changes: 0 additions & 2 deletions packages/cubejs-docker/latest.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ RUN apt-get update \
# We are copying root yarn.lock file to the context folder during the Publish GH
# action. So, a process will use the root lock file here.
RUN yarn install --prod \
# Remove DuckDB sources to reduce image size
&& rm -rf /cube/node_modules/duckdb/src \
&& yarn cache clean

FROM node:22.22.0-bookworm-slim
Expand Down
2 changes: 1 addition & 1 deletion packages/cubejs-duckdb-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
"@cubejs-backend/base-driver": "1.6.64",
"@cubejs-backend/schema-compiler": "1.6.64",
"@cubejs-backend/shared": "1.6.64",
"duckdb": "^1.4.1"
"@duckdb/node-api": "1.5.4-r.1"
},
"license": "Apache-2.0",
"devDependencies": {
Expand Down
72 changes: 43 additions & 29 deletions packages/cubejs-duckdb-driver/src/DuckDBDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@ import {
QueryOptions,
StreamTableData,
GenericDataBaseType,
TableStructure,
TableColumnQueryResult,
} from '@cubejs-backend/base-driver';
import { getEnv } from '@cubejs-backend/shared';
import { promisify } from 'util';
import * as stream from 'stream';
import { Connection, Database } from 'duckdb';
import { DuckDBConnection, DuckDBInstance, DuckDBValue, timestampMillisValue } from '@duckdb/node-api';

import { DuckDBQuery } from './DuckDBQuery';
import { HydrationStream, transformRow } from './HydrationStream';
Expand All @@ -29,10 +26,16 @@ export type DuckDBDriverConfiguration = {
};

type InitPromise = {
defaultConnection: Connection,
db: Database;
defaultConnection: DuckDBConnection,
instance: DuckDBInstance;
};

/**
 * Runs a single SQL statement given as a string.
 * The promise resolves to `unknown` because the statement result is not part of this contract.
 */
type ExecFn = (sql: string) => Promise<unknown>;

/**
 * Prepares positional query parameters for the DuckDB bindings.
 *
 * `Date` instances are wrapped via `timestampMillisValue`, using their epoch
 * milliseconds as a `bigint`; every other value is passed through unchanged.
 *
 * @param values - Positional parameter values; defaults to an empty list.
 * @returns A new array with the converted values, in the same order as `values`.
 */
const normalizeValues = (values: unknown[] = []): DuckDBValue[] => {
  const toDuckDBValue = (candidate: unknown): DuckDBValue => {
    if (candidate instanceof Date) {
      const epochMillis = BigInt(candidate.getTime());
      return timestampMillisValue(epochMillis);
    }

    return candidate as DuckDBValue;
  };

  return values.map(toDuckDBValue);
};

const DuckDBToGenericType: Record<string, GenericDataBaseType> = {
// DATE_TRUNC returns DATE, but Cube Store still doesn't support DATE type
// DuckDB's driver transforms date/timestamp values into Date objects, but HydrationStream converts any Date object to an ISO timestamp
Expand Down Expand Up @@ -64,7 +67,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
return DuckDBToGenericType[columnType.toLowerCase()] || super.toGenericType(columnType.toLowerCase(), precision, scale);
}

private async installExtensions(extensions: string[], execAsync: (sql: string, ...params: any[]) => Promise<void>, repository: string = ''): Promise<void> {
private async installExtensions(extensions: string[], execAsync: ExecFn, repository: string = ''): Promise<void> {
repository = repository ? ` FROM ${repository}` : '';
for (const extension of extensions) {
try {
Expand All @@ -79,7 +82,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
}
}

private async loadExtensions(extensions: string[], execAsync: (sql: string, ...params: any[]) => Promise<void>): Promise<void> {
private async loadExtensions(extensions: string[], execAsync: ExecFn): Promise<void> {
for (const extension of extensions) {
try {
await execAsync(`LOAD ${extension}`);
Expand All @@ -106,17 +109,16 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
dbUrl = ':memory:';
}

let dbOptions;
let dbOptions: Record<string, string> | undefined;
if (token) {
dbOptions = { custom_user_agent: `Cube/${version}` };
}

// Create a new Database instance with the determined URL and custom user agent
const db = new Database(dbUrl, dbOptions);
// Create a new DuckDB instance with the determined URL and custom user agent
const instance = await DuckDBInstance.create(dbUrl, dbOptions);

// Under the hood all methods of Database uses internal default connection, but there is no way to expose it
const defaultConnection = db.connect();
const execAsync: (sql: string, ...params: any[]) => Promise<void> = promisify(defaultConnection.exec).bind(defaultConnection) as any;
const defaultConnection = await instance.connect();
const execAsync: ExecFn = (sql: string) => defaultConnection.run(sql);

const configuration = [
{
Expand Down Expand Up @@ -206,7 +208,7 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

return {
defaultConnection,
db
instance
};
}

Expand Down Expand Up @@ -250,13 +252,15 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

public async query<R = unknown>(query: string, values: unknown[] = [], _options?: QueryOptions): Promise<R[]> {
const { defaultConnection } = await this.getInitiatedState();
const fetchAsync: (sql: string, ...params: any[]) => Promise<R[]> = promisify(defaultConnection.all).bind(defaultConnection) as any;

const result = await fetchAsync(query, ...values);
return result.map((item) => {
const reader = await defaultConnection.runAndReadAll(query, normalizeValues(values));
// getRowObjectsJS returns JS built-ins (numbers, bigints, Dates, strings),
// which HydrationStream's transformRow normalizes into Cube's expected shape.
const rows = reader.getRowObjectsJS();
return rows.map((item) => {
transformRow(item);

return item;
return item as R;
});
}

Expand All @@ -265,26 +269,36 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {
values: unknown[],
{ highWaterMark }: StreamOptions
): Promise<StreamTableData> {
const { db } = await this.getInitiatedState();
const { instance } = await this.getInitiatedState();

// Open a new connection, because a stream can otherwise break with the error
// "Attempting to execute an unsuccessful or closed pending query result".
// The PreAggregation queue has a concurrency limit, which is why a pool is not needed here.
const connection = db.connect();
const closeAsync = promisify(connection.close).bind(connection);
const connection = await instance.connect();

try {
const asyncIterator = connection.stream(query, ...(values || []));
const rowStream = stream.Readable.from(asyncIterator, { highWaterMark }).pipe(new HydrationStream());
const result = await connection.stream(query, normalizeValues(values));

// yieldRowObjectJs yields one array of JS-converted row objects per chunk;
// flatten to a row-at-a-time async iterable for the Readable stream.
const rowIterator = async function* rows(): AsyncGenerator<Record<string, unknown>> {
for await (const chunk of result.yieldRowObjectJs()) {
for (const row of chunk) {
yield row;
}
}
};

const rowStream = stream.Readable.from(rowIterator(), { highWaterMark }).pipe(new HydrationStream());

return {
rowStream,
release: async () => {
await closeAsync();
connection.closeSync();
}
};
} catch (e) {
await closeAsync();
connection.closeSync();

throw e;
}
Expand All @@ -300,11 +314,11 @@ export class DuckDBDriver extends BaseDriver implements DriverInterface {

public async release(): Promise<void> {
if (this.initPromise) {
const { db } = await this.initPromise;
const close = promisify(db.close).bind(db);
const { defaultConnection, instance } = await this.initPromise;
this.initPromise = null;

await close();
defaultConnection.closeSync();
instance.closeSync();
}
}
}
18 changes: 18 additions & 0 deletions packages/cubejs-duckdb-driver/test/DuckDBDriver.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ describe('DuckDBDriver', () => {
]);
});

// Passes a JS Date as a positional parameter to `driver.query` and expects the
// TIMESTAMP column to come back as the same instant in ISO-8601 form with milliseconds.
test('query with Date parameter', async () => {
const result = await driver.query('SELECT ?::TIMESTAMP AS created', [new Date('2020-04-04T04:04:04.444Z')]);

expect(result).toEqual([
{ created: '2020-04-04T04:04:04.444Z' }
]);
});

test('column types', async () => {
expect(await driver.tableColumnTypes('test.select_test')).toEqual([
{
Expand Down Expand Up @@ -73,4 +81,14 @@ describe('DuckDBDriver', () => {
{ id: '3', created: '2020-03-03T03:03:03.333Z', created_date: '2020-03-03T00:00:00.000Z', price: '300' }
]);
});

// Passes a JS Date as a positional parameter to `driver.stream` and expects the
// collected rows to contain the same instant as an ISO-8601 string with milliseconds.
test('stream with Date parameter', async () => {
const tableData = await driver.stream('SELECT ?::TIMESTAMP AS created', [new Date('2020-04-04T04:04:04.444Z')], {
highWaterMark: 1000,
});

expect(await streamToArray(tableData.rowStream as any)).toEqual([
{ created: '2020-04-04T04:04:04.444Z' }
]);
});
});
Loading