Support BigQuery dialect in Beam SQL#35472
Conversation
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
|
Assigning reviewers: R: @m-trieu for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
Hello @liferoad , I continue to investigate on my side. NOTE : this test is for debugging purpose. @Test
public void testInternalConnect_unbounded_limit_bq() throws Exception {
ReadOnlyTableProvider tableProvider =
new ReadOnlyTableProvider(
"test",
ImmutableMap.of(
"test",
TestUnboundedTable.of(
Schema.FieldType.INT32, "order_id",
Schema.FieldType.INT32, "site_id",
Schema.FieldType.INT32, "price",
Schema.FieldType.STRING, "label",
Schema.FieldType.STRING, "date",
Schema.FieldType.DATETIME, "order_time")
.timestampColumnIndex(5)
.addRows(
Duration.ZERO,
1,
1,
1,
"test-1",
"2025-06-30",
FIRST_DATE,
1,
2,
6,
"test-2",
"2025-06-30",
FIRST_DATE)));
BeamSqlPipelineOptions options =
PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
options.setCalciteConnectionDialect("BIGQUERY");
CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
assertEquals(
"BIGQUERY", connection.getProperties().getProperty("fun", "CALCITE").toUpperCase());
Statement statement = connection.createStatement();
ResultSet resultSet1 =
statement.executeQuery("SELECT SUBSTRING(`label`,2), BIT_COUNT(`price`) FROM test LIMIT 1");
assertTrue(resultSet1.next());
System.out.println("DEBUG " + resultSet1.getString(1));
System.out.println("DEBUG " + resultSet1.getInt(2));
// System.out.println(resultSet1.getInt(2));
assertFalse(resultSet1.next());
ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
assertTrue(resultSet2.next());
assertTrue(resultSet2.next());
assertFalse(resultSet2.next());
}I already tried with calcite native functions and everything is alright @Test
public void testInternalConnect_unbounded_limit_bq_native() throws Exception {
ReadOnlyTableProvider tableProvider =
new ReadOnlyTableProvider(
"test",
ImmutableMap.of(
"test",
TestUnboundedTable.of(
Schema.FieldType.INT32, "order_id",
Schema.FieldType.INT32, "site_id",
Schema.FieldType.INT32, "price",
Schema.FieldType.STRING, "label",
Schema.FieldType.STRING, "date",
Schema.FieldType.DATETIME, "order_time")
.timestampColumnIndex(5)
.addRows(
Duration.ZERO,
1,
1,
1,
"test-1",
"2025-06-30",
FIRST_DATE,
1,
2,
6,
"test-2",
"2025-06-30",
FIRST_DATE)));
BeamSqlPipelineOptions options =
PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
options.setCalciteConnectionDialect("BIGQUERY");
CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
assertEquals(
"BIGQUERY", connection.getProperties().getProperty("fun", "CALCITE").toUpperCase());
Statement statement = connection.createStatement();
ResultSet resultSet1 =
statement.executeQuery(
"SELECT SUBSTRING(`label`,2), CAST(`date` AS DATE) FROM test LIMIT 1");
assertTrue(resultSet1.next());
System.out.println("DEBUG " + resultSet1.getString(1));
System.out.println("DEBUG " + resultSet1.getDate(2));
// System.out.println(resultSet1.getInt(2));
assertFalse(resultSet1.next());
ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
assertTrue(resultSet2.next());
assertTrue(resultSet2.next());
assertFalse(resultSet2.next());
} |
|
Thanks. i will check your tests. |
|
Hello @liferoad , After further analysis, your modification appears to be correct. However, the current version of Calcite (1.28.0) lacks support for many BigQuery (or other dialect) functions. In fact, if you examine the for BigQuery, there are only 18 functions available compared to 100 in the latest versions of Calcite (see reference: Calcite SqlLibraryOperators Documentation). It seems necessary to create a separate issue to address updating Calcite, as version 1.28.0 was released 4 years ago. As a suggestion for this Pull Request, adding the ability to configure the lexical policy ( My test updated with bigquery function supported by calcite 1.28 @Test
public void testInternalConnect_unbounded_limit_bq() throws Exception {
ReadOnlyTableProvider tableProvider =
new ReadOnlyTableProvider(
"test",
ImmutableMap.of(
"test",
TestUnboundedTable.of(
Schema.FieldType.INT32, "order_id",
Schema.FieldType.INT32, "site_id",
Schema.FieldType.INT32, "price",
Schema.FieldType.STRING, "label",
Schema.FieldType.STRING, "date",
Schema.FieldType.DATETIME, "order_time")
.timestampColumnIndex(5)
.addRows(
Duration.ZERO,
1,
1,
1,
"test-1",
"2025-06-30",
FIRST_DATE,
1,
2,
6,
"test-2",
"2025-06-30",
FIRST_DATE)));
BeamSqlPipelineOptions options =
PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
options.setCalciteConnectionDialect("bigquery");
CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
assertEquals("bigquery", connection.getProperties().getProperty("fun", "CALCITE"));
Statement statement = connection.createStatement();
ResultSet resultSet1 =
statement.executeQuery(
"SELECT SUBSTRING(`label`,2), SUBSTR(`label`, 2,4) FROM test LIMIT 1");
assertTrue(resultSet1.next());
System.out.println("DEBUG " + resultSet1.getString(1));
System.out.println("DEBUG " + resultSet1.getString(2));
// System.out.println(resultSet1.getInt(2));
assertFalse(resultSet1.next());
ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
assertTrue(resultSet2.next());
assertTrue(resultSet2.next());
assertFalse(resultSet2.next());
} |
Fixes #35457
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.