Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cli/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ node_modules(
ts_test_suite(
name = "tests",
srcs = [
"utils_test.ts",
"commands/jit/rpc_test.ts",
"dbadapters/bigquery_test.ts",
],
data = [
":node_modules",
Expand All @@ -68,10 +69,11 @@ ts_test_suite(
"@nodejs//:npm",
],
deps = [
"//cli/api",
":api",
"//core",
"//protos:ts",
"//testing",
"@npm//@google-cloud/bigquery",
"@npm//@types/chai",
"@npm//@types/fs-extra",
"@npm//@types/js-yaml",
Expand Down
70 changes: 70 additions & 0 deletions cli/api/commands/base_worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { ChildProcess, fork } from "child_process";

export abstract class BaseWorker<TResponse, TMessage = any> {
protected constructor(private readonly loaderPath: string) {}

protected async runWorker(
timeoutMillis: number,
onBoot: (child: ChildProcess) => void,
onMessage: (message: TMessage, child: ChildProcess, resolve: (res: TResponse) => void, reject: (err: Error) => void) => void
): Promise<TResponse> {
const forkScript = this.resolveScript();
const child = fork(forkScript, [], {
stdio: [0, 1, 2, "ipc", "pipe"]
});

return new Promise((resolve, reject) => {
let completed = false;

const terminate = (fn: () => void) => {
if (completed) {
return;
}
completed = true;
clearTimeout(timeout);
child.kill();
fn();
};

const timeout = setTimeout(() => {
terminate(() =>
reject(new Error(`Worker timed out after ${timeoutMillis / 1000} seconds`))
);
}, timeoutMillis);

child.on("message", (message: any) => {
if (message.type === "worker_booted") {
onBoot(child);
return;
}
onMessage(message, child, (res) => terminate(() => resolve(res)), (err) => terminate(() => reject(err)));
});

child.on("error", err => {
terminate(() => reject(err));
});

child.on("exit", (code, signal) => {
if (!completed) {
const errorMsg =
code !== 0 && code !== null
? `Worker exited with code ${code} and signal ${signal}`
: "Worker exited without sending a response message";
terminate(() => reject(new Error(errorMsg)));
}
});
});
}

private resolveScript() {
const pathsToTry = [this.loaderPath, "./worker_bundle.js"];
for (const p of pathsToTry) {
try {
return require.resolve(p);
} catch (e) {
// Continue to next path.
}
}
throw new Error(`Could not resolve worker script. Tried: ${pathsToTry.join(", ")}`);
}
}
102 changes: 102 additions & 0 deletions cli/api/commands/jit/compiler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
import { ChildProcess } from "child_process";

import { BaseWorker } from "df/cli/api/commands/base_worker";
import { handleDbRequest } from "df/cli/api/commands/jit/rpc";
import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
import { DEFAULT_COMPILATION_TIMEOUT_MILLIS } from "df/cli/api/utils/constants";
import { dataform } from "df/protos/ts";

export interface IJitWorkerMessage {
type: "rpc_request" | "jit_response" | "jit_error";
method?: string;
request?: string;
correlationId?: string;
response?: any;
error?: string;
}

export class JitCompileChildProcess extends BaseWorker<
dataform.IJitCompilationResponse,
IJitWorkerMessage
> {
public static async compile(
request: dataform.IJitCompilationRequest,
projectDir: string,
dbadapter: IDbAdapter,
dbclient: IDbClient,
timeoutMillis: number = DEFAULT_COMPILATION_TIMEOUT_MILLIS,
options?: IBigQueryExecutionOptions
): Promise<dataform.IJitCompilationResponse> {
return await new JitCompileChildProcess().run(
request,
projectDir,
dbadapter,
dbclient,
timeoutMillis,
options
);
}

constructor() {
super("../../../vm/jit_loader");
}

private async run(
request: dataform.IJitCompilationRequest,
projectDir: string,
dbadapter: IDbAdapter,
dbclient: IDbClient,
timeoutMillis: number,
options?: IBigQueryExecutionOptions
): Promise<dataform.IJitCompilationResponse> {
return await this.runWorker(
timeoutMillis,
child => {
child.send({
type: "jit_compile",
request,
projectDir
});
},
async (message, child, resolve, reject) => {
if (message.type === "rpc_request") {
await this.handleRpcRequest(message, child, dbadapter, dbclient, options);
} else if (message.type === "jit_response") {
resolve(dataform.JitCompilationResponse.fromObject(message.response));
} else if (message.type === "jit_error") {
reject(new Error(message.error));
}
}
);
}

private async handleRpcRequest(
message: IJitWorkerMessage,
child: ChildProcess,
dbadapter: IDbAdapter,
dbclient: IDbClient,
options?: IBigQueryExecutionOptions
) {
try {
const response = await handleDbRequest(
dbadapter,
dbclient,
message.method,
Buffer.from(message.request, "base64"),
options
);
child.send({
type: "rpc_response",
correlationId: message.correlationId,
response: Buffer.from(response).toString("base64")
});
} catch (e) {
child.send({
type: "rpc_response",
correlationId: message.correlationId,
error: e.message
});
}
}
}
90 changes: 90 additions & 0 deletions cli/api/commands/jit/rpc.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import Long from "long";

import { IDbAdapter, IDbClient } from "df/cli/api/dbadapters";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To clarify: is this protobuf bridge protocol the same for GCP and CLI implementations?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both the CLI and GCP implementations follow the dataform.DbAdapter service defined in jit.proto.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See my comment below on the issue with "Execute" compatibility

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL on the approach with executeRaw

import { IBigQueryExecutionOptions } from "df/cli/api/dbadapters/bigquery";
import { Structs } from "df/common/protos/structs";
import { dataform, google } from "df/protos/ts";

export async function handleDbRequest(
dbadapter: IDbAdapter,
dbclient: IDbClient,
method: string,
request: Uint8Array,
options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
switch (method) {
case "Execute":
return await handleExecute(dbclient, request, options);
case "ListTables":
return await handleListTables(dbadapter, request);
case "GetTable":
return await handleGetTable(dbadapter, request);
case "DeleteTable":
return await handleDeleteTable(dbadapter, request, options?.dryRun);
default:
throw new Error(`Unrecognized RPC method: ${method}`);
}
}

async function handleExecute(
dbclient: IDbClient,
request: Uint8Array,
options?: IBigQueryExecutionOptions
): Promise<Uint8Array> {
const executeRequest = dataform.ExecuteRequest.decode(request);
const executeRequestObj = dataform.ExecuteRequest.toObject(executeRequest, {
defaults: false
});
const requestOptions = executeRequestObj.bigQueryOptions;

const results = await dbclient.executeRaw(executeRequest.statement, {
rowLimit: executeRequest.rowLimit ? (executeRequest.rowLimit as Long).toNumber() : undefined,
params: Structs.toObject(executeRequest.params),
bigquery: {
...options,
...requestOptions,
labels: {
...options?.labels,
...requestOptions?.labels
},
jobPrefix: [options?.jobPrefix, requestOptions?.jobPrefix].filter(Boolean).join("-") || undefined
}
});

return dataform.ExecuteResponse.encode({
rows: (results.rows || []).map(row => Structs.fromObject(row)),
schemaFields: results.schema || []
} as any).finish();
}

async function handleListTables(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
const listTablesRequest = dataform.ListTablesRequest.decode(request);
const targets = await dbadapter.tables(listTablesRequest.database, listTablesRequest.schema);
const tablesMetadata = await Promise.all(targets.map(target => dbadapter.table(target)));
const listTablesResponse = dataform.ListTablesResponse.create({
tables: tablesMetadata
});
return dataform.ListTablesResponse.encode(listTablesResponse).finish();
}

async function handleGetTable(dbadapter: IDbAdapter, request: Uint8Array): Promise<Uint8Array> {
const getTableRequest = dataform.GetTableRequest.decode(request);
const tableMetadata = await dbadapter.table(getTableRequest.target);
if (!tableMetadata) {
throw new Error(`Table not found: ${JSON.stringify(getTableRequest.target)}`);
}
return dataform.TableMetadata.encode(tableMetadata).finish();
}

async function handleDeleteTable(
dbadapter: IDbAdapter,
request: Uint8Array,
dryRun?: boolean
): Promise<Uint8Array> {
const deleteTableRequest = dataform.DeleteTableRequest.decode(request);
if (dryRun) {
return new Uint8Array();
}
await dbadapter.deleteTable(deleteTableRequest.target);
return new Uint8Array();
}
Loading
Loading