diff --git a/cli/api/BUILD b/cli/api/BUILD index 352f3ef06..0b2e2f0c4 100644 --- a/cli/api/BUILD +++ b/cli/api/BUILD @@ -58,7 +58,8 @@ node_modules( ts_test_suite( name = "tests", srcs = [ - "utils_test.ts", + "commands/jit/rpc_test.ts", + "dbadapters/bigquery_test.ts", ], data = [ ":node_modules", @@ -68,10 +69,11 @@ ts_test_suite( "@nodejs//:npm", ], deps = [ - "//cli/api", + ":api", "//core", "//protos:ts", "//testing", + "@npm//@google-cloud/bigquery", "@npm//@types/chai", "@npm//@types/fs-extra", "@npm//@types/js-yaml", diff --git a/cli/api/commands/base_worker.ts b/cli/api/commands/base_worker.ts new file mode 100644 index 000000000..a188ddbc9 --- /dev/null +++ b/cli/api/commands/base_worker.ts @@ -0,0 +1,70 @@ +import { ChildProcess, fork } from "child_process"; + +export abstract class BaseWorker { + protected constructor(private readonly loaderPath: string) {} + + protected async runWorker( + timeoutMillis: number, + onBoot: (child: ChildProcess) => void, + onMessage: (message: TMessage, child: ChildProcess, resolve: (res: TResponse) => void, reject: (err: Error) => void) => void + ): Promise { + const forkScript = this.resolveScript(); + const child = fork(forkScript, [], { + stdio: [0, 1, 2, "ipc", "pipe"] + }); + + return new Promise((resolve, reject) => { + let completed = false; + + const terminate = (fn: () => void) => { + if (completed) { + return; + } + completed = true; + clearTimeout(timeout); + child.kill(); + fn(); + }; + + const timeout = setTimeout(() => { + terminate(() => + reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`)) + ); + }, timeoutMillis); + + child.on("message", (message: any) => { + if (message.type === "worker_booted") { + onBoot(child); + return; + } + onMessage(message, child, (res) => terminate(() => resolve(res)), (err) => terminate(() => reject(err))); + }); + + child.on("error", err => { + terminate(() => reject(err)); + }); + + child.on("exit", (code, signal) => { + if (!completed) { + const errorMsg = + code !== 0 && code !== null + ? `Worker exited with code ${code} and signal ${signal}` + : "Worker exited without sending a response message"; + terminate(() => reject(new Error(errorMsg))); + } + }); + }); + } + + private resolveScript() { + const pathsToTry = [this.loaderPath, "./worker_bundle.js"]; + for (const p of pathsToTry) { + try { + return require.resolve(p); + } catch (e) { + // Continue to next path. + } + } + throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`); + } +} diff --git a/cli/api/commands/jit/compiler.ts b/cli/api/commands/jit/compiler.ts new file mode 100644 index 000000000..b08b349df --- /dev/null +++ b/cli/api/commands/jit/compiler.ts @@ -0,0 +1,102 @@ +import { ChildProcess } from "child_process"; + +import { BaseWorker } from "df/cli/api/commands/base_worker"; +import { handleDbRequest } from "df/cli/api/commands/jit/rpc"; +import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; +import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery"; +import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants"; +import { dataform } from "df/protos/ts"; + +export interface IJitWorkerMessage { + type: "rpc_request" | "jit_response" | "jit_error"; + method?: string; + request?: string; + correlationId?: string; + response?: any; + error?: string; +} + +export class JitCompileChildProcess extends BaseWorker< + dataform.IJitCompilationResponse, + IJitWorkerMessage +> { + public static async compile( + request: dataform.IJitCompilationRequest, + projectDir: string, + dbadapter: IDbAdapter, + dbclient: IDbClient, + timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS, + options?: IBigQueryExecutionOptions + ): Promise { + return await new JitCompileChildProcess().run( + request, + projectDir, + dbadapter, + dbclient, + timeoutMillis, + options + ); + } + + constructor() { + super("../../../vm/jit_loader"); + } + + private async run( + request: dataform.IJitCompilationRequest, + projectDir: string, + dbadapter: IDbAdapter, + dbclient: IDbClient, + timeoutMillis: number, + options?: IBigQueryExecutionOptions + ): Promise { + return await this.runWorker( + timeoutMillis, + child => { + child.send({ + type: "jit_compile", + request, + projectDir + }); + }, + async (message, child, resolve, reject) => { + if (message.type === "rpc_request") { + await this.handleRpcRequest(message, child, dbadapter, dbclient, options); + } else if (message.type === "jit_response") { + resolve(dataform.JitCompilationResponse.fromObject(message.response)); + } else if (message.type === "jit_error") { + reject(new Error(message.error)); + } + } + ); + } + + private async handleRpcRequest( + message: IJitWorkerMessage, + child: ChildProcess, + dbadapter: IDbAdapter, + dbclient: IDbClient, + options?: IBigQueryExecutionOptions + ) { + try { + const response = await handleDbRequest( + dbadapter, + dbclient, + message.method, + Buffer.from(message.request, "base64"), + options + ); + child.send({ + type: "rpc_response", + correlationId: message.correlationId, + response: Buffer.from(response).toString("base64") + }); + } catch (e) { + child.send({ + type: "rpc_response", + correlationId: message.correlationId, + error: e.message + }); + } + } +} diff --git a/cli/api/commands/jit/rpc.ts b/cli/api/commands/jit/rpc.ts new file mode 100644 index 000000000..05d76ce88 --- /dev/null +++ b/cli/api/commands/jit/rpc.ts @@ -0,0 +1,90 @@ +import Long from "long"; + +import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; +import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery"; +import { Structs } from "df/common/protos/structs"; +import { dataform, google } from "df/protos/ts"; + +export async function handleDbRequest( + dbadapter: IDbAdapter, + dbclient: IDbClient, + method: string, + request: Uint8Array, + options?: IBigQueryExecutionOptions +): Promise { + switch (method) { + case "Execute": + return await handleExecute(dbclient, request, options); + case "ListTables": + return await handleListTables(dbadapter, request); + case "GetTable": + return await handleGetTable(dbadapter, request); + case "DeleteTable": + return await handleDeleteTable(dbadapter, request, options?.dryRun); + default: + throw new Error(`Unrecognized RPC method: ${method}`); + } +} + +async function handleExecute( + dbclient: IDbClient, + request: Uint8Array, + options?: IBigQueryExecutionOptions +): Promise { + const executeRequest = dataform.ExecuteRequest.decode(request); + const executeRequestObj = dataform.ExecuteRequest.toObject(executeRequest, { + defaults: false + }); + const requestOptions = executeRequestObj.bigQueryOptions; + + const results = await dbclient.executeRaw(executeRequest.statement, { + rowLimit: executeRequest.rowLimit ? (executeRequest.rowLimit as Long).toNumber() : undefined, + params: Structs.toObject(executeRequest.params), + bigquery: { + ...options, + ...requestOptions, + labels: { + ...options?.labels, + ...requestOptions?.labels + }, + jobPrefix: [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined + } + }); + + return dataform.ExecuteResponse.encode({ + rows: (results.rows || []).map(row => Structs.fromObject(row)), + schemaFields: results.schema || [] + } as any).finish(); +} + +async function handleListTables(dbadapter: IDbAdapter, request: Uint8Array): Promise { + const listTablesRequest = dataform.ListTablesRequest.decode(request); + const targets = await dbadapter.tables(listTablesRequest.database, listTablesRequest.schema); + const tablesMetadata = await Promise.all(targets.map(target => dbadapter.table(target))); + const listTablesResponse = dataform.ListTablesResponse.create({ + tables: tablesMetadata + }); + return dataform.ListTablesResponse.encode(listTablesResponse).finish(); +} + +async function handleGetTable(dbadapter: IDbAdapter, request: Uint8Array): Promise { + const getTableRequest = dataform.GetTableRequest.decode(request); + const tableMetadata = await dbadapter.table(getTableRequest.target); + if (!tableMetadata) { + throw new Error(`Table not found: ${JSON.stringify(getTableRequest.target)}`); + } + return dataform.TableMetadata.encode(tableMetadata).finish(); +} + +async function handleDeleteTable( + dbadapter: IDbAdapter, + request: Uint8Array, + dryRun?: boolean +): Promise { + const deleteTableRequest = dataform.DeleteTableRequest.decode(request); + if (dryRun) { + return new Uint8Array(); + } + await dbadapter.deleteTable(deleteTableRequest.target); + return new Uint8Array(); +} diff --git a/cli/api/commands/jit/rpc_test.ts b/cli/api/commands/jit/rpc_test.ts new file mode 100644 index 000000000..8741fb809 --- /dev/null +++ b/cli/api/commands/jit/rpc_test.ts @@ -0,0 +1,479 @@ +import { expect } from "chai"; +import Long from "long"; +import { anything, capture, instance, mock, verify, when } from "ts-mockito"; + +import { handleDbRequest } from "df/cli/api/commands/jit/rpc"; +import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; +import { dataform } from "df/protos/ts"; +import { suite, test } from "df/testing"; + +suite("jit_rpc", () => { + test("Execute RPC maps to client.execute with all options", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT * FROM table"; + const executeRequest = dataform.ExecuteRequest.create({ + statement, + rowLimit: Long.fromNumber(100), + byteLimit: Long.fromNumber(1024), + bigQueryOptions: { + interactive: true, + location: "US", + labels: { key: "val" }, + jobPrefix: "prefix", + dryRun: true + } + }); + const encodedRequest = dataform.ExecuteRequest.encode(executeRequest).finish(); + + // Real raw BigQuery f/v format + const rawRows = [ + { + f: [ + { v: "42" }, + { v: "val" }, + { v: "true" }, + { v: null } + ] + } + ]; + + const schema = [ + { name: "num", primitive: dataform.Field.Primitive.INTEGER }, + { name: "str", primitive: dataform.Field.Primitive.STRING }, + { name: "bool", primitive: dataform.Field.Primitive.BOOLEAN }, + { name: "n", primitive: dataform.Field.Primitive.STRING } + ]; + when(mockClient.executeRaw(statement, anything())).thenResolve({ + rows: rawRows, + schema, + metadata: { bigquery: { jobId: "job1" } } + }); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest); + const decoded = dataform.ExecuteResponse.decode(response); + + expect(decoded.rows.length).equals(1); + const row = decoded.rows[0]; + const fList = row.fields.f.listValue.values; + expect(fList[0].structValue.fields.v.stringValue).equals("42"); + expect(fList[1].structValue.fields.v.stringValue).equals("val"); + expect(fList[2].structValue.fields.v.stringValue).equals("true"); + expect(fList[3].structValue.fields.v.nullValue).equals(0); + + expect(decoded.schemaFields.length).equals(4); + expect(decoded.schemaFields[0].name).equals("num"); + expect(decoded.schemaFields[1].name).equals("str"); + + verify(mockClient.executeRaw(statement, anything())).once(); + const capturedArgs = capture(mockClient.executeRaw).last(); + expect(capturedArgs[0]).to.equal(statement); + const capturedOptions = capturedArgs[1]; + expect(capturedOptions.bigquery.location).equals("US"); + expect(capturedOptions.bigquery.labels).deep.equals({ key: "val" }); + expect(capturedOptions.bigquery.jobPrefix).equals("prefix"); + expect(capturedOptions.bigquery.dryRun).equals(true); + }); + + test("DeleteTable RPC calls adapter and returns empty buffer", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const target = { database: "db", schema: "sch", name: "tab" }; + const request = dataform.DeleteTableRequest.create({ target }); + const encodedRequest = dataform.DeleteTableRequest.encode(request).finish(); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "DeleteTable", encodedRequest); + + verify(mockAdapter.deleteTable(anything())).once(); + const capturedTarget = capture(mockAdapter.deleteTable).last()[0]; + expect(dataform.Target.create(capturedTarget)).deep.equals(dataform.Target.create(target)); + expect(response.length).equals(0); + }); + + test("Execute RPC handles null and empty result sets", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT null as n"; + const encodedRequest = dataform.ExecuteRequest.encode(dataform.ExecuteRequest.create({ statement })).finish(); + + // Test with a null value + when(mockClient.executeRaw(statement, anything())).thenResolve({ + rows: [ + { + f: [{ v: null }] + } + ], + schema: [{ name: "n", primitive: dataform.Field.Primitive.STRING }], + metadata: {} + }); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest); + const decoded = dataform.ExecuteResponse.decode(response); + + expect(decoded.rows.length).equals(1); + const fListNull = decoded.rows[0].fields.f.listValue.values; + expect(fListNull[0].structValue.fields.v.nullValue).equals(0); // Protobuf NullValue.NULL_VALUE is 0 + + verify(mockClient.executeRaw(statement, anything())).once(); + const capturedArgs1 = capture(mockClient.executeRaw).last(); + expect(capturedArgs1[0]).to.equal(statement); + expect(capturedArgs1[1].bigquery.dryRun).equals(undefined); + + // Test with empty rows + when(mockClient.executeRaw(statement, anything())).thenResolve({ + rows: [], + metadata: {} + }); + + const responseEmpty = await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest); + const decodedEmpty = dataform.ExecuteResponse.decode(responseEmpty); + expect(decodedEmpty.rows.length).equals(0); + + verify(mockClient.executeRaw(statement, anything())).twice(); + const capturedArgs2 = capture(mockClient.executeRaw).last(); + expect(capturedArgs2[0]).to.equal(statement); + expect(capturedArgs2[1].bigquery.dryRun).equals(undefined); + }); + + test("ListTables RPC returns tables from adapter", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const request = dataform.ListTablesRequest.create({ database: "db", schema: "sch" }); + const encodedRequest = dataform.ListTablesRequest.encode(request).finish(); + + const target1 = { database: "db", schema: "sch", name: "table1" }; + when(mockAdapter.tables("db", "sch")).thenResolve([target1]); + when(mockAdapter.table(anything())).thenResolve({ target: target1 } as any); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "ListTables", encodedRequest); + const decoded = dataform.ListTablesResponse.decode(response); + + expect(decoded.tables.length).equals(1); + expect(decoded.tables[0].target.name).equals("table1"); + + verify(mockAdapter.tables("db", "sch")).once(); + verify(mockAdapter.table(anything())).once(); + const capturedTarget = capture(mockAdapter.table).last()[0]; + expect(dataform.Target.create(capturedTarget)).deep.equals(dataform.Target.create(target1)); + }); + + test("GetTable RPC returns metadata from adapter", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const target = { database: "db", schema: "sch", name: "tab" }; + const request = dataform.GetTableRequest.create({ target }); + const encodedRequest = dataform.GetTableRequest.encode(request).finish(); + + when(mockAdapter.table(anything())).thenResolve({ target } as any); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "GetTable", encodedRequest); + const decoded = dataform.TableMetadata.decode(response); + + expect(decoded.target.name).equals("tab"); + verify(mockAdapter.table(anything())).once(); + const capturedTarget = capture(mockAdapter.table).last()[0]; + expect(dataform.Target.create(capturedTarget)).deep.equals(dataform.Target.create(target)); + }); + + test("GetTable RPC throws error when table not found", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const target = { database: "db", schema: "sch", name: "missing" }; + const request = dataform.GetTableRequest.create({ target }); + const encodedRequest = dataform.GetTableRequest.encode(request).finish(); + + // Adapter returns null for missing table + when(mockAdapter.table(anything())).thenResolve(null); + + try { + await handleDbRequest(instance(mockAdapter), instance(mockClient), "GetTable", encodedRequest); + expect.fail("Should have thrown an error"); + } catch (e) { + expect(e.message).to.contain("Table not found"); + expect(e.message).to.contain("missing"); + } + + verify(mockAdapter.table(anything())).once(); + }); + + test("DeleteTable RPC respects global dryRun flag", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const request = dataform.DeleteTableRequest.create({ + target: { database: "db", schema: "sch", name: "tab" } + }); + const encodedRequest = dataform.DeleteTableRequest.encode(request).finish(); + + // Call with dryRun = true + await handleDbRequest(instance(mockAdapter), instance(mockClient), "DeleteTable", encodedRequest, { dryRun: true }); + + // Verify that the adapter method was NOT called + verify(mockAdapter.deleteTable(anything())).never(); + }); + + test("Execute RPC respects global dryRun flag", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT 1"; + const encodedRequest = dataform.ExecuteRequest.encode(dataform.ExecuteRequest.create({ statement })).finish(); + + when(mockClient.executeRaw(anything(), anything())).thenResolve({ rows: [], metadata: {} }); + + // Call with dryRun = true + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { dryRun: true }); + + verify(mockClient.executeRaw(statement, anything())).once(); + const capturedArgs = capture(mockClient.executeRaw).last(); + expect(capturedArgs[0]).to.equal(statement); + expect(capturedArgs[1].bigquery.dryRun).to.equal(true); + }); + + test("Throws error for unrecognized RPC method", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + try { + await handleDbRequest(instance(mockAdapter), instance(mockClient), "UnknownMethod", new Uint8Array()); + expect.fail("Should have thrown"); + } catch (e) { + expect(e.message).to.contain("Unrecognized RPC method"); + } + }); + + test("Execute RPC merges global BigQuery options with request-specific options", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT 1"; + const executeRequest = dataform.ExecuteRequest.create({ + statement, + bigQueryOptions: { + labels: { request_label: "request_val" }, + jobPrefix: "request-prefix" + } + }); + const encodedRequest = dataform.ExecuteRequest.encode(executeRequest).finish(); + + const globalOptions = { + labels: { global_label: "global_val" }, + location: "EU", + jobPrefix: "global-prefix" + }; + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + await handleDbRequest( + instance(mockAdapter), + instance(mockClient), + "Execute", + encodedRequest, + globalOptions + ); + + verify(mockClient.executeRaw(statement, anything())).once(); + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + + // We expect both labels to be present + expect(capturedOptions.bigquery.labels).deep.equals({ + global_label: "global_val", + request_label: "request_val" + }); + // We expect global location to be applied + expect(capturedOptions.bigquery.location).equals("EU"); + // We expect job prefixes to be merged + expect(capturedOptions.bigquery.jobPrefix).equals("global-prefix-request-prefix"); + }); + + test("Execute RPC label merging: both global and request labels", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + const statement = "SELECT 1"; + const encodedRequest = dataform.ExecuteRequest.encode({ + statement, + bigQueryOptions: { labels: { request_label: "request_val" } } + }).finish(); + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { + labels: { global_label: "global_val" } + }); + + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + expect(capturedOptions.bigquery.labels).deep.equals({ + global_label: "global_val", + request_label: "request_val" + }); + }); + + test("Execute RPC label merging: undefined global, defined request", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + const statement = "SELECT 1"; + const encodedRequest = dataform.ExecuteRequest.encode({ + statement, + bigQueryOptions: { labels: { request_label: "request_val" } } + }).finish(); + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + // Global options have no labels + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { + location: "US" + }); + + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + expect(capturedOptions.bigquery.labels).deep.equals({ + request_label: "request_val" + }); + }); + + test("Execute RPC label merging: empty global, defined request", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + const statement = "SELECT 1"; + const encodedRequest = dataform.ExecuteRequest.encode({ + statement, + bigQueryOptions: { labels: { request_label: "request_val" } } + }).finish(); + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + // Global options have empty labels object + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { + labels: {} + }); + + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + expect(capturedOptions.bigquery.labels).deep.equals({ + request_label: "request_val" + }); + }); + + test("Execute RPC label merging: defined global, undefined request", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + const statement = "SELECT 1"; + // Request has no labels + const encodedRequest = dataform.ExecuteRequest.encode({ + statement, + bigQueryOptions: { location: "US" } + }).finish(); + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { + labels: { global_label: "global_val" } + }); + + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + expect(capturedOptions.bigquery.labels).deep.equals({ + global_label: "global_val" + }); + }); + + test("Execute RPC label merging: defined global, empty request", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + const statement = "SELECT 1"; + // Request has empty labels + const encodedRequest = dataform.ExecuteRequest.encode({ + statement, + bigQueryOptions: { labels: {} } + }).finish(); + + when(mockClient.executeRaw(statement, anything())).thenResolve({ rows: [], metadata: {} }); + + await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest, { + labels: { global_label: "global_val" } + }); + + const capturedOptions = capture(mockClient.executeRaw).last()[1]; + expect(capturedOptions.bigquery.labels).deep.equals({ + global_label: "global_val" + }); + }); + + test("Execute RPC handles raw BigQuery f,v format results", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT * FROM table"; + const encodedRequest = dataform.ExecuteRequest.encode(dataform.ExecuteRequest.create({ statement })).finish(); + + // Real raw BigQuery f/v format + const rawRows = [ + { + f: [ + { v: "42" } + ] + } + ]; + + when(mockClient.executeRaw(statement, anything())).thenResolve({ + rows: rawRows, + schema: [{ name: "id", primitive: dataform.Field.Primitive.STRING }], + metadata: { bigquery: { jobId: "job1" } } + }); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest); + const decoded = dataform.ExecuteResponse.decode(response); + + expect(decoded.rows.length).equals(1); + const row = decoded.rows[0]; + expect(row.fields.f).to.not.equal(undefined); + const fList = row.fields.f.listValue.values; + expect(fList[0].structValue.fields.v.stringValue).equals("42"); + expect(decoded.schemaFields.length).equals(1); + expect(decoded.schemaFields[0].name).equals("id"); + }); + + test("Execute RPC preserves complex nested BigQuery f,v format", async () => { + const mockAdapter = mock(); + const mockClient = mock(); + + const statement = "SELECT complex_struct FROM table"; + const encodedRequest = dataform.ExecuteRequest.encode(dataform.ExecuteRequest.create({ statement })).finish(); + + // Real raw BigQuery complex nested f/v format + const rawRows = [ + { + f: [ + { + v: { + f: [ + { v: "nested_val" }, + { v: "123" } + ] + } + } + ] + } + ]; + + when(mockClient.executeRaw(statement, anything())).thenResolve({ + rows: rawRows, + schema: [{ name: "complex_struct", primitive: dataform.Field.Primitive.STRING }], + metadata: { bigquery: { jobId: "job1" } } + }); + + const response = await handleDbRequest(instance(mockAdapter), instance(mockClient), "Execute", encodedRequest); + const decoded = dataform.ExecuteResponse.decode(response); + + expect(decoded.rows.length).equals(1); + const row = decoded.rows[0]; + const nestedStruct = row.fields.f.listValue.values[0].structValue.fields.v.structValue; + const nestedFList = nestedStruct.fields.f.listValue.values; + expect(nestedFList[0].structValue.fields.v.stringValue).equals("nested_val"); + expect(decoded.schemaFields.length).equals(1); + expect(decoded.schemaFields[0].name).equals("complex_struct"); + }); +}); diff --git a/cli/api/dbadapters/bigquery.ts b/cli/api/dbadapters/bigquery.ts index 6f02bc609..dc9e18cdc 100644 --- a/cli/api/dbadapters/bigquery.ts +++ b/cli/api/dbadapters/bigquery.ts @@ -36,8 +36,14 @@ export class BigQueryDbAdapter implements IDbAdapter { private readonly clients = new Map(); - constructor(credentials: dataform.IBigQuery, options?: { concurrencyLimit: number }) { + constructor( + credentials: dataform.IBigQuery, + options?: { concurrencyLimit?: number; bigqueryClient?: BigQuery } + ) { this.bigQueryCredentials = credentials; + if (options?.bigqueryClient) { + this.clients.set(credentials.projectId, options.bigqueryClient); + } // Bigquery allows 50 concurrent queries, and a rate limit of 100/user/second by default. // These limits should be safely low enough for most projects. this.pool = new PromisePoolExecutor({ @@ -92,6 +98,31 @@ export class BigQueryDbAdapter implements IDbAdapter { .promise(); } + public async executeRaw( + statement: string, + options: { + params?: { [name: string]: any }; + rowLimit?: number; + bigquery?: IBigQueryExecutionOptions; + } = { rowLimit: 1000 } + ): Promise { + if (!statement) { + throw new Error("Query string cannot be empty"); + } + return this.pool + .addSingleTask({ + generator: async () => { + const [rows, , apiResponse] = (await this.getClient().query({ + ...this.prepareQueryOptions(statement, options.rowLimit, options.bigquery, options.params), + skipParsing: true + } as any)) as any; + const schema = apiResponse?.schema?.fields?.map((field: any) => convertField(field)); + return { rows, schema, metadata: {} }; + } + }) + .promise(); + } + public async withClientLock(callback: (client: IDbClient) => Promise) { return await callback(this); } @@ -129,13 +160,17 @@ export class BigQueryDbAdapter implements IDbAdapter { ); } - public async tables(): Promise { - const datasets = await this.getClient().getDatasets({ autoPaginate: true, maxResults: 1000 }); - const tables = await Promise.all( - datasets[0].map(dataset => dataset.getTables({ autoPaginate: true, maxResults: 1000 })) + public async tables(database?: string, schema?: string): Promise { + const datasets = schema + ? [this.getClient(database).dataset(schema)] + : (await this.getClient(database).getDatasets({ autoPaginate: true, maxResults: 1000 }))[0]; + + const allTablesResults = await Promise.all( + datasets.map(dataset => dataset.getTables({ autoPaginate: true, maxResults: 1000 })) ); + const allTables: dataform.ITarget[] = []; - tables.forEach((tablesResult: GetTablesResponse) => + allTablesResults.forEach((tablesResult: GetTablesResponse) => tablesResult[0].forEach(table => allTables.push({ database: table.bigQuery.projectId, @@ -218,6 +253,13 @@ export class BigQueryDbAdapter implements IDbAdapter { }); } + public async deleteTable(target: dataform.ITarget): Promise { + await this.getClient(target.database) + .dataset(target.schema) + .table(target.name) + .delete({ ignoreNotFound: true }); + } + public async schemas(database: string): Promise { const data = await this.getClient(database).getDatasets(); return data[0].map(dataset => dataset.id); @@ -314,6 +356,25 @@ export class BigQueryDbAdapter implements IDbAdapter { return { rows: cleanRows(results), metadata: {} }; } + private prepareQueryOptions( + query: string, + rowLimit?: number, + bigqueryOptions?: IBigQueryExecutionOptions, + params?: { [name: string]: any } + ) { + return { + query, + useLegacySql: false, + jobPrefix: "dataform-" + (bigqueryOptions?.jobPrefix ? `${bigqueryOptions.jobPrefix}-` : ""), + location: bigqueryOptions?.location, + maxResults: rowLimit, + labels: bigqueryOptions?.labels, + dryRun: bigqueryOptions?.dryRun, + reservation: bigqueryOptions?.reservation, + params + }; + } + private async createQueryJob( query: string, params?: { [name: string]: any }, @@ -332,16 +393,20 @@ export class BigQueryDbAdapter implements IDbAdapter { return retry( async () => { try { - const job = await this.getClient().createQueryJob({ - useLegacySql: false, - jobPrefix: "dataform-" + (jobPrefix ? `${jobPrefix}-` : ""), - query, - params, - labels, - location, - dryRun, - reservation - } as any); + const job = await this.getClient().createQueryJob( + this.prepareQueryOptions( + query, + rowLimit, + { + labels, + location, + jobPrefix, + dryRun, + reservation + }, + params + ) as any + ); const resultStream = job[0].getQueryResultsStream(); return new Promise((resolve, reject) => { if (isCancelled) { @@ -392,8 +457,14 @@ export class BigQueryDbAdapter implements IDbAdapter { reject(error); return; } + const [, , apiResponse] = (await job[0].getQueryResults({ + maxResults: rowLimit, + location + })) as any; + const schema = apiResponse?.schema?.fields?.map((field: any) => convertField(field)); resolve({ rows: results.rows, + schema, metadata: { bigquery: { jobId: jobMetadata.jobReference.jobId, @@ -492,6 +563,9 @@ function addDescriptionToMetadata( columnDescriptions: dataform.IColumnDescriptor[], metadataArray: TableField[] ): TableField[] { + if (!columnDescriptions) { + return metadataArray; + } const findDescription = (path: string[]) => columnDescriptions.find(column => column.path.join("") === path.join("")); diff --git a/cli/api/dbadapters/bigquery_test.ts b/cli/api/dbadapters/bigquery_test.ts new file mode 100644 index 000000000..8f2d8950e --- /dev/null +++ b/cli/api/dbadapters/bigquery_test.ts @@ -0,0 +1,79 @@ +import { Dataset, Table } from "@google-cloud/bigquery"; +import { expect } from "chai"; +import { anything, instance, mock, verify, when } from "ts-mockito"; + +import { BigQueryDbAdapter } from "df/cli/api/dbadapters/bigquery"; +import { dataform } from "df/protos/ts"; +import { suite, test } from "df/testing"; + +suite("BigQueryDbAdapter", () => { + test("tables() with schema filters correctly", async () => { + const mockBigQuery = mock(); + const mockDataset = mock(); + + const tableName = "table1"; + const schemaName = "schema1"; + const projectId = "project1"; + + const credentials = dataform.BigQuery.create({ projectId, location: "US" }); + const adapter = new BigQueryDbAdapter(credentials, { bigqueryClient: instance(mockBigQuery) }); + + when(mockBigQuery.dataset(schemaName)).thenReturn(instance(mockDataset)); + when(mockDataset.getTables(anything())).thenResolve([ + [ + { + id: tableName, + dataset: { id: schemaName }, + bigQuery: { projectId } + } as any + ] + ]); + + const result = await adapter.tables(projectId, schemaName); + + expect(result.length).to.equal(1); + expect(result[0]).to.deep.equal({ + database: projectId, + schema: schemaName, + name: tableName + }); + + verify(mockBigQuery.dataset(schemaName)).once(); + verify(mockDataset.getTables(anything())).once(); + }); + + test("tables() without schema lists all datasets and tables", async () => { + const mockBigQuery = mock(); + const mockDataset = mock(); + + const credentials = dataform.BigQuery.create({ projectId: "project", location: "US" }); + const adapter = new BigQueryDbAdapter(credentials, { bigqueryClient: instance(mockBigQuery) }); + + const schemaName = "schema1"; + const tableName = "table1"; + const projectId = "project"; + + when(mockBigQuery.getDatasets(anything())).thenResolve([[instance(mockDataset)]] as any); + when(mockDataset.getTables(anything())).thenResolve([ + [ + { + id: tableName, + dataset: { id: schemaName }, + bigQuery: { projectId } + } as any + ] + ]); + + const result = await adapter.tables(projectId); + + expect(result.length).to.equal(1); + expect(result[0]).to.deep.equal({ + database: projectId, + schema: schemaName, + name: tableName + }); + + verify(mockBigQuery.getDatasets(anything())).once(); + verify(mockDataset.getTables(anything())).once(); + }); +}); diff --git a/cli/api/dbadapters/index.ts b/cli/api/dbadapters/index.ts index 1fe69dff7..9a9b89531 100644 --- a/cli/api/dbadapters/index.ts +++ b/cli/api/dbadapters/index.ts @@ -5,6 +5,7 @@ export type OnCancel = (handleCancel: () => void) => void; export interface IExecutionResult { rows: any[]; + schema?: dataform.IField[]; metadata: dataform.IExecutionMetadata; } @@ -25,6 +26,22 @@ export interface IDbClient { location?: string; jobPrefix?: string; dryRun?: boolean; + reservation?: string; + }; + } + ): Promise; + + executeRaw( + statement: string, + options?: { + params?: { [name: string]: any }; + rowLimit?: number; + bigquery?: { + labels?: { [label: string]: string }; + location?: string; + jobPrefix?: string; + dryRun?: boolean; + reservation?: string; }; } ): Promise; @@ -38,10 +55,10 @@ export interface IDbAdapter extends IDbClient { schemas(database: string): Promise; createSchema(database: string, schema: string): Promise; - // TODO: This should take parameters to allow for retrieving from a specific database/schema. - tables(): Promise; + tables(database?: string, schema?: string): Promise; search(searchText: string, options?: { limit: number }): Promise; table(target: dataform.ITarget): Promise; + deleteTable(target: dataform.ITarget): Promise; setMetadata(action: dataform.IExecutionAction): Promise; } diff --git a/cli/api/utils/constants.ts b/cli/api/utils/constants.ts new file mode 100644 index 000000000..c4049a9df --- /dev/null +++ b/cli/api/utils/constants.ts @@ -0,0 +1 @@ +export const DEFAULT_COMPILATION_TIMEOUT_MILLIS = 30000; diff --git a/cli/vm/BUILD b/cli/vm/BUILD index 69d30e808..4adc5f391 100644 --- a/cli/vm/BUILD +++ b/cli/vm/BUILD @@ -4,7 +4,10 @@ load("//tools:ts_library.bzl", "ts_library") ts_library( name = "vm", - srcs = ["compile.ts"], + srcs = [ + "compile.ts", + "jit_worker.ts", + ], deps = [ "//common/protos", "//core", @@ -23,6 +26,7 @@ ts_library( srcs = [], data = [ ":compile_loader.js", + ":jit_loader.js", ], deps = [ ":vm", @@ -50,3 +54,16 @@ nodejs_binary( "--bazel_patch_module_resolver", ], ) + +nodejs_binary( + name = "jit_worker_bin", + data = [ + ":vm", + "@npm//source-map-support", + ], + entry_point = ":jit_worker.ts", + templated_args = [ + "--node_options=--require=source-map-support/register", + "--bazel_patch_module_resolver", + ], +) diff --git a/cli/vm/jit_loader.js b/cli/vm/jit_loader.js new file mode 100644 index 000000000..cb325cee9 --- /dev/null +++ b/cli/vm/jit_loader.js @@ -0,0 +1,12 @@ +'use strict'; + +if (require.main === module) { + var entryPointPath = 'df/cli/vm/jit_worker.js'; + var mainScript = process.argv[1] = entryPointPath; + try { + module.constructor._load(mainScript, this, /*isMain=*/true); + } catch (e) { + console.error(e.stack || e); + process.exit(1); + } +} diff --git a/cli/vm/jit_worker.ts b/cli/vm/jit_worker.ts new file mode 100644 index 000000000..59c5ca624 --- /dev/null +++ b/cli/vm/jit_worker.ts @@ -0,0 +1,120 @@ +import * as fs from "fs"; +import * as path from "path"; +import { NodeVM } from "vm2"; + +import { dataform } from "df/protos/ts"; + +const pendingRpcCallbacks = new Map void>(); +let hasStartedProcessing = false; + +process.on("message", (res: any) => { + if (res.type === "rpc_response") { + const callback = pendingRpcCallbacks.get(res.correlationId); + if (callback) { + pendingRpcCallbacks.delete(res.correlationId); + if (res.error) { + callback(res.error, null); + } else { + callback(null, res.response); + } + } + } +}); + +export async function handleJitRequest(message: { + request: any; + projectDir: string; +}) { + try { + const { request, projectDir } = message; + + const rpcCallback = (method: string, reqBase64: string, callback: (err: string | null, resBase64: string | null) => void) => { + const correlationId = Math.random().toString(36).substring(7); + pendingRpcCallbacks.set(correlationId, callback); + + process.send({ + type: "rpc_request", + method, + request: reqBase64, + correlationId + }); + }; + + const projectLocalCorePath = path.join(projectDir, "node_modules", "@dataform", "core", "bundle.js"); + const hasProjectLocalCore = fs.existsSync(projectLocalCorePath); + + const requestMessage = dataform.JitCompilationRequest.fromObject(request); + const requestBytes = dataform.JitCompilationRequest.encode(requestMessage).finish(); + const requestBase64 = Buffer.from(requestBytes).toString("base64"); + + const vmFileName = path.resolve(projectDir, "index.js"); + + const vm = new NodeVM({ + env: process.env, + require: { + builtin: [], + context: "sandbox", + external: { + modules: ["@dataform/*"] + }, + root: projectDir, + mock: hasProjectLocalCore ? {} : { + "@dataform/core": require("@dataform/core") + } + }, + sourceExtensions: ["js", "json"] + }); + + const jitCompileInVm = vm.run(` + const { jitCompiler } = require("@dataform/core"); + + global.require = require; + + module.exports = async (requestBase64, armoredRpcCallback) => { + const requestBytes = new Uint8Array(Buffer.from(requestBase64, "base64")); + + const internalRpcCallback = (method, reqBytes, callback) => { + const reqBase64 = Buffer.from(reqBytes).toString("base64"); + armoredRpcCallback(method, reqBase64, (errStr, resBase64) => { + if (errStr) { + callback(new Error(errStr), null); + } else { + callback(null, new Uint8Array(Buffer.from(resBase64, "base64"))); + } + }); + }; + + const compilerInstance = jitCompiler(internalRpcCallback); + const responseBytes = await compilerInstance.compile(requestBytes); + + return Buffer.from(responseBytes).toString("base64"); + }; + `, vmFileName); + + const responseBase64 = await jitCompileInVm(requestBase64, rpcCallback); + const response = dataform.JitCompilationResponse.decode(Buffer.from(responseBase64, "base64")); + + process.send({ type: "jit_response", response: response.toJSON() }); + } catch (e) { + process.send({ type: "jit_error", error: e.stack || e.message }); + } +} + +if (require.main === module) { + if (process.send) { + process.send({ type: "worker_booted" }); + } + process.on("message", async (message: any) => { + if (message.request) { + if (hasStartedProcessing) { + process.send({ + type: "jit_error", + error: "Worker process received multiple JiT compilation requests. Subsequent requests are rejected." + }); + return; + } + hasStartedProcessing = true; + await handleJitRequest(message); + } + }); +} diff --git a/common/protos/BUILD b/common/protos/BUILD index 87efbbe1c..6f12d80ed 100644 --- a/common/protos/BUILD +++ b/common/protos/BUILD @@ -10,6 +10,7 @@ ts_library( deps = [ "//:modules-fix", "//common/strings", + "//protos:ts", "@npm//protobufjs", ], ) diff --git a/common/protos/structs.ts b/common/protos/structs.ts new file mode 100644 index 000000000..b937ed517 --- /dev/null +++ b/common/protos/structs.ts @@ -0,0 +1,70 @@ +import { google } from "df/protos/ts"; + +export class Structs { + public static toObject(struct?: google.protobuf.IStruct): { [key: string]: any } | undefined { + if (!struct || !struct.fields) { + return undefined; + } + const result: { [key: string]: any } = {}; + for (const [key, value] of Object.entries(struct.fields)) { + result[key] = this.fromValue(value); + } + return result; + } + + public static fromObject(obj: { [key: string]: any }): google.protobuf.IStruct { + const fields: { [key: string]: google.protobuf.IValue } = {}; + for (const [key, val] of Object.entries(obj)) { + fields[key] = this.toValue(val); + } + return { fields }; + } + + private static fromValue(value: google.protobuf.IValue): any { + if (value.nullValue !== null && value.nullValue !== undefined) { + return null; + } + if (value.numberValue !== null && value.numberValue !== undefined) { + return value.numberValue; + } + if (value.stringValue !== null && value.stringValue !== undefined) { + return value.stringValue; + } + if (value.boolValue !== null && value.boolValue !== undefined) { + return value.boolValue; + } + if (value.structValue !== null && value.structValue !== undefined) { + return this.toObject(value.structValue); + } + if (value.listValue !== null && value.listValue !== undefined) { + return (value.listValue.values || []).map((v: any) => this.fromValue(v)); + } + return undefined; + } + + private static toValue(val: any): google.protobuf.IValue { + if (typeof val === "number") { + return { numberValue: val }; + } + if (typeof val === "string") { + return { stringValue: val }; + } + if (typeof val === "boolean") { + return { boolValue: val }; + } + if (val === null || val === undefined) { + return { nullValue: 0 }; + } + if (Array.isArray(val)) { + return { + listValue: { + values: val.map(v => this.toValue(v)) + } + }; + } + if (typeof val === "object") { + return { structValue: this.fromObject(val) }; + } + return { nullValue: 0 }; + } +} diff --git a/core/jit_context.ts b/core/jit_context.ts index d7e994836..ed0432bde 100644 --- a/core/jit_context.ts +++ b/core/jit_context.ts @@ -1,3 +1,4 @@ +import { Structs } from "df/common/protos/structs"; import { IActionContext, ITableContext, JitContext, Resolvable } from "df/core/contextables"; import { ambiguousActionNameMsg, resolvableAsTarget, ResolvableMap, stringifyResolvable, toResolvable } from "df/core/utils"; import { dataform, google } from "df/protos/ts"; @@ -24,7 +25,7 @@ export class SqlActionJitContext implements JitContext { actionTarget: dep, value: canonicalTargetValue(dep) }))); - this.data = jitDataToJsValue(request.jitData); + this.data = Structs.toObject(request.jitData); } public self(): string { @@ -103,46 +104,3 @@ export class IncrementalTableJitContext extends TableJitContext { } } -function jitDataToJsValue(value?: google.protobuf.IStruct): { [key: string]: {} } | undefined { - if (value === undefined || value === null) { - return - } - function protobufValueToJs(val: google.protobuf.IValue): {} { - if (val.nullValue != null) { - return null; - } - if (val.stringValue != null) { - return val.stringValue; - } - if (val.numberValue != null) { - return val.numberValue; - } - if (val.boolValue != null) { - return val.boolValue; - } - if (val.listValue != null) { - return (val.listValue.values || []).map(protobufValueToJs); - } - if (val.structValue != null) { - return Object.fromEntries( - Object.entries(val.structValue.fields || {}).map( - ([fieldKey, fieldValue]) => ([ - fieldKey, - protobufValueToJs(fieldValue) - ]) - ) - ); - } - - throw new Error(`Unsupported protobuf value: ${JSON.stringify(val)}`); - } - - return Object.fromEntries( - Object.entries(value.fields || {}).map( - ([fieldKey, fieldValue]) => [ - fieldKey, - protobufValueToJs(fieldValue) - ] - ) - ); -} diff --git a/protos/jit.proto b/protos/jit.proto index 999a98e92..7beaa5236 100644 --- a/protos/jit.proto +++ b/protos/jit.proto @@ -34,6 +34,8 @@ message ExecuteRequest { bool fetch_results = 4; BigQueryExecuteOptions big_query_options = 5; + // Query parameters for parameterized queries. + google.protobuf.Struct params = 6; } message BigQueryExecuteOptions { @@ -49,6 +51,8 @@ message BigQueryExecuteOptions { string job_prefix = 5; // Is dry run job. bool dry_run = 6; + // BigQuery reservation to use for the job. + string reservation = 7; } // Synchronous execution response result.