Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"modification": 5
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).
* [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support
* Python: Added JupyterLab 4.x extension compatibility for enhanced notebook integration ([#34495](https://github.com/apache/beam/pull/34495)).
* [IcebergIO] Dynamically create namespaces if needed ([#35228](https://github.com/apache/beam/pull/35228))

## Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.Record;
Expand Down Expand Up @@ -278,31 +280,46 @@ static String getPartitionDataPath(
* using the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
Namespace namespace = identifier.namespace();
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
synchronized (TABLE_CACHE) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
if (table != null) {
table.refresh();
return table;
}

synchronized (TABLE_CACHE) {
// Create namespace if it does not exist yet
if (catalog instanceof SupportsNamespaces) {
SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog;
if (!supportsNamespaces.namespaceExists(namespace)) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// handle race condition where workers are concurrently creating the same table.
// if running into already exists exception, we perform one last load
table = catalog.loadTable(identifier);
supportsNamespaces.createNamespace(namespace);
LOG.info("Created new namespace '{}'.", namespace);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this namespace
}
}
TABLE_CACHE.put(identifier, table);
}
} else {
// If fetching from cache, refresh the table to avoid working with stale metadata
// (e.g. partition spec)
table.refresh();

// If table exists, just load it
// Note: the implementation of catalog.tableExists() will load the table to check its
// existence. We don't use it here to avoid double loadTable() calls.
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) { // Otherwise, create the table
org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
try {
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException ignored) {
// race condition: another worker already created this table
table = catalog.loadTable(identifier);
}
}
}

TABLE_CACHE.put(identifier, table);
return table;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.Serializable;
import java.util.List;
Expand All @@ -44,6 +46,8 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
Expand Down Expand Up @@ -107,6 +111,38 @@ public void testSimpleAppend() throws Exception {
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}

/**
 * Tests that a write targeting a table in a not-yet-existing namespace creates the namespace and
 * lands the expected records in the table.
 */
@Test
public void testCreateNamespaceAndTable() {
  Namespace freshNamespace = Namespace.of("new_namespace");
  String tableName = "table" + Long.toString(UUID.randomUUID().hashCode(), 16);
  TableIdentifier tableId = TableIdentifier.of(freshNamespace, tableName);

  // Hadoop-type catalog rooted at the test warehouse location
  Map<String, String> catalogProps =
      ImmutableMap.of(
          "type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP, "warehouse", warehouse.location);
  IcebergCatalogConfig catalogConfig =
      IcebergCatalogConfig.builder().setCatalogProperties(catalogProps).build();

  testPipeline
      .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
      .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA))
      .apply("Append To Table", IcebergIO.writeRows(catalogConfig).to(tableId));

  // The namespace must be absent before the pipeline runs and present afterwards
  boolean existedBeforeRun =
      ((SupportsNamespaces) catalogConfig.catalog()).namespaceExists(freshNamespace);
  assertFalse(existedBeforeRun);
  LOG.info("Executing pipeline");
  testPipeline.run().waitUntilFinish();
  boolean existsAfterRun =
      ((SupportsNamespaces) catalogConfig.catalog()).namespaceExists(freshNamespace);
  assertTrue(existsAfterRun);
  LOG.info("Done running pipeline");

  // Read everything back from the created table and compare with the input records
  List<Record> writtenRecords =
      ImmutableList.copyOf(IcebergGenerics.read(warehouse.loadTable(tableId)).build());
  assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}

/** Tests that a small write to three different tables with dynamic destinations works. */
@Test
public void testDynamicDestinationsWithoutSpillover() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
Expand All @@ -58,6 +56,7 @@
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.types.Conversions;
Expand Down Expand Up @@ -111,18 +110,37 @@ private WindowedValue<IcebergDestination> getWindowedDestination(
String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) {
TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName);

warehouse.createTable(tableIdentifier, schema, partitionSpec);
// TODO: remove when we enable dynamic table creation with partition specs
if (partitionSpec != null) {
warehouse.createTable(tableIdentifier, schema, partitionSpec);
}

IcebergDestination icebergDestination =
IcebergDestination.builder()
.setFileFormat(FileFormat.PARQUET)
.setTableIdentifier(tableIdentifier)
.build();
return WindowedValues.of(
icebergDestination,
GlobalWindow.TIMESTAMP_MAX_VALUE,
GlobalWindow.INSTANCE,
PaneInfo.NO_FIRING);
return WindowedValues.valueInGlobalWindow(icebergDestination);
}

/**
 * Tests that writing a row to a destination whose namespace does not exist yet succeeds and
 * leaves the namespace present in the catalog.
 */
@Test
public void testCreateNamespaceAndTable() {
  RecordWriterManager manager = new RecordWriterManager(catalog, "test_file_name", 1000, 3);

  Namespace freshNamespace = Namespace.of("new_namespace");
  TableIdentifier tableId = TableIdentifier.of(freshNamespace, testName.getMethodName());
  IcebergDestination destination =
      IcebergDestination.builder()
          .setFileFormat(FileFormat.PARQUET)
          .setTableIdentifier(tableId)
          .build();
  WindowedValue<IcebergDestination> windowedDestination =
      WindowedValues.valueInGlobalWindow(destination);

  Row record = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build();

  // Namespace is absent up front; the write itself is expected to bring it into existence
  assertFalse(catalog.namespaceExists(freshNamespace));
  assertTrue(manager.write(windowedDestination, record));
  assertTrue(catalog.namespaceExists(freshNamespace));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,39 +38,22 @@
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT {
private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT");
static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();;
private long salt = System.nanoTime();

@BeforeClass
public static void createDataset() throws IOException, InterruptedException {
BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET);
}

@AfterClass
public static void deleteDataset() {
BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
}

@Override
public String tableId() {
return DATASET + "." + testName.getMethodName() + "_" + salt;
public String type() {
return "bqms";
}

@Override
public Catalog createCatalog() {
salt += System.nanoTime();
return CatalogUtil.loadCatalog(
BQMS_CATALOG,
"bqms_" + catalogName,
catalogName,
ImmutableMap.<String, String>builder()
.put("gcp_project", OPTIONS.getProject())
.put("gcp_location", "us-central1")
Expand All @@ -79,16 +62,6 @@ public Catalog createCatalog() {
new Configuration());
}

@Override
public void catalogCleanup() {
for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) {
// only delete tables that were created in this test run
if (tableIdentifier.name().contains(String.valueOf(salt))) {
catalog.dropTable(tableIdentifier);
}
}
}

@Override
public Map<String, Object> managedIcebergConfig(String tableId) {
return ImmutableMap.<String, Object>builder()
Expand All @@ -108,6 +81,14 @@ public Map<String, Object> managedIcebergConfig(String tableId) {
@Test
public void testWriteToPartitionedAndValidateWithBQQuery()
throws IOException, InterruptedException {
// querying with the client seems to work only when the dataset
// is created by the client (not iceberg)
BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
String newNamespace = namespace() + "_new";
namespacesToCleanup.add(newNamespace);
bqClient.createNewDataset(OPTIONS.getProject(), newNamespace);
String tableId = newNamespace + ".test_table";

// For an example row where bool_field=true, modulo_5=3, str=value_303,
// this partition spec will create a partition like:
// /bool_field=true/modulo_5=3/str_trunc=value_3/
Expand All @@ -117,17 +98,17 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
.hour("datetime")
.truncate("str", "value_x".length())
.build();
catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec);
catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec);

// Write with Beam
Map<String, Object> config = managedIcebergConfig(tableId());
Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

// Fetch records using a BigQuery query and validate
BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName());
String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId());

String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId);
List<TableRow> rows = bqClient.queryUnflattened(query, OPTIONS.getProject(), true, true);
List<Row> beamRows =
rows.stream()
Expand All @@ -137,7 +118,7 @@ public void testWriteToPartitionedAndValidateWithBQQuery()
assertThat(beamRows, containsInAnyOrder(inputRows.toArray()));

String queryByPartition =
String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId());
String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId);
rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true);
RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool_field", "datetime"));
beamRows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@
*/
package org.apache.beam.sdk.io.iceberg.catalog;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hadoop.HadoopCatalog;

public class HadoopCatalogIT extends IcebergCatalogBaseIT {
@Override
public String type() {
return "hadoop";
}

@Override
public Catalog createCatalog() {
Configuration catalogHadoopConf = new Configuration();
Expand All @@ -37,21 +38,11 @@ public Catalog createCatalog() {

HadoopCatalog catalog = new HadoopCatalog();
catalog.setConf(catalogHadoopConf);
catalog.initialize("hadoop_" + catalogName, ImmutableMap.of("warehouse", warehouse));
catalog.initialize(catalogName, ImmutableMap.of("warehouse", warehouse));

return catalog;
}

@Override
public void catalogCleanup() throws IOException {
HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog;
List<TableIdentifier> tables = hadoopCatalog.listTables(Namespace.of(testName.getMethodName()));
for (TableIdentifier identifier : tables) {
hadoopCatalog.dropTable(identifier);
}
hadoopCatalog.close();
}

@Override
public Map<String, Object> managedIcebergConfig(String tableId) {
return ImmutableMap.<String, Object>builder()
Expand Down
Loading
Loading