-
Notifications
You must be signed in to change notification settings - Fork 198
JiT Worker in CLI and RPC infrastructure #2109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| import { ChildProcess, fork } from "child_process"; | ||
|
|
||
/**
 * Base class for helpers that run one unit of work in a forked Node child
 * process and wait for a single result delivered over the IPC channel.
 *
 * @typeParam TResponse value the promise returned by `runWorker` resolves with.
 * @typeParam TMessage shape of the IPC messages handed to `onMessage`.
 */
export abstract class BaseWorker<TResponse, TMessage = any> {
  /**
   * @param loaderPath module path of the worker entry script. It is tried
   *   first when resolving the script to fork; "./worker_bundle.js" is the
   *   fallback.
   */
  protected constructor(private readonly loaderPath: string) {}

  /**
   * Forks the worker script and settles once the worker produces a result,
   * fails, exits, or the timeout elapses. The child is killed as soon as the
   * promise is settled, whatever the reason.
   *
   * @param timeoutMillis maximum time to wait before rejecting.
   * @param onBoot invoked each time the child sends a `worker_booted` message.
   * @param onMessage invoked for every other message; it settles the run by
   *   calling `resolve` or `reject`. It may be an async function: a thrown
   *   error or a rejected promise rejects the run.
   * @returns the value passed to `resolve` by `onMessage`.
   * @throws Error if the worker script cannot be resolved, the worker times
   *   out, emits an error, exits before a result is delivered, or one of the
   *   callbacks fails.
   */
  protected async runWorker(
    timeoutMillis: number,
    onBoot: (child: ChildProcess) => void,
    onMessage: (
      message: TMessage,
      child: ChildProcess,
      resolve: (res: TResponse) => void,
      reject: (err: Error) => void
    ) => void
  ): Promise<TResponse> {
    const forkScript = this.resolveScript();
    const child = fork(forkScript, [], {
      stdio: [0, 1, 2, "ipc", "pipe"]
    });

    return new Promise<TResponse>((resolve, reject) => {
      let completed = false;

      // Settles the promise at most once: the first caller wins, later calls
      // (e.g. the "exit" event triggered by our own kill) are ignored.
      const terminate = (fn: () => void) => {
        if (completed) {
          return;
        }
        completed = true;
        clearTimeout(timeout);
        child.kill();
        fn();
      };

      const fail = (err: unknown) =>
        terminate(() => reject(err instanceof Error ? err : new Error(String(err))));

      // Callbacks run inside an EventEmitter listener, where a synchronous
      // throw would surface as an uncaught exception and a rejected promise as
      // an unhandled rejection. Route both into the returned promise instead.
      const guarded = (callback: () => unknown) => {
        try {
          Promise.resolve(callback()).catch(fail);
        } catch (e) {
          fail(e);
        }
      };

      const timeout = setTimeout(() => {
        terminate(() =>
          reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`))
        );
      }, timeoutMillis);

      child.on("message", (message: any) => {
        if (message.type === "worker_booted") {
          guarded(() => onBoot(child));
          return;
        }
        guarded(() =>
          onMessage(
            message,
            child,
            res => terminate(() => resolve(res)),
            err => terminate(() => reject(err))
          )
        );
      });

      child.on("error", err => {
        terminate(() => reject(err));
      });

      child.on("exit", (code, signal) => {
        if (!completed) {
          const errorMsg =
            code !== 0 && code !== null
              ? `Worker exited with code ${code} and signal ${signal}`
              : "Worker exited without sending a response message";
          terminate(() => reject(new Error(errorMsg)));
        }
      });
    });
  }

  /**
   * Returns the resolved path of the first candidate script that
   * `require.resolve` can locate.
   *
   * @throws Error listing every candidate when none can be resolved.
   */
  private resolveScript() {
    const pathsToTry = [this.loaderPath, "./worker_bundle.js"];
    for (const p of pathsToTry) {
      try {
        return require.resolve(p);
      } catch (e) {
        // Continue to next path.
      }
    }
    throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`);
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,102 @@ | ||
| import { ChildProcess } from "child_process"; | ||
|
|
||
| import { BaseWorker } from "df/cli/api/commands/base_worker"; | ||
| import { handleDbRequest } from "df/cli/api/commands/jit/rpc"; | ||
| import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; | ||
| import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery"; | ||
| import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants"; | ||
| import { dataform } from "df/protos/ts"; | ||
|
|
||
/**
 * IPC message received from the JiT worker process.
 *
 * `type` selects which of the optional fields are read by the parent:
 * `rpc_request` uses `method`, `request` and `correlationId`;
 * `jit_response` uses `response`; `jit_error` uses `error`.
 */
export interface IJitWorkerMessage {
  type: "rpc_request" | "jit_response" | "jit_error";

  // Fields read when handling an "rpc_request".
  correlationId?: string;
  method?: string;
  /** Request payload; decoded by the parent as base64. */
  request?: string;

  // Field read when handling a "jit_response".
  response?: any;

  // Field read when handling a "jit_error".
  error?: string;
}
|
|
||
/**
 * Runs a JiT compilation in a forked worker process and serves the database
 * RPC requests the worker sends back while it compiles.
 */
export class JitCompileChildProcess extends BaseWorker<
  dataform.IJitCompilationResponse,
  IJitWorkerMessage
> {
  /**
   * Compiles `request` in a fresh worker process.
   *
   * @param request compilation request forwarded to the worker.
   * @param projectDir project directory forwarded to the worker.
   * @param dbadapter adapter used to serve the worker's table RPCs.
   * @param dbclient client used to serve the worker's "Execute" RPCs.
   * @param timeoutMillis maximum time to wait for the worker.
   * @param options execution options forwarded to the RPC handlers.
   * @returns the compilation response reported by the worker.
   */
  public static async compile(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await new JitCompileChildProcess().run(
      request,
      projectDir,
      dbadapter,
      dbclient,
      timeoutMillis,
      options
    );
  }

  /** Human-readable text for any thrown value, not only `Error` instances. */
  private static describeError(e: unknown): string {
    if (e instanceof Error) {
      // An empty message would be indistinguishable from "no error" once sent.
      return e.message || String(e);
    }
    return String(e);
  }

  constructor() {
    super("../../../vm/jit_loader");
  }

  /**
   * Sends the `jit_compile` message once the worker has booted, then
   * dispatches the messages the worker sends until it reports a result.
   */
  private async run(
    request: dataform.IJitCompilationRequest,
    projectDir: string,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    timeoutMillis: number,
    options?: IBigQueryExecutionOptions
  ): Promise<dataform.IJitCompilationResponse> {
    return await this.runWorker(
      timeoutMillis,
      child => {
        child.send({
          type: "jit_compile",
          request,
          projectDir
        });
      },
      async (message, child, resolve, reject) => {
        // This handler is async, so anything thrown here would otherwise
        // become an unhandled rejection; fail the compilation instead.
        try {
          switch (message.type) {
            case "rpc_request":
              await this.handleRpcRequest(message, child, dbadapter, dbclient, options);
              break;
            case "jit_response":
              resolve(dataform.JitCompilationResponse.fromObject(message.response));
              break;
            case "jit_error":
              reject(new Error(message.error));
              break;
          }
        } catch (e) {
          reject(e instanceof Error ? e : new Error(String(e)));
        }
      }
    );
  }

  /**
   * Serves one `rpc_request` from the worker and replies with an
   * `rpc_response` carrying the same `correlationId`. The reply holds either
   * the base64-encoded `response` or an `error` description.
   */
  private async handleRpcRequest(
    message: IJitWorkerMessage,
    child: ChildProcess,
    dbadapter: IDbAdapter,
    dbclient: IDbClient,
    options?: IBigQueryExecutionOptions
  ) {
    try {
      // Both fields are optional on the message type; fail with a clear error
      // rather than an opaque Buffer.from() TypeError.
      if (typeof message.method !== "string" || typeof message.request !== "string") {
        throw new Error("Malformed rpc_request: 'method' and 'request' must be strings");
      }
      const response = await handleDbRequest(
        dbadapter,
        dbclient,
        message.method,
        Buffer.from(message.request, "base64"),
        options
      );
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        response: Buffer.from(response).toString("base64")
      });
    } catch (e) {
      child.send({
        type: "rpc_response",
        correlationId: message.correlationId,
        // Always a string: `e.message` is undefined for non-Error throws, which
        // would leave the reply with neither `response` nor `error`.
        error: JitCompileChildProcess.describeError(e)
      });
    }
  }
}
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| import Long from "long"; | ||
|
|
||
| import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters"; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. To clarify: is this protobuf bridge protocol the same for the GCP and CLI implementations?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Both the CLI and GCP implementations follow the dataform.DbAdapter service defined in jit.proto.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. See my comment below on the issue with "Execute" compatibility.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. PTAL at the approach with executeRaw. |
||
| import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery"; | ||
| import { Structs } from "df/common/protos/structs"; | ||
| import { dataform, google } from "df/protos/ts"; | ||
|
|
||
/**
 * Routes a serialized RPC to the handler for `method` and returns that
 * handler's serialized response.
 *
 * @param dbadapter adapter passed to the table handlers.
 * @param dbclient client passed to the "Execute" handler.
 * @param method RPC name: "Execute", "ListTables", "GetTable" or "DeleteTable".
 * @param request serialized request message for `method`.
 * @param options execution options; forwarded to "Execute", and its `dryRun`
 *   flag to "DeleteTable".
 * @throws Error when `method` is not one of the names above.
 */
export async function handleDbRequest(
  dbadapter: IDbAdapter,
  dbclient: IDbClient,
  method: string,
  request: Uint8Array,
  options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
  if (method === "Execute") {
    return await handleExecute(dbclient, request, options);
  }
  if (method === "ListTables") {
    return await handleListTables(dbadapter, request);
  }
  if (method === "GetTable") {
    return await handleGetTable(dbadapter, request);
  }
  if (method === "DeleteTable") {
    return await handleDeleteTable(dbadapter, request, options?.dryRun);
  }
  throw new Error(`Unrecognized RPC method: ${method}`);
}
|
|
||
/**
 * Serves an "Execute" RPC: decodes the request, runs the statement through
 * `dbclient.executeRaw` and encodes the rows and schema as an ExecuteResponse.
 *
 * BigQuery options from the request are layered over the caller's `options`:
 * scalar fields from the request win, labels are merged, and the two job
 * prefixes are joined with "-".
 *
 * @param dbclient client used to run the statement.
 * @param request serialized `dataform.ExecuteRequest`.
 * @param options caller-level BigQuery execution options.
 * @returns serialized `dataform.ExecuteResponse`.
 */
async function handleExecute(
  dbclient: IDbClient,
  request: Uint8Array,
  options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
  const executeRequest = dataform.ExecuteRequest.decode(request);
  const executeRequestObj = dataform.ExecuteRequest.toObject(executeRequest, {
    defaults: false
  });
  const requestOptions = executeRequestObj.bigQueryOptions;

  // The 64-bit field may be surfaced as a plain number or as a Long object.
  // A Long is always truthy (even when it holds 0), and a plain number has no
  // toNumber(), so normalise to a number first and only then treat 0 as
  // "no limit requested".
  const rawRowLimit: unknown = executeRequest.rowLimit;
  let rowLimit: number | undefined;
  if (typeof rawRowLimit === "number") {
    rowLimit = rawRowLimit;
  } else if (rawRowLimit) {
    rowLimit = (rawRowLimit as Long).toNumber();
  }

  const results = await dbclient.executeRaw(executeRequest.statement, {
    rowLimit: rowLimit || undefined,
    params: Structs.toObject(executeRequest.params),
    bigquery: {
      ...options,
      ...requestOptions,
      labels: {
        ...options?.labels,
        ...requestOptions?.labels
      },
      jobPrefix:
        [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined
    }
  });

  return dataform.ExecuteResponse.encode({
    rows: (results.rows || []).map(row => Structs.fromObject(row)),
    schemaFields: results.schema || []
  } as any).finish();
}
|
|
||
/**
 * Serves a "ListTables" RPC: lists the targets in the requested database and
 * schema, fetches the metadata of each one and encodes them as a
 * ListTablesResponse.
 *
 * @param dbadapter adapter used to list targets and fetch their metadata.
 * @param request serialized `dataform.ListTablesRequest`.
 * @returns serialized `dataform.ListTablesResponse`.
 */
async function handleListTables(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
  const listTablesRequest = dataform.ListTablesRequest.decode(request);
  const targets = await dbadapter.tables(listTablesRequest.database, listTablesRequest.schema);
  const tablesMetadata = await Promise.all(targets.map(target => dbadapter.table(target)));
  const listTablesResponse = dataform.ListTablesResponse.create({
    // NOTE(review): table() appears able to yield a falsy value for a missing
    // table (the "GetTable" handler guards against it). Skip such entries so a
    // table that disappears between listing and lookup does not break the
    // encoding of the whole response.
    tables: tablesMetadata.filter(metadata => !!metadata)
  });
  return dataform.ListTablesResponse.encode(listTablesResponse).finish();
}
|
|
||
/**
 * Serves a "GetTable" RPC: looks up the metadata of the requested target and
 * returns it encoded as a TableMetadata message.
 *
 * @param dbadapter adapter used for the lookup.
 * @param request serialized `dataform.GetTableRequest`.
 * @throws Error when the adapter returns no metadata for the target.
 */
async function handleGetTable(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
  const { target } = dataform.GetTableRequest.decode(request);
  const metadata = await dbadapter.table(target);
  if (metadata) {
    return dataform.TableMetadata.encode(metadata).finish();
  }
  throw new Error(`Table not found: ${JSON.stringify(target)}`);
}
|
|
||
/**
 * Serves a "DeleteTable" RPC. The request is always decoded; the table is
 * only deleted when `dryRun` is not set. The response is empty either way.
 *
 * @param dbadapter adapter used to delete the table.
 * @param request serialized `dataform.DeleteTableRequest`.
 * @param dryRun when truthy, skip the deletion.
 * @returns an empty byte array.
 */
async function handleDeleteTable(
  dbadapter: IDbAdapter,
  request: Uint8Array,
  dryRun?: boolean
): Promise<Uint8Array> {
  // Decode before checking dryRun so a malformed request fails in both modes.
  const { target } = dataform.DeleteTableRequest.decode(request);
  if (!dryRun) {
    await dbadapter.deleteTable(target);
  }
  return new Uint8Array();
}
Uh oh!
There was an error while loading. Please reload this page.