From 9bb6a0f96acda77683d65f71ca9c49b93c36877a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Mar 2026 15:23:32 -0500 Subject: [PATCH 01/16] add iceberg quickstart --- examples/java/build.gradle | 1 + .../io/iceberg/IcebergBeamSchemaAndRow.java | 86 +++++ .../transforms/io/iceberg/Quickstart.java | 78 +++++ .../apache_beam/examples/snippets/snippets.py | 69 ++++ website/www/site/.hugo_build.lock | 0 .../www/site/assets/js/language-switch-v2.js | 2 + .../assets/scss/_syntax-highlighting.scss | 3 + .../en/documentation/io/built-in/iceberg.md | 321 ++++++++++++++++++ .../section-menu/en/documentation.html | 17 +- .../layouts/shortcodes/language-switcher.html | 3 + .../www/site/layouts/shortcodes/section.html | 15 + website/www/site/layouts/shortcodes/tab.html | 27 ++ 12 files changed, 614 insertions(+), 8 deletions(-) create mode 100644 examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java create mode 100644 examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java create mode 100644 website/www/site/.hugo_build.lock create mode 100644 website/www/site/content/en/documentation/io/built-in/iceberg.md create mode 100644 website/www/site/layouts/shortcodes/section.html create mode 100644 website/www/site/layouts/shortcodes/tab.html diff --git a/examples/java/build.gradle b/examples/java/build.gradle index 068c0d1b56fd..5aae8146e462 100644 --- a/examples/java/build.gradle +++ b/examples/java/build.gradle @@ -56,6 +56,7 @@ dependencies { implementation project(":sdks:java:io:google-cloud-platform") implementation project(":sdks:java:io:kafka") implementation project(":sdks:java:extensions:ml") + implementation project(":sdks:java:managed") implementation library.java.avro implementation library.java.bigdataoss_util implementation library.java.google_api_client diff --git 
a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java new file mode 100644 index 000000000000..149045edb1f0 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.snippets.transforms.io.iceberg; + +// [START iceberg_schema_and_row] + +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; + +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Arrays; + +public class IcebergBeamSchemaAndRow { + Schema NESTED_SCHEMA = + Schema.builder() + .addStringField("nested_field") + .addInt32Field("nested_field_2") + .build(); + Schema BEAM_SCHEMA = + Schema.builder() + .addBooleanField("boolean_field") + .addInt32Field("int_field") + .addInt64Field("long_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("numeric_field") + .addByteArrayField("bytes_field") + .addStringField("string_field") + .addLogicalTypeField("time_field", SqlTypes.TIME) + .addLogicalTypeField("date_field", SqlTypes.DATE) + .addLogicalTypeField("timestamp_field", Timestamp.MICROS) + .addDateTimeField("timestamptz_field") + .addArrayField("array_field", Schema.FieldType.INT32) + .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32) + .addRowField("struct_field", NESTED_SCHEMA) + .build(); + + Row beamRow = + Row.withSchema(BEAM_SCHEMA) + .withFieldValues( + ImmutableMap.builder() + .put("boolean_field", true) + .put("int_field", 1) + .put("long_field", 2L) + .put("float_field", 3.4f) + .put("double_field", 4.5d) + .put("numeric_field", new BigDecimal(67)) + .put("bytes_field", new byte[] {1, 2, 3}) + .put("string_field", "value") + .put("time_field", LocalTime.now()) + .put("date_field", LocalDate.now()) + .put("timestamp_field", Instant.now()) + .put("timestamptz_field", DateTime.now()) + .put("array_field", Arrays.asList(1, 2, 3)) + .put("map_field", 
ImmutableMap.of("a", 1, "b", 2)) + .put( + "struct_field", + Row.withSchema(NESTED_SCHEMA) + .addValues("nested_value", 123) + .build()) + .build()) + .build(); +} +// [END iceberg_schema_and_row] diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java new file mode 100644 index 000000000000..5bf658190d43 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -0,0 +1,78 @@ +package org.apache.beam.examples.snippets.transforms.io.iceberg; + +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.managed.Managed; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +import java.util.Map; + +public class Quickstart { + public static void main(String[] args) { + // [START hadoop_catalog_props] + Map catalogProps = ImmutableMap.of( + "type", "hadoop", + "warehouse", "file://tmp/beam-iceberg-local-quickstart" + ); + // [END hadoop_catalog_props] + } + public static void other() { + // [START biglake_catalog_props] + Map catalogProps = ImmutableMap.of( + "type", "rest", + "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", + "warehouse", "gs://biglake-public-nyc-taxi-iceberg", + "header.x-goog-user-project", "$PROJECT_ID", + "rest.auth.type", "google", + "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "header.X-Iceberg-Access-Delegation", "vended-credentials" + ); + // [END biglake_catalog_props] + + // [START managed_iceberg_config] + Map managedConfig = ImmutableMap.of( + "table", "my_db.my_table", + "catalog_properties", catalogProps + ); + // 
Note: The table will get created when inserting data (see below) + // [END managed_iceberg_config] + + // [START managed_iceberg_insert] + Schema inputSchema = Schema.builder() + .addInt64Field("id") + .addStringField("name") + .addInt32Field("age") + .build(); + + Pipeline p = Pipeline.create(); + p + .apply(Create.of( + Row.withSchema(inputSchema).addValues(1, "Mark", 34).build(), + Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(), + Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build())) + .apply(Managed.write("iceberg").withConfig(managedConfig)); + + p.run(); + // [END managed_iceberg_insert] + + // [START managed_iceberg_read] + Pipeline q = Pipeline.create(); + PCollection rows = q + .apply(Managed.read("iceberg").withConfig(managedConfig)) + .getSinglePCollection(); + + rows + .apply(MapElements.into(TypeDescriptors.voids()).via(row -> { + System.out.println(row); + return null; + })); + + q.run(); + // [END managed_iceberg_read] + } +} diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index f6bf5e5d44ec..21c675bc2592 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1076,6 +1076,75 @@ def model_bigqueryio_xlang( method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API) # [END model_bigqueryio_write_with_storage_write_api] +def model_managed_iceberg(): + """Examples for Managed Iceberg sources and sinks.""" + # [START hadoop_catalog_config] + hadoop_catalog_props = { + 'type': 'hadoop', + 'warehouse': 'file://tmp/beam-iceberg-local-quickstart' + } + # [END hadoop_catalog_config] + + # [START managed_iceberg_config] + managed_config = { + 'table': 'my_db.my_table', + 'catalog_properties': hadoop_catalog_props + } + # Note: The table will get created when inserting data (see below) + # [END managed_iceberg_config] + + # [START managed_iceberg_insert] + with beam.Pipeline() as p: + (p + | 
beam.Create([beam.Row(id=1, name="Mark", age=32), + beam.Row(id=2, name="Omar", age=24), + beam.Row(id=3, name="Rachel", age=27)]) + | beam.managed.Write("iceberg", config=managed_config)) + # [END managed_iceberg_insert] + + # [START managed_iceberg_read] + with beam.Pipeline() as p: + (p + | beam.managed.Read("iceberg", config=managed_config) + | beam.LogElements()) + # [END managed_iceberg_read] + + + + + # [START biglake_catalog_config] + biglake_catalog_config = { + 'type': 'rest', + 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse': 'gs://biglake-public-nyc-taxi-iceberg', + 'header.x-goog-user-project': '$PROJECT_ID', + 'rest.auth.type': 'google', + 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation': 'vended-credentials' + } + # [END biglake_catalog_config] + + # [START model_managed_iceberg_data_types] + from decimal import Decimal + import apache_beam as beam + from apache_beam.utils.timestamp import Timestamp + + row = beam.Row( + boolean_field=True, + int_field=1, + float_field=2.3, + numeric_field=Decimal('34'), + bytes_field=b'value', + string_field="value", + timestamptz_field=Timestamp(4, 5), + array_field=[1, 2, 3], + map_field={"a": 1, "b": 2}, + struct_field=beam.Row( + nested_field="nested_value", + nested_field2=123) + ) + # [END model_managed_iceberg_data_types] + def model_composite_transform_example(contents, output_path): """Example of a composite transform. 
diff --git a/website/www/site/.hugo_build.lock b/website/www/site/.hugo_build.lock new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/website/www/site/assets/js/language-switch-v2.js b/website/www/site/assets/js/language-switch-v2.js index f4dd73a5b04f..ff6fdb0253a8 100644 --- a/website/www/site/assets/js/language-switch-v2.js +++ b/website/www/site/assets/js/language-switch-v2.js @@ -61,6 +61,7 @@ $(document).ready(function() { "valueToTabTitle": function (value) { switch (value) { case 'py': return 'Python'; + case 'sql': return 'SQL'; case 'scio': return 'SCIO'; case 'typescript': return 'TypeScript'; } @@ -287,6 +288,7 @@ $(document).ready(function() { }).render(); Switcher({"name": "runner", "default": "direct"}).render(); + Switcher({"name": "tab"}).render(); Switcher({"name": "shell", "default": "unix"}).render(); Switcher({"name": "version"}).render(); }); diff --git a/website/www/site/assets/scss/_syntax-highlighting.scss b/website/www/site/assets/scss/_syntax-highlighting.scss index 8a347d3f74f8..fd9f3216349e 100644 --- a/website/www/site/assets/scss/_syntax-highlighting.scss +++ b/website/www/site/assets/scss/_syntax-highlighting.scss @@ -322,6 +322,9 @@ pre { } } } +.tab-switcher { + @extend .runner-switcher; +} .shell-switcher { ul.nav-tabs { padding-left: 0; diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md new file mode 100644 index 000000000000..3f0bd5efd5f1 --- /dev/null +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -0,0 +1,321 @@ +--- +title: "Apache Iceberg" +--- + + +[Built-in I/O Transforms](/documentation/io/built-in/) + + +# Apache Iceberg I/O connector + +The Beam SDKs include built-in transforms that can read data from and write data +to [Apache Iceberg](https://iceberg.apache.org/) tables. 
+ +{{< language-switcher sql java py yaml>}} + +{{< paragraph class="language-java" >}} +To use IcebergIO, add the Maven artifact dependency to your `pom.xml` file. +{{< /paragraph >}} + +{{< highlight java >}} + + org.apache.beam + beam-sdks-java-io-iceberg + {{< param release_latest >}} + +{{< /highlight >}} + +{{< paragraph class="language-sql" >}} +To use IcebergIO, install the [Beam SQL Shell](https://beam.apache.org/documentation/dsls/sql/shell/#installation) and run the following command: +{{< /paragraph >}} + +{{% section class="language-sql" %}} +```shell +./beam-sql.sh --io +``` +{{% /section %}} + +{{< paragraph >}} +Additional resources: +{{< /paragraph >}} + +{{< paragraph wrap="span" >}} +* [IcebergIO configuration parameters](https://beam.apache.org/documentation/io/managed-io/#iceberg-write) +* [IcebergIO source code](https://github.com/apache/beam/tree/master/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg) +* [IcebergIO Javadoc](https://beam.apache.org/releases/javadoc/{{< param release_latest >}}/org/apache/beam/sdk/io/iceberg/IcebergIO.html) +* [Apache Iceberg spec](https://iceberg.apache.org/spec/) +* [Apache Iceberg terms](https://iceberg.apache.org/terms/) +{{< /paragraph >}} + +## Iceberg basics + +### Catalogs + +A catalog is a top-level entity used to manage and access Iceberg tables. There are numerous catalog implementations out there. +In this guide, we'll be using the REST-based BigLake catalog implemented by Google. + +### Namespaces + +A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a "database". + +### Tables + +The actual entity containing data, and is described by a schema and partition spec. + +### Snapshots + +A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change +and references its parent snapshot. An Iceberg table's history is essentially a list of snapshots. 
+ +## Quickstart Guide + +### Choose Your Catalog + +First, select a Catalog implementation to handle metadata management and storage interaction. +Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two common paths: +**Hadoop** for easy local development and **BigLake** for managing production data at cloud scale. + +{{< tab hadoop >}} +

+ Use Hadoop Catalog for quick, local testing with zero setup and no external dependencies. + The following examples use a temporary local directory. +

+ +
+ {{< highlight sql >}} + CREATE CATALOG quickstart_local TYPE 'iceberg' + PROPERTIES ( + 'type' = 'hadoop', + 'warehouse' = 'file://tmp/beam-iceberg-local-quickstart' + ); + {{< /highlight >}} + {{< highlight java>}} + {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" hadoop_catalog_props >}} + {{< /highlight >}} + {{< highlight py >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" hadoop_catalog_config >}} + {{< /highlight >}} +{{< /tab >}} +{{< tab BigLake >}} +{{% section %}} +Use BigLake Catalog for a fully managed REST-based experience. It simplifies access to cloud storage with +built-in credential delegation and unified metadata management. It requires a few pre-requisites: + +- A Google Cloud Project (for authentication). Create an account [here](https://docs.cloud.google.com/docs/get-started) if you don't have one. +- Standard Google [Application Default Credentials](https://docs.cloud.google.com/docs/authentication/set-up-adc-local-dev-environment#local-user-cred) (ADC) set up in your environment. 
- A [Google Cloud Storage bucket](https://docs.cloud.google.com/storage/docs/creating-buckets) + +{{% /section %}} + {{< highlight sql>}} + CREATE CATALOG quickstart_catalog TYPE 'iceberg' + PROPERTIES ( + 'type' = 'rest', + 'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse' = 'gs://$BUCKET_NAME', + 'header.x-goog-user-project' = '$PROJECT_ID', + 'rest.auth.type' = 'google', + 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation' = 'vended-credentials' + ); + {{< /highlight >}} + {{< highlight java>}} + {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" biglake_catalog_props >}} + {{< /highlight >}} + {{< highlight py >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" biglake_catalog_config >}} + {{< /highlight >}} + {{< highlight yaml >}} + catalog_props: &catalog_props + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog" + warehouse: "gs://$BUCKET_NAME" + header.x-goog-user-project: "$PROJECT_ID" + rest.auth.type: "google" + io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO" + header.X-Iceberg-Access-Delegation: "vended-credentials" + {{< /highlight >}} +{{< /tab >}} + +### Create a Namespace + +You can use Beam SQL to create a new namespace through an explicit DDL statement: +```sql +CREATE DATABASE quickstart_local.my_db; +``` + +Or you can leave it up to the IcebergIO sink, which will automatically create missing namespaces at runtime. This is +particularly useful when writing to dynamic destinations, where namespace names are determined by input data. 
+ +### Create a Table + +{{< highlight sql>}} +CREATE EXTERNAL TABLE quickstart_local.my_db.my_table ( + id BIGINT, + name VARCHAR, + age INTEGER +) +TYPE 'iceberg' +{{< /highlight >}} +{{< highlight java>}} +{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" managed_iceberg_config >}} +{{< /highlight >}} +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" managed_iceberg_config >}} +{{< /highlight >}} +{{< highlight yaml >}} +- type: WriteToIceberg + input: Create + config: + table: "my_db.my_table" + catalog_properties: *catalog_props +{{< /highlight >}} + +### Insert Data + +{{< highlight sql>}} +INSERT INTO quickstart_local.my_db.my_table VALUES + (1, 'Mark', 32), + (2, 'Omar', 24), + (3, 'Rachel', 27) +; +{{< /highlight >}} +{{< highlight java>}} +{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" managed_iceberg_insert >}} +{{< /highlight >}} +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" managed_iceberg_insert >}} +{{< /highlight >}} +{{< highlight yaml >}} +pipeline: + transforms: + - type: Create + config: + elements: + - id: 1 + name: "Mark" + age: 32 + - id: 2 + name: "Omar" + age: 24 + - id: 3 + name: "Rachel" + age: 27 + - type: WriteToIceberg + input: Create + config: + table: "my_db.my_table" + catalog_properties: *catalog_props +{{< /highlight >}} + +### View Namespaces and Tables + +You can use Beam SQL to view the newly created resources: +```sql +SHOW DATABASES quickstart_catalog; +``` +```sql +SHOW TABLES quickstart_catalog.my_db; +``` + +### Query Data + +{{< highlight sql>}} +SELECT * FROM quickstart_local.my_db.my_table; +{{< /highlight >}} +{{< highlight java>}} +{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" managed_iceberg_read >}} +{{< /highlight 
>}} +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" managed_iceberg_read >}} +{{< /highlight >}} +{{< highlight yaml >}} +pipeline: + transforms: + - type: ReadFromIceberg + config: + table: "my_db.my_table" + catalog_properties: *catalog_props + - type: LogForTesting + input: ReadFromIceberg +{{< /highlight >}} + + + +## Data Types + +Check this [overview of Iceberg data types](https://iceberg.apache.org/spec/#schemas-and-data-types). + +IcebergIO leverages Beam Schemas to bridge the gap between SDK-native types and the Iceberg specification. +While the Java SDK provides full coverage for the Iceberg v2 spec (with v3 support currently in development), +other SDKs may have specific constraints on complex or experimental types. The following examples demonstrate +the standard mapping for core data types across SQL, Java, Python, and YAML: + +{{< highlight sql >}} +INSERT INTO catalog.namespace.table VALUES ( +9223372036854775807, -- BIGINT +2147483647, -- INTEGER +1.0, -- FLOAT +1.0, -- DOUBLE +TRUE, -- BOOLEAN +TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP +'varchar', -- VARCHAR +'char', -- CHAR +ARRAY['abc', 'xyz'], -- ARRAY +ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT] +) +{{< /highlight >}} +{{< highlight java >}} +{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java" iceberg_schema_and_row >}} +{{< /highlight >}} +{{< highlight py >}} +{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_managed_iceberg_data_types >}} +{{< /highlight >}} +{{< highlight yaml >}} +pipeline: + transforms: + - type: Create + config: + elements: + - boolean_field: false + integer_field: 123 + number_field: 4.56 + string_field: "abc" + struct_field: + nested_1: a + nested_2: 1 + array_field: [1, 2, 3] + output_schema: + type: object + properties: + boolean_field: + type: boolean + 
integer_field: + type: integer + number_field: + type: number + string_field: + type: string + struct_field: + type: object + properties: + nested_1: + type: string + nested_2: + type: integer + array_field: + type: array + items: + type: integer +{{< /highlight >}} diff --git a/website/www/site/layouts/partials/section-menu/en/documentation.html b/website/www/site/layouts/partials/section-menu/en/documentation.html index 0cc197d95fdc..97b694538ee8 100755 --- a/website/www/site/layouts/partials/section-menu/en/documentation.html +++ b/website/www/site/layouts/partials/section-menu/en/documentation.html @@ -69,15 +69,16 @@
  • I/O connector guides
  • diff --git a/website/www/site/layouts/shortcodes/language-switcher.html b/website/www/site/layouts/shortcodes/language-switcher.html index cc9b5864a3b8..2f50691e3cca 100644 --- a/website/www/site/layouts/shortcodes/language-switcher.html +++ b/website/www/site/layouts/shortcodes/language-switcher.html @@ -20,6 +20,9 @@ {{ if eq $lang "py" }}
  • Python SDK
  • {{ end }} + {{ if eq $lang "sql" }} +
  • SQL Shell
  • + {{ end }} {{ if eq $lang "go" }}
  • Go SDK
  • {{ end }} diff --git a/website/www/site/layouts/shortcodes/section.html b/website/www/site/layouts/shortcodes/section.html new file mode 100644 index 000000000000..5b7f14fe0f05 --- /dev/null +++ b/website/www/site/layouts/shortcodes/section.html @@ -0,0 +1,15 @@ +{{/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. See accompanying LICENSE file. +*/}} + +
    +{{ .Inner | markdownify }} +
    diff --git a/website/www/site/layouts/shortcodes/tab.html b/website/www/site/layouts/shortcodes/tab.html new file mode 100644 index 000000000000..a5d6ecd607a4 --- /dev/null +++ b/website/www/site/layouts/shortcodes/tab.html @@ -0,0 +1,27 @@ +{{/* +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. See accompanying LICENSE file. +*/}} + +{{ $content := (trim .Inner "\n\r") | htmlUnescape | safeHTML }} +{{ $ctx := . }} +{{ $language := .Get 0 }} +{{ with $language }} +
    + {{ $content }} +
    +{{ end }} From 9de3ce41a18fa669f49fd18e8e758ef661b712e8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Mar 2026 15:57:53 -0500 Subject: [PATCH 02/16] add license --- .../transforms/io/iceberg/Quickstart.java | 17 +++++++++++++++++ website/www/site/.hugo_build.lock | 0 2 files changed, 17 insertions(+) delete mode 100644 website/www/site/.hugo_build.lock diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index 5bf658190d43..f4e4be3e4795 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.beam.examples.snippets.transforms.io.iceberg; import com.google.common.collect.ImmutableMap; diff --git a/website/www/site/.hugo_build.lock b/website/www/site/.hugo_build.lock deleted file mode 100644 index e69de29bb2d1..000000000000 From 942415db469313ed7cec3c640373f45eb562454d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Mar 2026 16:09:30 -0500 Subject: [PATCH 03/16] spotless --- .../io/iceberg/IcebergBeamSchemaAndRow.java | 20 ++---- .../transforms/io/iceberg/Quickstart.java | 72 +++++++++---------- 2 files changed, 41 insertions(+), 51 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java index 149045edb1f0..bd16a675816b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -20,24 +20,20 @@ // [START iceberg_schema_and_row] import com.google.common.collect.ImmutableMap; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; -import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; -import org.apache.beam.sdk.values.Row; -import org.joda.time.DateTime; - import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; +import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; +import org.apache.beam.sdk.values.Row; +import org.joda.time.DateTime; public class IcebergBeamSchemaAndRow { Schema NESTED_SCHEMA = - Schema.builder() - .addStringField("nested_field") - .addInt32Field("nested_field_2") - .build(); + 
Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); Schema BEAM_SCHEMA = Schema.builder() .addBooleanField("boolean_field") @@ -77,9 +73,7 @@ public class IcebergBeamSchemaAndRow { .put("map_field", ImmutableMap.of("a", 1, "b", 2)) .put( "struct_field", - Row.withSchema(NESTED_SCHEMA) - .addValues("nested_value", 123) - .build()) + Row.withSchema(NESTED_SCHEMA).addValues("nested_value", 123).build()) .build()) .build(); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index f4e4be3e4795..0c00b02f4002 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -18,6 +18,7 @@ package org.apache.beam.examples.snippets.transforms.io.iceberg; import com.google.common.collect.ImmutableMap; +import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; @@ -27,67 +28,62 @@ import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; -import java.util.Map; - public class Quickstart { public static void main(String[] args) { // [START hadoop_catalog_props] - Map catalogProps = ImmutableMap.of( - "type", "hadoop", - "warehouse", "file://tmp/beam-iceberg-local-quickstart" - ); + Map catalogProps = + ImmutableMap.of( + "type", "hadoop", + "warehouse", "file://tmp/beam-iceberg-local-quickstart"); // [END hadoop_catalog_props] } + public static void other() { // [START biglake_catalog_props] - Map catalogProps = ImmutableMap.of( - "type", "rest", - "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", - "warehouse", "gs://biglake-public-nyc-taxi-iceberg", - "header.x-goog-user-project", 
"$PROJECT_ID", - "rest.auth.type", "google", - "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", - "header.X-Iceberg-Access-Delegation", "vended-credentials" - ); + Map catalogProps = + ImmutableMap.of( + "type", "rest", + "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", + "warehouse", "gs://biglake-public-nyc-taxi-iceberg", + "header.x-goog-user-project", "$PROJECT_ID", + "rest.auth.type", "google", + "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "header.X-Iceberg-Access-Delegation", "vended-credentials"); // [END biglake_catalog_props] // [START managed_iceberg_config] - Map managedConfig = ImmutableMap.of( - "table", "my_db.my_table", - "catalog_properties", catalogProps - ); + Map managedConfig = + ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps); // Note: The table will get created when inserting data (see below) // [END managed_iceberg_config] // [START managed_iceberg_insert] - Schema inputSchema = Schema.builder() - .addInt64Field("id") - .addStringField("name") - .addInt32Field("age") - .build(); + Schema inputSchema = + Schema.builder().addInt64Field("id").addStringField("name").addInt32Field("age").build(); Pipeline p = Pipeline.create(); - p - .apply(Create.of( - Row.withSchema(inputSchema).addValues(1, "Mark", 34).build(), - Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(), - Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build())) - .apply(Managed.write("iceberg").withConfig(managedConfig)); + p.apply( + Create.of( + Row.withSchema(inputSchema).addValues(1, "Mark", 34).build(), + Row.withSchema(inputSchema).addValues(2, "Omar", 24).build(), + Row.withSchema(inputSchema).addValues(3, "Rachel", 27).build())) + .apply(Managed.write("iceberg").withConfig(managedConfig)); p.run(); // [END managed_iceberg_insert] // [START managed_iceberg_read] Pipeline q = Pipeline.create(); - PCollection rows = q - .apply(Managed.read("iceberg").withConfig(managedConfig)) - .getSinglePCollection(); + 
PCollection rows = + q.apply(Managed.read("iceberg").withConfig(managedConfig)).getSinglePCollection(); - rows - .apply(MapElements.into(TypeDescriptors.voids()).via(row -> { - System.out.println(row); - return null; - })); + rows.apply( + MapElements.into(TypeDescriptors.voids()) + .via( + row -> { + System.out.println(row); + return null; + })); q.run(); // [END managed_iceberg_read] From 250940125451a64c0f17b52dbb059cdc86050abe Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 2 Mar 2026 16:31:12 -0500 Subject: [PATCH 04/16] adjustments --- .../en/documentation/io/built-in/iceberg.md | 52 +++++++++++-------- 1 file changed, 31 insertions(+), 21 deletions(-) diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index 3f0bd5efd5f1..6b194e94d146 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -63,8 +63,8 @@ Additional resources: ### Catalogs -A catalog is a top-level entity used to manage and access Iceberg tables. There are numerous catalog implementations out there. -In this guide, we'll be using the REST-based BigLake catalog implemented by Google. +A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; +this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development. ### Namespaces @@ -77,7 +77,8 @@ The actual entity containing data, and is described by a schema and partition sp ### Snapshots A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change -and references its parent snapshot. An Iceberg table's history is essentially a list of snapshots. +and references its parent snapshot. 
An Iceberg table's history is a chronological list of snapshots, enabling features +like time travel and ACID-compliant concurrent writes. ## Quickstart Guide @@ -95,7 +96,7 @@ Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two
    {{< highlight sql >}} - CREATE CATALOG quickstart_local TYPE 'iceberg' + CREATE CATALOG my_catalog TYPE 'iceberg' PROPERTIES ( 'type' = 'hadoop', 'warehouse' = 'file://tmp/beam-iceberg-local-quickstart', @@ -119,7 +120,7 @@ built-in credential delegation and unified metadata management. It requires a fe {{% /section %}} {{< highlight sql>}} - CREATE CATALOG quickstart_catalog TYPE 'iceberg' + CREATE CATALOG my_catalog TYPE 'iceberg' PROPERTIES ( 'type' = 'rest', 'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', @@ -144,7 +145,7 @@ built-in credential delegation and unified metadata management. It requires a fe header.x-goog-user-project: "$PROJECT_ID" rest.auth.type: "google" io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO" - header.X-Iceberg-Access-Delegation = "vended-credentials" + header.X-Iceberg-Access-Delegation: "vended-credentials" {{< /highlight >}} {{< /tab >}} @@ -152,16 +153,18 @@ built-in credential delegation and unified metadata management. It requires a fe You can use Beam SQL to create a new namespace through an explicit DDL statement: ```sql -CREATE DATABASE quickstart_local.my_db; +CREATE DATABASE my_catalog.my_db; ``` -Or you can leave it up to the IcebergIO sink, which will automatically create missing namespaces at runtime. This is -particularly useful when writing to dynamic destinations, where namespace names are determined by input data. +Alternatively, the IcebergIO sink can handle namespace creation automatically at runtime. +This is ideal for dynamic pipelines where destinations are determined by the incoming data ### Create a Table +Tables are defined by a schema and an optional partition spec. +You can create a table using SQL DDL or by configuring the Iceberg destination in your Beam pipeline. 
{{< highlight sql>}} -CREATE EXTERNAL TABLE quickstart_local.my_db.my_table ( +CREATE EXTERNAL TABLE my_catalog.my_db.my_table ( id BIGINT, name VARCHAR, age INTEGER @@ -176,20 +179,22 @@ TYPE 'iceberg' {{< /highlight >}} {{< highlight yaml >}} - type: WriteToIceberg - input: Create config: - table: "my_db.my_table" - catalog_properties: *catalog_props + table: "my_db.my_table" + catalog_properties: *catalog_props + +# Note: The table will get created when inserting data (see below) {{< /highlight >}} ### Insert Data +Once your table is defined, you can write data using standard SQL `INSERT` or by calling the IcebergIO sink in your SDK of choice. + {{< highlight sql>}} -INSERT INTO quickstart_local.my_db.my_table VALUES +INSERT INTO my_catalog.my_db.my_table VALUES (1, 'Mark', 32), (2, 'Omar', 24), - (3, 'Rachel', 27) -); + (3, 'Rachel', 27); {{< /highlight >}} {{< highlight java>}} {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" managed_iceberg_insert >}} @@ -199,6 +204,7 @@ INSERT INTO quickstart_local.my_db.my_table VALUES {{< /highlight >}} {{< highlight yaml >}} pipeline: + type: chain transforms: - type: Create config: @@ -213,7 +219,6 @@ pipeline: name: "Rachel" age: 27 - type: WriteToIceberg - input: Create config: table: "my_db.my_table" catalog_properties: *catalog_props @@ -223,16 +228,16 @@ pipeline: You can use Beam SQL to view the newly created resources: ```sql -SHOW DATABASES quickstart_catalog; +SHOW DATABASES my_catalog; ``` ```sql -SHOW TABLES quickstart_catalog.my_db; +SHOW TABLES my_catalog.my_db; ``` ### Query Data {{< highlight sql>}} -SELECT * FROM quickstart_local.my_db.my_table; +SELECT * FROM my_catalog.my_db.my_table; {{< /highlight >}} {{< highlight java>}} {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" managed_iceberg_read >}} @@ -242,13 +247,13 @@ SELECT * FROM quickstart_local.my_db.my_table; 
{{< /highlight >}} {{< highlight yaml >}} pipeline: + type: chain transforms: - type: ReadFromIceberg config: table: "my_db.my_table" catalog_properties: *catalog_props - type: LogForTesting - input: ReadFromIceberg {{< /highlight >}} @@ -319,3 +324,8 @@ pipeline: items: type: integer {{< /highlight >}} + +## Further steps + +Check out the full [IcebergIO configuration](https://beam.apache.org/documentation/io/managed-io/#iceberg-write) to make +use of other features like applying a partition spec, table properties, row filtering, column pruning, etc. From 712d12483c84644853bd4c81eb996b3641a4be68 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 3 Mar 2026 11:13:41 -0500 Subject: [PATCH 05/16] lint --- .../io/iceberg/IcebergBeamSchemaAndRow.java | 12 ++-- .../transforms/io/iceberg/Quickstart.java | 2 +- .../apache_beam/examples/snippets/snippets.py | 69 +++++++++---------- 3 files changed, 41 insertions(+), 42 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java index bd16a675816b..bdbd59ebd8d7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -19,7 +19,6 @@ // [START iceberg_schema_and_row] -import com.google.common.collect.ImmutableMap; import java.math.BigDecimal; import java.time.Instant; import java.time.LocalDate; @@ -29,12 +28,13 @@ import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; public class IcebergBeamSchemaAndRow { - Schema 
NESTED_SCHEMA = + Schema nestedSchema = Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); - Schema BEAM_SCHEMA = + Schema beamSchema = Schema.builder() .addBooleanField("boolean_field") .addInt32Field("int_field") @@ -50,11 +50,11 @@ public class IcebergBeamSchemaAndRow { .addDateTimeField("timestamptz_field") .addArrayField("array_field", Schema.FieldType.INT32) .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32) - .addRowField("struct_field", NESTED_SCHEMA) + .addRowField("struct_field", nestedSchema) .build(); Row beamRow = - Row.withSchema(BEAM_SCHEMA) + Row.withSchema(beamSchema) .withFieldValues( ImmutableMap.builder() .put("boolean_field", true) @@ -73,7 +73,7 @@ public class IcebergBeamSchemaAndRow { .put("map_field", ImmutableMap.of("a", 1, "b", 2)) .put( "struct_field", - Row.withSchema(NESTED_SCHEMA).addValues("nested_value", 123).build()) + Row.withSchema(nestedSchema).addValues("nested_value", 123).build()) .build()) .build(); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index 0c00b02f4002..fa519f748f6c 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -17,7 +17,6 @@ */ package org.apache.beam.examples.snippets.transforms.io.iceberg; -import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; @@ -27,6 +26,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Quickstart { public 
static void main(String[] args) { diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 21c675bc2592..8edcaace70d9 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1076,51 +1076,51 @@ def model_bigqueryio_xlang( method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API) # [END model_bigqueryio_write_with_storage_write_api] + def model_managed_iceberg(): """Examples for Managed Iceberg sources and sinks.""" # [START hadoop_catalog_config] hadoop_catalog_props = { - 'type': 'hadoop', - 'warehouse': 'file://tmp/beam-iceberg-local-quickstart' + 'type': 'hadoop', 'warehouse': 'file://tmp/beam-iceberg-local-quickstart' } # [END hadoop_catalog_config] # [START managed_iceberg_config] managed_config = { - 'table': 'my_db.my_table', - 'catalog_properties': hadoop_catalog_props + 'table': 'my_db.my_table', 'catalog_properties': hadoop_catalog_props } # Note: The table will get created when inserting data (see below) # [END managed_iceberg_config] # [START managed_iceberg_insert] with beam.Pipeline() as p: - (p - | beam.Create([beam.Row(1, "Mark", 32), - beam.Row(2, "Omar", 24), - beam.Row(3, "Rachel", 27)]) - | beam.managed.Write("iceberg", config=managed_config)) + ( + p + | beam.Create([ + beam.Row(id=1, name="Mark", age=32), + beam.Row(id=2, name="Omar", age=24), + beam.Row(id=3, name="Rachel", age=27) + ]) + | beam.managed.Write("iceberg", config=managed_config)) # [END managed_iceberg_insert] # [START managed_iceberg_read] with beam.Pipeline() as p: - (p - | beam.managed.Read("iceberg", config=managed_config) - | beam.LogElements()) + ( + p + | beam.managed.Read("iceberg", config=managed_config) + | beam.LogElements()) # [END managed_iceberg_read] - - - # [START biglake_catalog_config] biglake_catalog_config = { - 'type': 'rest', - 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', - 'warehouse': 
'gs://biglake-public-nyc-taxi-iceberg', - 'header.x-goog-user-project': '$PROJECT_ID', - 'rest.auth.type': 'google', - 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', - 'header.X-Iceberg-Access-Delegation': 'vended-credentials' + 'type': 'rest', + 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse': 'gs://biglake-public-nyc-taxi-iceberg', + 'header.x-goog-user-project': '$PROJECT_ID', + 'rest.auth.type': 'google', + 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation': 'vended-credentials' } # [END biglake_catalog_config] @@ -1130,19 +1130,18 @@ def model_managed_iceberg(): from apache_beam.utils.timestamp import Timestamp row = beam.Row( - boolean_field=True, - int_field=1, - float_field=2.3, - numeric_field=Decimal('34'), - bytes_field=b'value', - string_field="value", - timestamptz_field=Timestamp(4, 5), - array_field=[1, 2, 3], - map_field={"a": 1, "b": 2}, - struct_field=beam.Row( - nested_field="nested_value", - nested_field2=123) - ) + boolean_field=True, + int_field=1, + float_field=2.3, + numeric_field=Decimal('34'), + bytes_field=b'value', + string_field="value", + timestamptz_field=Timestamp(4, 5), + array_field=[1, 2, 3], + map_field={ + "a": 1, "b": 2 + }, + struct_field=beam.Row(nested_field="nested_value", nested_field2=123)) # [END model_managed_iceberg_data_types] From d512ff1d42689e81ae4085d704aca51b2de1b3a9 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Mar 2026 09:39:19 -0500 Subject: [PATCH 06/16] add missing io --- .../www/site/content/en/documentation/io/built-in/iceberg.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index 6b194e94d146..0abd7f13c25e 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -43,7 +43,7 @@ 
To use IcebergIO, install the [Beam SQL Shell](https://beam.apache.org/documenta {{% section class="language-sql" %}} ```shell -./beam-sql.sh --io +./beam-sql.sh --io iceberg ``` {{% /section %}} From d2e46c61bb0f5df5d66bbefc4590f3881ca44bd5 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Mar 2026 09:41:56 -0500 Subject: [PATCH 07/16] add yaml hadoop config --- .../www/site/content/en/documentation/io/built-in/iceberg.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index 0abd7f13c25e..0c93cd57f99b 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -108,6 +108,11 @@ Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two {{< highlight py >}} {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" hadoop_catalog_config >}} {{< /highlight >}} + {{< highlight yaml >}} + catalog_props: &catalog_props + type: "hadoop" + warehouse: "file://tmp/beam-iceberg-local-quickstart" + {{< /highlight >}} {{< /tab >}} {{< tab BigLake >}} {{% section %}} From dbd29091b373fa237970d55406001c25a85ad880 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Mar 2026 14:57:03 -0500 Subject: [PATCH 08/16] small fixes, and add support for python Timestamp --- .../transforms/io/iceberg/Quickstart.java | 3 ++- .../beam/sdk/io/iceberg/IcebergUtils.java | 22 ++++++++++++++-- .../apache_beam/examples/snippets/snippets.py | 26 ++++++++++++++++--- .../en/documentation/io/built-in/iceberg.md | 6 ++--- 4 files changed, 47 insertions(+), 10 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index fa519f748f6c..65c3af5d85ec 100644 
--- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -34,7 +34,7 @@ public static void main(String[] args) { Map catalogProps = ImmutableMap.of( "type", "hadoop", - "warehouse", "file://tmp/beam-iceberg-local-quickstart"); + "warehouse", "file:///tmp/beam-iceberg-local-quickstart"); // [END hadoop_catalog_props] } @@ -54,6 +54,7 @@ public static void other() { // [START managed_iceberg_config] Map managedConfig = ImmutableMap.of("table", "my_db.my_table", "catalog_properties", catalogProps); + // Note: The table will get created when inserting data (see below) // [END managed_iceberg_config] diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index f76d000628f5..cc97e23a3f57 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -32,9 +32,11 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; +import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; import org.apache.beam.sdk.schemas.logicaltypes.PassThroughLogicalType; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.util.Preconditions; @@ -75,6 +77,7 @@ private IcebergUtils() {} .put(SqlTypes.TIME.getIdentifier(), Types.TimeType.get()) .put(SqlTypes.DATETIME.getIdentifier(), Types.TimestampType.withoutZone()) .put(SqlTypes.UUID.getIdentifier(), Types.UUIDType.get()) + .put(MicrosInstant.IDENTIFIER, Types.TimestampType.withZone()) .build(); 
private static Schema.FieldType icebergTypeToBeamFieldType(final Type type) { @@ -294,6 +297,13 @@ public static org.apache.iceberg.Schema beamSchemaToIcebergSchema(final Schema s /** Converts a Beam {@link Row} to an Iceberg {@link Record}. */ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Row row) { + if (row.getSchema().getFieldCount() != schema.columns().size()) { + throw new IllegalStateException( + String.format( + "Beam Row schema and Iceberg schema have different sizes.\n\tBeam Row columns: %s\n\tIceberg schema columns: %s", + row.getSchema().getFieldNames(), + schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList()))); + } return copyRowIntoRecord(GenericRecord.create(schema), row); } @@ -419,7 +429,11 @@ private static void copyFieldIntoRecord(Record rec, Types.NestedField field, Row private static Object getIcebergTimestampValue(Object beamValue, boolean shouldAdjustToUtc) { // timestamptz if (shouldAdjustToUtc) { - if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME + if (beamValue instanceof java.time.Instant) { // MicrosInstant + java.time.Instant instant = (java.time.Instant) beamValue; + return DateTimeUtil.timestamptzFromNanos( + TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano()); + } else if (beamValue instanceof LocalDateTime) { // SqlTypes.DATETIME return OffsetDateTime.of((LocalDateTime) beamValue, ZoneOffset.UTC); } else if (beamValue instanceof Instant) { // FieldType.DATETIME return DateTimeUtil.timestamptzFromMicros(((Instant) beamValue).getMillis() * 1000L); @@ -434,7 +448,11 @@ private static Object getIcebergTimestampValue(Object beamValue, boolean shouldA } // timestamp - if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME + if (beamValue instanceof java.time.Instant) { // MicrosInstant + java.time.Instant instant = (java.time.Instant) beamValue; + return DateTimeUtil.timestampFromNanos( + 
TimeUnit.SECONDS.toNanos(instant.getEpochSecond()) + instant.getNano()); + } else if (beamValue instanceof LocalDateTime) { // SqlType.DATETIME return beamValue; } else if (beamValue instanceof Instant) { // FieldType.DATETIME return DateTimeUtil.timestampFromMicros(((Instant) beamValue).getMillis() * 1000L); diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 8edcaace70d9..a61088e895c0 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1081,7 +1081,8 @@ def model_managed_iceberg(): """Examples for Managed Iceberg sources and sinks.""" # [START hadoop_catalog_config] hadoop_catalog_props = { - 'type': 'hadoop', 'warehouse': 'file://tmp/beam-iceberg-local-quickstart' + 'type': 'hadoop', + 'warehouse': 'file:///tmp/beam-iceberg-local-quickstart' } # [END hadoop_catalog_config] @@ -1089,6 +1090,7 @@ def model_managed_iceberg(): managed_config = { 'table': 'my_db.my_table', 'catalog_properties': hadoop_catalog_props } + # Note: The table will get created when inserting data (see below) # [END managed_iceberg_config] @@ -1125,15 +1127,14 @@ def model_managed_iceberg(): # [END biglake_catalog_config] # [START model_managed_iceberg_data_types] - from decimal import Decimal import apache_beam as beam from apache_beam.utils.timestamp import Timestamp row = beam.Row( boolean_field=True, int_field=1, - float_field=2.3, - numeric_field=Decimal('34'), + long_field=2, + float_field=3.45, bytes_field=b'value', string_field="value", timestamptz_field=Timestamp(4, 5), @@ -1142,6 +1143,23 @@ def model_managed_iceberg(): "a": 1, "b": 2 }, struct_field=beam.Row(nested_field="nested_value", nested_field2=123)) + + import numpy as np + from apache_beam.typehints.row_type import RowTypeConstraint + from typing import Sequence + + # Override data schema by adding `with_output_types` to the transform: + 
beam.Create(row).with_output_types( + RowTypeConstraint.from_fields([ + ('boolean_field', bool), ('int_field', int), ('long_field', np.int64), + ('float_field', float), ('bytes_field', bytes), ('string_field', str), + ('timestamptz_field', Timestamp), ('array_field', Sequence[int]), + ('map_field', dict[str, int]), + ( + 'struct_field', + RowTypeConstraint.from_fields([('nested_field', str), + ('nested_field2', int)])) + ])) # [END model_managed_iceberg_data_types] diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index 0c93cd57f99b..3a0732efbfa0 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -99,7 +99,7 @@ Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two CREATE CATALOG my_catalog TYPE 'iceberg' PROPERTIES ( 'type' = 'hadoop', - 'warehouse' = 'file://tmp/beam-iceberg-local-quickstart', + 'warehouse' = 'file:///tmp/beam-iceberg-local-quickstart', ); {{< /highlight >}} {{< highlight java>}} @@ -111,7 +111,7 @@ Beam supports a wide variety of Iceberg catalogs, but this guide focuses on two {{< highlight yaml >}} catalog_props: &catalog_props type: "hadoop" - warehouse: "file://tmp/beam-iceberg-local-quickstart" + warehouse: "file:///tmp/beam-iceberg-local-quickstart" {{< /highlight >}} {{< /tab >}} {{< tab BigLake >}} @@ -161,7 +161,7 @@ You can use Beam SQL to create a new namespace through an explicit DDL statement CREATE DATABASE my_catalog.my_db; ``` -Alternatively, the IcebergIO sink can handle namespace creation automatically at runtime. +Alternatively, the IcebergIO sink can automatically create missing namespaces at runtime. 
This is ideal for dynamic pipelines where destinations are determined by the incoming data ### Create a Table From c3d802de82f50bbd285e1268c46d878df031ceda Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Mar 2026 15:41:25 -0500 Subject: [PATCH 09/16] spotless --- .../beam/sdk/io/iceberg/IcebergUtils.java | 37 ++++++++++--------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index cc97e23a3f57..953136ccf92b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -17,23 +17,6 @@ */ package org.apache.beam.sdk.io.iceberg; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - -import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; @@ -55,6 +38,24 @@ import org.joda.time.DateTime; import org.joda.time.Instant; +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; 
+import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. */ public class IcebergUtils { private IcebergUtils() {} @@ -300,7 +301,7 @@ public static Record beamRowToIcebergRecord(org.apache.iceberg.Schema schema, Ro if (row.getSchema().getFieldCount() != schema.columns().size()) { throw new IllegalStateException( String.format( - "Beam Row schema and Iceberg schema have different sizes.\n\tBeam Row columns: %s\n\tIceberg schema columns: %s", + "Beam Row schema and Iceberg schema have different sizes.%n\tBeam Row columns: %s%n\tIceberg schema columns: %s", row.getSchema().getFieldNames(), schema.columns().stream().map(Types.NestedField::name).collect(Collectors.toList()))); } From 4a202d9b1d786427d5c6a315d8ff93308c637597 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 4 Mar 2026 16:17:14 -0500 Subject: [PATCH 10/16] spotless --- .../beam/sdk/io/iceberg/IcebergUtils.java | 35 +++++++++---------- 1 file changed, 17 insertions(+), 18 deletions(-) diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java index 953136ccf92b..08349bb4b909 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergUtils.java @@ -17,6 +17,23 @@ */ package org.apache.beam.sdk.io.iceberg; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static 
org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; + +import java.nio.ByteBuffer; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.FixedPrecisionNumeric; import org.apache.beam.sdk.schemas.logicaltypes.MicrosInstant; @@ -38,24 +55,6 @@ import org.joda.time.DateTime; import org.joda.time.Instant; -import java.nio.ByteBuffer; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.OffsetDateTime; -import java.time.ZoneOffset; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; -import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; -import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; - /** Utilities for converting between Beam and Iceberg types, made public for user's convenience. 
*/ public class IcebergUtils { private IcebergUtils() {} From 1017c25250702cd94bb492c5cc361b569171faff Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Sun, 8 Mar 2026 11:49:56 -0400 Subject: [PATCH 11/16] add public data quickstart --- .../io/iceberg/IcebergBeamSchemaAndRow.java | 12 +- .../transforms/io/iceberg/Quickstart.java | 70 +++++- .../sdk/io/iceberg/IcebergScanConfig.java | 6 + .../apache_beam/examples/snippets/snippets.py | 46 +++- website/Dockerfile | 2 +- .../en/documentation/io/built-in/iceberg.md | 231 ++++++++++++------ .../content/en/documentation/io/connectors.md | 2 +- 7 files changed, 283 insertions(+), 86 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java index bdbd59ebd8d7..dc983e0187d1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -18,12 +18,6 @@ package org.apache.beam.examples.snippets.transforms.io.iceberg; // [START iceberg_schema_and_row] - -import java.math.BigDecimal; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalTime; -import java.util.Arrays; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; @@ -31,6 +25,12 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Arrays; + public class IcebergBeamSchemaAndRow { Schema nestedSchema = 
Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index 65c3af5d85ec..f74370c1b208 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -17,18 +17,26 @@ */ package org.apache.beam.examples.snippets.transforms.io.iceberg; -import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.transforms.AddFields; +import org.apache.beam.sdk.schemas.transforms.Group; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Mean; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Map; + public class Quickstart { + static String PROJECT_ID = "apache-beam-testing"; + static String BUCKET_NAME = "my-bucket"; public static void main(String[] args) { // [START hadoop_catalog_props] Map catalogProps = @@ -38,14 +46,70 @@ public static void main(String[] args) { // [END hadoop_catalog_props] } + public static void publicDatasets() { + // [START biglake_public_catalog_props] + Map catalogProps = + ImmutableMap.of( + "type", "rest", + "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", + "warehouse", "gs://biglake-public-nyc-taxi-iceberg", + "header.x-goog-user-project", PROJECT_ID, + "rest.auth.type", 
"google", + "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "header.X-Iceberg-Access-Delegation", "vended-credentials"); + // [END biglake_public_catalog_props] + + // [START biglake_public_query] + Pipeline p = Pipeline.create(); + + // Set up query properties: + Map config = + ImmutableMap.of( + "table", + "public_data.nyc_taxicab", + "catalog_properties", + catalogProps, + "filter", + "data_file_year = 2021 AND tip_amount > 100", + "keep", + Arrays.asList("passenger_count", "total_amount", "trip_distance")); + + // Read Iceberg records + PCollection icebergRows = + p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection(); + + // Perform further analysis on records + PCollection result = + icebergRows + .apply(AddFields.create().field("num_trips", Schema.FieldType.INT32, 1)) + .apply( + Group.byFieldNames("passenger_count") + .aggregateField("num_trips", Sum.ofIntegers(), "num_trips") + .aggregateField("total_amount", Mean.of(), "avg_fare") + .aggregateField("trip_distance", Mean.of(), "avg_distance")); + + // Print to console + result.apply( + MapElements.into(TypeDescriptors.voids()) + .via( + row -> { + System.out.println(row); + return null; + })); + + // Execute + p.run().waitUntilFinish(); + // [END biglake_public_query] + } + public static void other() { // [START biglake_catalog_props] Map catalogProps = ImmutableMap.of( "type", "rest", "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", - "warehouse", "gs://biglake-public-nyc-taxi-iceberg", - "header.x-goog-user-project", "$PROJECT_ID", + "warehouse", "gs://" + BUCKET_NAME, + "header.x-goog-user-project", PROJECT_ID, "rest.auth.type", "google", "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", "header.X-Iceberg-Access-Delegation", "vended-credentials"); diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java index 
3829baa43665..91e9d75ae7ad 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergScanConfig.java @@ -42,6 +42,8 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.checkerframework.dataflow.qual.Pure; import org.joda.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @AutoValue public abstract class IcebergScanConfig implements Serializable { @@ -65,9 +67,12 @@ public enum ScanType { @Pure public abstract String getTableIdentifier(); + private static final Logger LOG = LoggerFactory.getLogger(IcebergScanConfig.class); + @Pure public Table getTable() { if (cachedTable == null) { + LOG.info("Loading Iceberg table: {}", getTableIdentifier()); cachedTable = getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier())); } diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index a61088e895c0..49136d4e3779 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -1114,17 +1114,55 @@ def model_managed_iceberg(): | beam.LogElements()) # [END managed_iceberg_read] - # [START biglake_catalog_config] - biglake_catalog_config = { + BUCKET_NAME = "" + PROJECT_ID = "" + + # [START biglake_public_catalog_props] + biglake_catalog_props = { 'type': 'rest', 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', 'warehouse': 'gs://biglake-public-nyc-taxi-iceberg', - 'header.x-goog-user-project': '$PROJECT_ID', + 'header.x-goog-user-project': PROJECT_ID, + 'rest.auth.type': 'google', + 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation': 'vended-credentials' + } + # [END biglake_public_catalog_props] + + # [START biglake_public_query] + from statistics import mean + + config = { + 
"table": "public_data.nyc_taxicab", + "catalog_properties": biglake_catalog_props, + "filter": "data_file_year = 2021 AND tip_amount > 100", + "keep": ["passenger_count", "total_amount", "trip_distance"] + } + + with beam.Pipeline() as p: + rows = p | beam.managed.Read("iceberg", config=config) + + result = ( + rows | beam.Select(num_trips=lambda x: 1, *rows.element_type._fields) + | beam.GroupBy('passenger_count').aggregate_field( + 'num_trips', sum, 'total_trips').aggregate_field( + 'total_amount', mean, 'avg_fare').aggregate_field( + 'trip_distance', mean, 'avg_distance')) + + result | beam.Map(print) + # [END biglake_public_query] + + # [START biglake_catalog_props] + biglake_catalog_config = { + 'type': 'rest', + 'uri': 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse': 'gs://' + BUCKET_NAME, + 'header.x-goog-user-project': PROJECT_ID, 'rest.auth.type': 'google', 'io-impl': 'org.apache.iceberg.gcp.gcs.GCSFileIO', 'header.X-Iceberg-Access-Delegation': 'vended-credentials' } - # [END biglake_catalog_config] + # [END biglake_catalog_props] # [START model_managed_iceberg_data_types] import apache_beam as beam diff --git a/website/Dockerfile b/website/Dockerfile index 1d18ee762318..d275b00f3673 100644 --- a/website/Dockerfile +++ b/website/Dockerfile @@ -18,7 +18,7 @@ # This image contains Hugo and dependencies required to build and test the Beam # website. It is used by tasks in build.gradle. 
-FROM debian:stable-slim +FROM --platform=linux/amd64 debian:stable-slim SHELL ["/bin/bash", "-o", "pipefail", "-e", "-u", "-x", "-c"] diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index 3a0732efbfa0..bc3bfccc072f 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -48,9 +48,12 @@ To use IcebergIO, install the [Beam SQL Shell](https://beam.apache.org/documenta {{% /section %}} {{< paragraph >}} -Additional resources: +If you're new to Iceberg, check out the [basics section](#iceberg-basics) under the guide. {{< /paragraph >}} +Additional resources: + + {{< paragraph wrap="span" >}} * [IcebergIO configuration parameters](https://beam.apache.org/documentation/io/managed-io/#iceberg-write) * [IcebergIO source code](https://github.com/apache/beam/tree/master/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg) @@ -59,28 +62,95 @@ Additional resources: * [Apache Iceberg terms](https://iceberg.apache.org/terms/) {{< /paragraph >}} -## Iceberg basics - -### Catalogs - -A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; -this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development. - -### Namespaces -A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a "database". +## Quickstart with Public Datasets -### Tables +We can jump straight into reading some high-quality public datasets served via Iceberg's REST Catalog. +These datasets are hosted on Google Cloud's BigLake and are available to read by anyone, making it a good +resource to experiment with. -The actual entity containing data, and is described by a schema and partition spec. 
+There are some prerequisites to using the BigLake Catalog: +- A Google Cloud Project (for authentication). Create an account [here](https://docs.cloud.google.com/docs/get-started) if you don't have one. +- Standard Google [Application Default Credentials](https://docs.cloud.google.com/docs/authentication/set-up-adc-local-dev-environment#local-user-cred) (ADC) set up in your environment. +- A [Google Cloud Storage bucket](https://docs.cloud.google.com/storage/docs/creating-buckets) -### Snapshots +When you've met those prerequisites, start by setting up your catalog: +{{< highlight sql>}} + CREATE CATALOG my_catalog TYPE 'iceberg' + PROPERTIES ( + 'type' = 'rest', + 'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog', + 'warehouse' = 'gs://biglake-public-nyc-taxi-iceberg', + 'header.x-goog-user-project' = '$PROJECT_ID', + 'rest.auth.type' = 'google', + 'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO', + 'header.X-Iceberg-Access-Delegation' = 'vended-credentials' + ); +{{< /highlight >}} +{{< highlight java>}} + {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" biglake_public_catalog_props >}} +{{< /highlight >}} +{{< highlight py >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" biglake_public_catalog_props >}} +{{< /highlight >}} +{{< highlight yaml >}} +catalog_props: &catalog_props + type: "rest" + uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog" + warehouse: "gs://biglake-public-nyc-taxi-iceberg" + header.x-goog-user-project: *PROJECT_ID + rest.auth.type: "google" + io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO" + header.X-Iceberg-Access-Delegation: "vended-credentials" +{{< /highlight >}} -A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change -and references its parent snapshot. 
An Iceberg table's history is a chronological list of snapshots, enabling features -like time travel and ACID-compliant concurrent writes. +Now simply query the public dataset: +{{< highlight sql>}} + SELECT + passenger_count, + COUNT(1) AS num_trips, + ROUND(AVG(total_amount), 2) AS avg_fare, + ROUND(AVG(trip_distance), 2) AS avg_distance + FROM + bqms.public_data.nyc_taxicab + WHERE + data_file_year = 2021 + AND tip_amount > 100 + GROUP BY + passenger_count + ORDER BY + num_trips DESC; +{{< /highlight >}} +{{< highlight java>}} + {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" biglake_public_query >}} +{{< /highlight >}} +{{< highlight py >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" biglake_public_query >}} +{{< /highlight >}} +{{< highlight yaml >}} + pipeline: + type: chain + transforms: + - type: ReadFromIceberg + config: + table: "public_data.nyc_taxicab" + catalog_properties: *catalog_props + filter: "data_file_year = 2021 AND tip_amount > 100" + keep: [ "passenger_count", "total_amount", "trip_distance" ] + - type: Sql + config: + query: "SELECT + passenger_count, + COUNT(1) AS num_trips, + ROUND(AVG(total_amount), 2) AS avg_fare, + ROUND(AVG(trip_distance), 2) AS avg_distance + FROM + PCOLLECTION + GROUP BY + passenger_count" + {{< /highlight >}} -## Quickstart Guide +## User Guide ### Choose Your Catalog @@ -140,14 +210,14 @@ built-in credential delegation and unified metadata management. 
It requires a fe {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java" biglake_catalog_props >}} {{< /highlight >}} {{< highlight py >}} - {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" biglake_catalog_config >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" biglake_catalog_props >}} {{< /highlight >}} {{< highlight yaml >}} catalog_props: &catalog_props type: "rest" uri: "https://biglake.googleapis.com/iceberg/v1/restcatalog" - warehouse: "gs://$BUCKET_NAME" - header.x-goog-user-project: "$PROJECT_ID" + warehouse: "gs://" + *BUCKET_NAME + header.x-goog-user-project: *PROJECT_ID rest.auth.type: "google" io-impl: "org.apache.iceberg.gcp.gcs.GCSFileIO" header.X-Iceberg-Access-Delegation: "vended-credentials" @@ -261,7 +331,10 @@ pipeline: - type: LogForTesting {{< /highlight >}} +## Further steps +Check out the full [IcebergIO configuration](https://beam.apache.org/documentation/io/managed-io/#iceberg-write) to make +use of other features like applying a partition spec, table properties, row filtering, column pruning, etc. ## Data Types @@ -273,64 +346,80 @@ other SDKs may have specific constraints on complex or experimental types. 
The f the standard mapping for core data types across SQL, Java, Python, and YAML: {{< highlight sql >}} -INSERT INTO catalog.namespace.table VALUES ( -9223372036854775807, -- BIGINT -2147483647, -- INTEGER -1.0, -- FLOAT -1.0, -- DOUBLE -TRUE, -- BOOLEAN -TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP -'varchar', -- VARCHAR -'char', -- CHAR -ARRAY['abc', 'xyz'], -- ARRAY -ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT] -) + INSERT INTO catalog.namespace.table VALUES ( + 9223372036854775807, -- BIGINT + 2147483647, -- INTEGER + 1.0, -- FLOAT + 1.0, -- DOUBLE + TRUE, -- BOOLEAN + TIMESTAMP '2018-05-28 20:17:40.123', -- TIMESTAMP + 'varchar', -- VARCHAR + 'char', -- CHAR + ARRAY['abc', 'xyz'], -- ARRAY + ARRAY[CAST(ROW('abc', 123) AS ROW(nested_str VARCHAR, nested_int INTEGER))] -- ARRAY[STRUCT] + ) {{< /highlight >}} {{< highlight java >}} -{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java" iceberg_schema_and_row >}} + {{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java" iceberg_schema_and_row >}} {{< /highlight >}} {{< highlight py >}} -{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_managed_iceberg_data_types >}} + {{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" model_managed_iceberg_data_types >}} {{< /highlight >}} {{< highlight yaml >}} -pipeline: - transforms: - - type: Create - config: - elements: - - boolean_field: false - integer_field: 123 - number_field: 4.56 - string_field: "abc" - struct_field: - nested_1: a - nested_2: 1 - array_field: [1, 2, 3] - output_schema: - type: object - properties: - boolean_field: - type: boolean - integer_field: - type: integer - number_field: - type: number - string_field: - type: string - struct_field: - type: object - properties: - nested_1: - type: string - 
nested_2: - type: integer - array_field: - type: array - items: + pipeline: + transforms: + - type: Create + config: + elements: + - boolean_field: false + integer_field: 123 + number_field: 4.56 + string_field: "abc" + struct_field: + nested_1: a + nested_2: 1 + array_field: [1, 2, 3] + output_schema: + type: object + properties: + boolean_field: + type: boolean + integer_field: type: integer + number_field: + type: number + string_field: + type: string + struct_field: + type: object + properties: + nested_1: + type: string + nested_2: + type: integer + array_field: + type: array + items: + type: integer {{< /highlight >}} -## Further steps +## Iceberg basics -Check out the full [IcebergIO configuration](https://beam.apache.org/documentation/io/managed-io/#iceberg-write) to make -use of other features like applying a partition spec, table properties, row filtering, column pruning, etc. +### Catalogs + +A catalog is a top-level entity used to manage and access Iceberg tables. There are many catalog implementations out there; +this guide focuses on the Hadoop catalog for easy local testing and BigLake REST catalog for cloud-scale development. + +### Namespaces + +A namespace lives inside a catalog and may contain a number of Iceberg tables. This is the equivalent of a "database". + +### Tables + +The actual entity containing data, and is described by a schema and partition spec. + +### Snapshots + +A new snapshot is created whenever a change is made to an Iceberg table. Each snapshot provides a summary of the change +and references its parent snapshot. An Iceberg table's history is a chronological list of snapshots, enabling features +like time travel and ACID-compliant concurrent writes. 
diff --git a/website/www/site/content/en/documentation/io/connectors.md b/website/www/site/content/en/documentation/io/connectors.md index 9797195518e9..79a002a6c167 100644 --- a/website/www/site/content/en/documentation/io/connectors.md +++ b/website/www/site/content/en/documentation/io/connectors.md @@ -1051,7 +1051,7 @@ This table provides a consolidated, at-a-glance overview of the available built- - Iceberg (Managed I/O) + Iceberg (guide) ✔ ✔ From 6401ba797a31a58ee5cd14a15ac99229e7b9b7e8 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 10 Mar 2026 11:32:38 -0400 Subject: [PATCH 12/16] yaml instruction --- .../site/content/en/documentation/io/built-in/iceberg.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index bc3bfccc072f..e0cd10b48452 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -47,6 +47,14 @@ To use IcebergIO, install the [Beam SQL Shell](https://beam.apache.org/documenta ``` {{% /section %}} +{{< paragraph class="language-yaml" >}} +To use IcebergIO with [Beam YAML](../../sdks/yaml), install the `yaml` extra: +{{< /paragraph >}} + +{{< highlight yaml >}} +pip install apache_beam[yaml] +{{< /highlight >}} + {{< paragraph >}} If you're new to Iceberg, check out the [basics section](#iceberg-basics) under the guide. 
{{< /paragraph >}} From 2973f6224143d9100dd6a7f705277c609c7f9140 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 10 Mar 2026 11:33:22 -0400 Subject: [PATCH 13/16] spotless --- .../io/iceberg/IcebergBeamSchemaAndRow.java | 11 ++- .../transforms/io/iceberg/Quickstart.java | 68 +++++++++---------- 2 files changed, 39 insertions(+), 40 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java index dc983e0187d1..dd3d83683a59 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -18,6 +18,11 @@ package org.apache.beam.examples.snippets.transforms.io.iceberg; // [START iceberg_schema_and_row] +import java.math.BigDecimal; +import java.time.Instant; +import java.time.LocalDate; +import java.time.LocalTime; +import java.util.Arrays; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; import org.apache.beam.sdk.schemas.logicaltypes.Timestamp; @@ -25,12 +30,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.joda.time.DateTime; -import java.math.BigDecimal; -import java.time.Instant; -import java.time.LocalDate; -import java.time.LocalTime; -import java.util.Arrays; - public class IcebergBeamSchemaAndRow { Schema nestedSchema = Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index f74370c1b208..cfeeba0ffeb6 100644 --- 
a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -17,6 +17,8 @@ */ package org.apache.beam.examples.snippets.transforms.io.iceberg; +import java.util.Arrays; +import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.managed.Managed; import org.apache.beam.sdk.schemas.Schema; @@ -31,12 +33,10 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import java.util.Arrays; -import java.util.Map; - public class Quickstart { static String PROJECT_ID = "apache-beam-testing"; static String BUCKET_NAME = "my-bucket"; + public static void main(String[] args) { // [START hadoop_catalog_props] Map catalogProps = @@ -49,14 +49,14 @@ public static void main(String[] args) { public static void publicDatasets() { // [START biglake_public_catalog_props] Map catalogProps = - ImmutableMap.of( - "type", "rest", - "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", - "warehouse", "gs://biglake-public-nyc-taxi-iceberg", - "header.x-goog-user-project", PROJECT_ID, - "rest.auth.type", "google", - "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", - "header.X-Iceberg-Access-Delegation", "vended-credentials"); + ImmutableMap.of( + "type", "rest", + "uri", "https://biglake.googleapis.com/iceberg/v1/restcatalog", + "warehouse", "gs://biglake-public-nyc-taxi-iceberg", + "header.x-goog-user-project", PROJECT_ID, + "rest.auth.type", "google", + "io-impl", "org.apache.iceberg.gcp.gcs.GCSFileIO", + "header.X-Iceberg-Access-Delegation", "vended-credentials"); // [END biglake_public_catalog_props] // [START biglake_public_query] @@ -64,38 +64,38 @@ public static void publicDatasets() { // Set up query properties: Map config = - ImmutableMap.of( - "table", - "public_data.nyc_taxicab", - "catalog_properties", - 
catalogProps, - "filter", - "data_file_year = 2021 AND tip_amount > 100", - "keep", - Arrays.asList("passenger_count", "total_amount", "trip_distance")); + ImmutableMap.of( + "table", + "public_data.nyc_taxicab", + "catalog_properties", + catalogProps, + "filter", + "data_file_year = 2021 AND tip_amount > 100", + "keep", + Arrays.asList("passenger_count", "total_amount", "trip_distance")); // Read Iceberg records PCollection icebergRows = - p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection(); + p.apply(Managed.read("iceberg").withConfig(config)).getSinglePCollection(); // Perform further analysis on records PCollection result = - icebergRows - .apply(AddFields.create().field("num_trips", Schema.FieldType.INT32, 1)) - .apply( - Group.byFieldNames("passenger_count") - .aggregateField("num_trips", Sum.ofIntegers(), "num_trips") - .aggregateField("total_amount", Mean.of(), "avg_fare") - .aggregateField("trip_distance", Mean.of(), "avg_distance")); + icebergRows + .apply(AddFields.create().field("num_trips", Schema.FieldType.INT32, 1)) + .apply( + Group.byFieldNames("passenger_count") + .aggregateField("num_trips", Sum.ofIntegers(), "num_trips") + .aggregateField("total_amount", Mean.of(), "avg_fare") + .aggregateField("trip_distance", Mean.of(), "avg_distance")); // Print to console result.apply( - MapElements.into(TypeDescriptors.voids()) - .via( - row -> { - System.out.println(row); - return null; - })); + MapElements.into(TypeDescriptors.voids()) + .via( + row -> { + System.out.println(row); + return null; + })); // Execute p.run().waitUntilFinish(); From 611744c3b520bb0143b2ba971ed8737106120539 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 10 Mar 2026 11:50:19 -0400 Subject: [PATCH 14/16] spotless --- .../examples/snippets/transforms/io/iceberg/Quickstart.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java 
b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java index cfeeba0ffeb6..9d48c0a175a1 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/Quickstart.java @@ -34,8 +34,8 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class Quickstart { - static String PROJECT_ID = "apache-beam-testing"; - static String BUCKET_NAME = "my-bucket"; + static final String PROJECT_ID = "apache-beam-testing"; + static final String BUCKET_NAME = "my-bucket"; public static void main(String[] args) { // [START hadoop_catalog_props] From 1dca560f370f3e8263a1bf536942b8b8d00ddf86 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Wed, 11 Mar 2026 11:15:44 -0400 Subject: [PATCH 15/16] fix spotbugs --- .../io/iceberg/IcebergBeamSchemaAndRow.java | 90 ++++++++++--------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java index dd3d83683a59..2d2c934aa8fd 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java +++ b/examples/java/src/main/java/org/apache/beam/examples/snippets/transforms/io/iceberg/IcebergBeamSchemaAndRow.java @@ -31,49 +31,53 @@ import org.joda.time.DateTime; public class IcebergBeamSchemaAndRow { - Schema nestedSchema = - Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); - Schema beamSchema = - Schema.builder() - .addBooleanField("boolean_field") - .addInt32Field("int_field") - .addInt64Field("long_field") - .addFloatField("float_field") - .addDoubleField("double_field") - 
.addDecimalField("numeric_field") - .addByteArrayField("bytes_field") - .addStringField("string_field") - .addLogicalTypeField("time_field", SqlTypes.TIME) - .addLogicalTypeField("date_field", SqlTypes.DATE) - .addLogicalTypeField("timestamp_field", Timestamp.MICROS) - .addDateTimeField("timestamptz_field") - .addArrayField("array_field", Schema.FieldType.INT32) - .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32) - .addRowField("struct_field", nestedSchema) - .build(); + public Row createRow() { + Schema nestedSchema = + Schema.builder().addStringField("nested_field").addInt32Field("nested_field_2").build(); + Schema beamSchema = + Schema.builder() + .addBooleanField("boolean_field") + .addInt32Field("int_field") + .addInt64Field("long_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("numeric_field") + .addByteArrayField("bytes_field") + .addStringField("string_field") + .addLogicalTypeField("time_field", SqlTypes.TIME) + .addLogicalTypeField("date_field", SqlTypes.DATE) + .addLogicalTypeField("timestamp_field", Timestamp.MICROS) + .addDateTimeField("timestamptz_field") + .addArrayField("array_field", Schema.FieldType.INT32) + .addMapField("map_field", Schema.FieldType.STRING, Schema.FieldType.INT32) + .addRowField("struct_field", nestedSchema) + .build(); - Row beamRow = - Row.withSchema(beamSchema) - .withFieldValues( - ImmutableMap.builder() - .put("boolean_field", true) - .put("int_field", 1) - .put("long_field", 2L) - .put("float_field", 3.4f) - .put("double_field", 4.5d) - .put("numeric_field", new BigDecimal(67)) - .put("bytes_field", new byte[] {1, 2, 3}) - .put("string_field", "value") - .put("time_field", LocalTime.now()) - .put("date_field", LocalDate.now()) - .put("timestamp_field", Instant.now()) - .put("timestamptz_field", DateTime.now()) - .put("array_field", Arrays.asList(1, 2, 3)) - .put("map_field", ImmutableMap.of("a", 1, "b", 2)) - .put( - "struct_field", - 
Row.withSchema(nestedSchema).addValues("nested_value", 123).build()) - .build()) - .build(); + Row beamRow = + Row.withSchema(beamSchema) + .withFieldValues( + ImmutableMap.builder() + .put("boolean_field", true) + .put("int_field", 1) + .put("long_field", 2L) + .put("float_field", 3.4f) + .put("double_field", 4.5d) + .put("numeric_field", new BigDecimal(67)) + .put("bytes_field", new byte[] {1, 2, 3}) + .put("string_field", "value") + .put("time_field", LocalTime.now()) + .put("date_field", LocalDate.now()) + .put("timestamp_field", Instant.now()) + .put("timestamptz_field", DateTime.now()) + .put("array_field", Arrays.asList(1, 2, 3)) + .put("map_field", ImmutableMap.of("a", 1, "b", 2)) + .put( + "struct_field", + Row.withSchema(nestedSchema).addValues("nested_value", 123).build()) + .build()) + .build(); + + return beamRow; + } } // [END iceberg_schema_and_row] From 27cb5b563ad7104bd75dfa204e9958592e14b63d Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Thu, 12 Mar 2026 12:25:45 -0400 Subject: [PATCH 16/16] tweaks --- .../site/content/en/documentation/io/built-in/iceberg.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/website/www/site/content/en/documentation/io/built-in/iceberg.md b/website/www/site/content/en/documentation/io/built-in/iceberg.md index e0cd10b48452..0021124f2b41 100644 --- a/website/www/site/content/en/documentation/io/built-in/iceberg.md +++ b/website/www/site/content/en/documentation/io/built-in/iceberg.md @@ -234,12 +234,12 @@ built-in credential delegation and unified metadata management. It requires a fe ### Create a Namespace -You can use Beam SQL to create a new namespace through an explicit DDL statement: +If you're on Beam SQL, you can explicitly create a new namespace: ```sql CREATE DATABASE my_catalog.my_db; ``` -Alternatively, the IcebergIO sink can automatically create missing namespaces at runtime. +Alternatively, the IcebergIO sink will automatically create missing namespaces at runtime. 
This is ideal for dynamic pipelines where destinations are determined by the incoming data ### Create a Table @@ -309,7 +309,7 @@ pipeline: ### View Namespaces and Tables -You can use Beam SQL to view the newly created resources: +If you're on Beam SQL, you can view the newly created resources: ```sql SHOW DATABASES my_catalog; ```