Skip to content

Support BigQuery dialect in Beam SQL#35472

Merged
liferoad merged 3 commits intoapache:masterfrom
liferoad:use-calcite
Jul 2, 2025
Merged

Support BigQuery dialect in Beam SQL#35472
liferoad merged 3 commits intoapache:masterfrom
liferoad:use-calcite

Conversation

@liferoad
Copy link
Contributor

Fixes #35457


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@github-actions
Copy link
Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

@liferoad liferoad requested a review from Abacn June 29, 2025 14:18
@github-actions
Copy link
Contributor

Assigning reviewers:

R: @m-trieu for label java.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@flo076
Copy link

flo076 commented Jun 30, 2025

Hello @liferoad ,
thanks for this pull request but i think is not enough
I try your modification on my side and i have an error when i try to execute a new test in JdbcDriverTest.java.
According to the calcite documentation "BIT_COUNT" function is available only with bigquery dialect.

I continue to investigate on my side.

NOTE : this test is for debugging purpose.

@Test
  public void testInternalConnect_unbounded_limit_bq() throws Exception {
    ReadOnlyTableProvider tableProvider =
        new ReadOnlyTableProvider(
            "test",
            ImmutableMap.of(
                "test",
                TestUnboundedTable.of(
                        Schema.FieldType.INT32, "order_id",
                        Schema.FieldType.INT32, "site_id",
                        Schema.FieldType.INT32, "price",
                        Schema.FieldType.STRING, "label",
                        Schema.FieldType.STRING, "date",
                        Schema.FieldType.DATETIME, "order_time")
                    .timestampColumnIndex(5)
                    .addRows(
                        Duration.ZERO,
                        1,
                        1,
                        1,
                        "test-1",
                        "2025-06-30",
                        FIRST_DATE,
                        1,
                        2,
                        6,
                        "test-2",
                        "2025-06-30",
                        FIRST_DATE)));
    BeamSqlPipelineOptions options =
        PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
    options.setCalciteConnectionDialect("BIGQUERY");

    CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
    assertEquals(
        "BIGQUERY", connection.getProperties().getProperty("fun", "CALCITE").toUpperCase());
    Statement statement = connection.createStatement();

    ResultSet resultSet1 =
        statement.executeQuery("SELECT SUBSTRING(`label`,2), BIT_COUNT(`price`) FROM test LIMIT 1");
    assertTrue(resultSet1.next());
    System.out.println("DEBUG " + resultSet1.getString(1));
    System.out.println("DEBUG " + resultSet1.getInt(2));
    //    System.out.println(resultSet1.getInt(2));
    assertFalse(resultSet1.next());

    ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
    assertTrue(resultSet2.next());
    assertTrue(resultSet2.next());
    assertFalse(resultSet2.next());
  }

I already tried with calcite native functions and everything is alright

@Test
  public void testInternalConnect_unbounded_limit_bq_native() throws Exception {
    ReadOnlyTableProvider tableProvider =
        new ReadOnlyTableProvider(
            "test",
            ImmutableMap.of(
                "test",
                TestUnboundedTable.of(
                        Schema.FieldType.INT32, "order_id",
                        Schema.FieldType.INT32, "site_id",
                        Schema.FieldType.INT32, "price",
                        Schema.FieldType.STRING, "label",
                        Schema.FieldType.STRING, "date",
                        Schema.FieldType.DATETIME, "order_time")
                    .timestampColumnIndex(5)
                    .addRows(
                        Duration.ZERO,
                        1,
                        1,
                        1,
                        "test-1",
                        "2025-06-30",
                        FIRST_DATE,
                        1,
                        2,
                        6,
                        "test-2",
                        "2025-06-30",
                        FIRST_DATE)));
    BeamSqlPipelineOptions options =
        PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
    options.setCalciteConnectionDialect("BIGQUERY");

    CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
    assertEquals(
        "BIGQUERY", connection.getProperties().getProperty("fun", "CALCITE").toUpperCase());
    Statement statement = connection.createStatement();

    ResultSet resultSet1 =
        statement.executeQuery(
            "SELECT SUBSTRING(`label`,2), CAST(`date` AS DATE) FROM test LIMIT 1");
    assertTrue(resultSet1.next());
    System.out.println("DEBUG " + resultSet1.getString(1));
    System.out.println("DEBUG " + resultSet1.getDate(2));
    //    System.out.println(resultSet1.getInt(2));
    assertFalse(resultSet1.next());

    ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
    assertTrue(resultSet2.next());
    assertTrue(resultSet2.next());
    assertFalse(resultSet2.next());
  }

@liferoad
Copy link
Contributor Author

Thanks. i will check your tests.

@flo076
Copy link

flo076 commented Jun 30, 2025

Hello @liferoad ,

After further analysis, your modification appears to be correct. However, the current version of Calcite (1.28.0) lacks support for many BigQuery (or other dialect) functions. In fact, if you examine the
org.apache.calcite.sql.fun.SqlLibraryOperators

for BigQuery, there are only 18 functions available compared to 100 in the latest versions of Calcite (see reference: Calcite SqlLibraryOperators Documentation).

It seems necessary to create a separate issue to address updating Calcite, as version 1.28.0 was released 4 years ago.

As a suggestion for this Pull Request, adding the ability to configure the lexical policy (
lex
property) could be a useful enhancement.

My test updated with bigquery function supported by calcite 1.28

@Test
  public void testInternalConnect_unbounded_limit_bq() throws Exception {
    ReadOnlyTableProvider tableProvider =
        new ReadOnlyTableProvider(
            "test",
            ImmutableMap.of(
                "test",
                TestUnboundedTable.of(
                        Schema.FieldType.INT32, "order_id",
                        Schema.FieldType.INT32, "site_id",
                        Schema.FieldType.INT32, "price",
                        Schema.FieldType.STRING, "label",
                        Schema.FieldType.STRING, "date",
                        Schema.FieldType.DATETIME, "order_time")
                    .timestampColumnIndex(5)
                    .addRows(
                        Duration.ZERO,
                        1,
                        1,
                        1,
                        "test-1",
                        "2025-06-30",
                        FIRST_DATE,
                        1,
                        2,
                        6,
                        "test-2",
                        "2025-06-30",
                        FIRST_DATE)));
    BeamSqlPipelineOptions options =
        PipelineOptionsFactory.create().as(BeamSqlPipelineOptions.class);
    options.setCalciteConnectionDialect("bigquery");

    CalciteConnection connection = JdbcDriver.connect(tableProvider, options);
    assertEquals("bigquery", connection.getProperties().getProperty("fun", "CALCITE"));
    Statement statement = connection.createStatement();

    ResultSet resultSet1 =
        statement.executeQuery(
            "SELECT SUBSTRING(`label`,2), SUBSTR(`label`, 2,4) FROM test LIMIT 1");
    assertTrue(resultSet1.next());
    System.out.println("DEBUG " + resultSet1.getString(1));
    System.out.println("DEBUG " + resultSet1.getString(2));
    //    System.out.println(resultSet1.getInt(2));
    assertFalse(resultSet1.next());

    ResultSet resultSet2 = statement.executeQuery("SELECT * FROM test LIMIT 2");
    assertTrue(resultSet2.next());
    assertTrue(resultSet2.next());
    assertFalse(resultSet2.next());
  }

@liferoad liferoad requested a review from flo076 June 30, 2025 20:21
@liferoad liferoad merged commit 2e45270 into apache:master Jul 2, 2025
19 checks passed
Amar3tto added a commit that referenced this pull request Jul 14, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Feature Request]: Use bigquery calcite dialect

3 participants