diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json index 98be2d60cbf9..5d04b2c0a8c7 100644 --- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json +++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json @@ -1,5 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 3, - "https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface" + "modification": 5 } diff --git a/CHANGES.md b/CHANGES.md index 0c126e4087e7..8a41039d9950 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -82,6 +82,7 @@ * [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)). * [YAML] WriteToTFRecord and ReadFromTFRecord Beam YAML support * Python: Added JupyterLab 4.x extension compatibility for enhanced notebook integration ([#34495](https://github.com/apache/beam/pull/34495)). +* [IcebergIO] Dynamically create namespaces if needed ([#35228](https://github.com/apache/beam/pull/35228)) ## Breaking Changes diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java index 142170acc67b..f7e5893456f8 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java @@ -49,6 +49,8 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.InternalRecordWrapper; import org.apache.iceberg.data.Record; @@ -278,31 +280,46 @@ static String getPartitionDataPath( * using the Iceberg API. */ private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) { + Namespace namespace = identifier.namespace(); @Nullable Table table = TABLE_CACHE.getIfPresent(identifier); - if (table == null) { - synchronized (TABLE_CACHE) { - try { - table = catalog.loadTable(identifier); - } catch (NoSuchTableException e) { + if (table != null) { + table.refresh(); + return table; + } + + synchronized (TABLE_CACHE) { + // Create namespace if it does not exist yet + if (catalog instanceof SupportsNamespaces) { + SupportsNamespaces supportsNamespaces = (SupportsNamespaces) catalog; + if (!supportsNamespaces.namespaceExists(namespace)) { try { - org.apache.iceberg.Schema tableSchema = - IcebergUtils.beamSchemaToIcebergSchema(dataSchema); - // TODO(ahmedabu98): support creating a table with a specified partition spec - table = catalog.createTable(identifier, tableSchema); - LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema); - } catch (AlreadyExistsException alreadyExistsException) { - // handle race condition where workers are concurrently creating the same table. - // if running into already exists exception, we perform one last load - table = catalog.loadTable(identifier); + supportsNamespaces.createNamespace(namespace); + LOG.info("Created new namespace '{}'.", namespace); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this namespace } } - TABLE_CACHE.put(identifier, table); } - } else { - // If fetching from cache, refresh the table to avoid working with stale metadata - // (e.g. partition spec) - table.refresh(); + + // If table exists, just load it + // Note: the implementation of catalog.tableExists() will load the table to check its + // existence. We don't use it here to avoid double loadTable() calls. + try { + table = catalog.loadTable(identifier); + } catch (NoSuchTableException e) { // Otherwise, create the table + org.apache.iceberg.Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(dataSchema); + try { + // TODO(ahmedabu98): support creating a table with a specified partition spec + table = catalog.createTable(identifier, tableSchema); + LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema); + } catch (AlreadyExistsException ignored) { + // race condition: another worker already created this table + table = catalog.loadTable(identifier); + } + } } + + TABLE_CACHE.put(identifier, table); return table; } diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java index 87a543a439ec..be1125b21734 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java @@ -19,6 +19,8 @@ import static org.apache.beam.sdk.io.iceberg.IcebergUtils.beamRowToIcebergRecord; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import java.io.Serializable; import java.util.List; @@ -44,6 +46,8 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.data.IcebergGenerics; @@ -107,6 +111,38 @@ public void testSimpleAppend() throws Exception { assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); } + @Test + public void testCreateNamespaceAndTable() { + Namespace newNamespace = Namespace.of("new_namespace"); + TableIdentifier tableId = + TableIdentifier.of(newNamespace, "table" + Long.toString(UUID.randomUUID().hashCode(), 16)); + + Map catalogProps = + ImmutableMap.builder() + .put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP) + .put("warehouse", warehouse.location) + .build(); + + IcebergCatalogConfig catalog = + IcebergCatalogConfig.builder().setCatalogProperties(catalogProps).build(); + + testPipeline + .apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1))) + .setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA)) + .apply("Append To Table", IcebergIO.writeRows(catalog).to(tableId)); + + assertFalse(((SupportsNamespaces) catalog.catalog()).namespaceExists(newNamespace)); + LOG.info("Executing pipeline"); + testPipeline.run().waitUntilFinish(); + assertTrue(((SupportsNamespaces) catalog.catalog()).namespaceExists(newNamespace)); + LOG.info("Done running pipeline"); + + Table table = warehouse.loadTable(tableId); + List writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build()); + + assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray())); + } + /** Tests that a small write to three different tables with dynamic destinations works. */ @Test public void testDynamicDestinationsWithoutSpillover() throws Exception { diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java index 1fef79e81240..b240442deb6d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/RecordWriterManagerTest.java @@ -41,8 +41,6 @@ import java.util.Map; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.sdk.values.WindowedValues; @@ -58,6 +56,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; import org.apache.iceberg.types.Conversions; @@ -111,18 +110,37 @@ private WindowedValue getWindowedDestination( String tableName, org.apache.iceberg.Schema schema, @Nullable PartitionSpec partitionSpec) { TableIdentifier tableIdentifier = TableIdentifier.of("default", tableName); - warehouse.createTable(tableIdentifier, schema, partitionSpec); + // TODO: remove when we enable dynamic table creation with partition specs + if (partitionSpec != null) { + warehouse.createTable(tableIdentifier, schema, partitionSpec); + } IcebergDestination icebergDestination = IcebergDestination.builder() .setFileFormat(FileFormat.PARQUET) .setTableIdentifier(tableIdentifier) .build(); - return WindowedValues.of( - icebergDestination, - GlobalWindow.TIMESTAMP_MAX_VALUE, - GlobalWindow.INSTANCE, - PaneInfo.NO_FIRING); + return WindowedValues.valueInGlobalWindow(icebergDestination); + } + + @Test + public void testCreateNamespaceAndTable() { + RecordWriterManager writerManager = new RecordWriterManager(catalog, "test_file_name", 1000, 3); + Namespace newNamespace = Namespace.of("new_namespace"); + TableIdentifier identifier = TableIdentifier.of(newNamespace, testName.getMethodName()); + WindowedValue dest = + WindowedValues.valueInGlobalWindow( + IcebergDestination.builder() + .setFileFormat(FileFormat.PARQUET) + .setTableIdentifier(identifier) + .build()); + + Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "aaa", true).build(); + + assertFalse(catalog.namespaceExists(newNamespace)); + boolean writeSuccess = writerManager.write(dest, row); + assertTrue(writeSuccess); + assertTrue(catalog.namespaceExists(newNamespace)); } @Test diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java index 27aa92b40069..eb3ebfbf5219 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/BigQueryMetastoreCatalogIT.java @@ -38,39 +38,22 @@ import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.TableIdentifier; -import org.junit.AfterClass; -import org.junit.BeforeClass; import org.junit.Test; public class BigQueryMetastoreCatalogIT extends IcebergCatalogBaseIT { - private static final BigqueryClient BQ_CLIENT = new BigqueryClient("BigQueryMetastoreCatalogIT"); static final String BQMS_CATALOG = "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog"; - static final String DATASET = "managed_iceberg_bqms_tests_" + System.nanoTime();; - private long salt = System.nanoTime(); - - @BeforeClass - public static void createDataset() throws IOException, InterruptedException { - BQ_CLIENT.createNewDataset(OPTIONS.getProject(), DATASET); - } - - @AfterClass - public static void deleteDataset() { - BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET); - } @Override - public String tableId() { - return DATASET + "." + testName.getMethodName() + "_" + salt; + public String type() { + return "bqms"; } @Override public Catalog createCatalog() { - salt += System.nanoTime(); return CatalogUtil.loadCatalog( BQMS_CATALOG, - "bqms_" + catalogName, + catalogName, ImmutableMap.builder() .put("gcp_project", OPTIONS.getProject()) .put("gcp_location", "us-central1") @@ -79,16 +62,6 @@ public Catalog createCatalog() { new Configuration()); } - @Override - public void catalogCleanup() { - for (TableIdentifier tableIdentifier : catalog.listTables(Namespace.of(DATASET))) { - // only delete tables that were created in this test run - if (tableIdentifier.name().contains(String.valueOf(salt))) { - catalog.dropTable(tableIdentifier); - } - } - } - @Override public Map managedIcebergConfig(String tableId) { return ImmutableMap.builder() @@ -108,6 +81,14 @@ public Map managedIcebergConfig(String tableId) { @Test public void testWriteToPartitionedAndValidateWithBQQuery() throws IOException, InterruptedException { + // querying with the client seems to work only when the dataset + // is created by the client (not iceberg) + BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName()); + String newNamespace = namespace() + "_new"; + namespacesToCleanup.add(newNamespace); + bqClient.createNewDataset(OPTIONS.getProject(), newNamespace); + String tableId = newNamespace + ".test_table"; + // For an example row where bool_field=true, modulo_5=3, str=value_303, // this partition spec will create a partition like: // /bool_field=true/modulo_5=3/str_trunc=value_3/ @@ -117,17 +98,17 @@ public void testWriteToPartitionedAndValidateWithBQQuery() .hour("datetime") .truncate("str", "value_x".length()) .build(); - catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA, partitionSpec); + catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA, partitionSpec); // Write with Beam - Map config = managedIcebergConfig(tableId()); + Map config = managedIcebergConfig(tableId); PCollection input = pipeline.apply(Create.of(inputRows)).setRowSchema(BEAM_SCHEMA); input.apply(Managed.write(Managed.ICEBERG).withConfig(config)); pipeline.run().waitUntilFinish(); // Fetch records using a BigQuery query and validate - BigqueryClient bqClient = new BigqueryClient(getClass().getSimpleName()); - String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId()); + + String query = String.format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableId); List rows = bqClient.queryUnflattened(query, OPTIONS.getProject(), true, true); List beamRows = rows.stream() @@ -137,7 +118,7 @@ public void testWriteToPartitionedAndValidateWithBQQuery() assertThat(beamRows, containsInAnyOrder(inputRows.toArray())); String queryByPartition = - String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId()); + String.format("SELECT bool_field, datetime FROM `%s.%s`", OPTIONS.getProject(), tableId); rows = bqClient.queryUnflattened(queryByPartition, OPTIONS.getProject(), true, true); RowFilter rowFilter = new RowFilter(BEAM_SCHEMA).keep(Arrays.asList("bool_field", "datetime")); beamRows = diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java index 8b731c001ad1..05b1b475d23c 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HadoopCatalogIT.java @@ -17,18 +17,19 @@ */ package org.apache.beam.sdk.io.iceberg.catalog; -import java.io.IOException; -import java.util.List; import java.util.Map; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.catalog.Catalog; -import org.apache.iceberg.catalog.Namespace; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hadoop.HadoopCatalog; public class HadoopCatalogIT extends IcebergCatalogBaseIT { + @Override + public String type() { + return "hadoop"; + } + @Override public Catalog createCatalog() { Configuration catalogHadoopConf = new Configuration(); @@ -37,21 +38,11 @@ public Catalog createCatalog() { HadoopCatalog catalog = new HadoopCatalog(); catalog.setConf(catalogHadoopConf); - catalog.initialize("hadoop_" + catalogName, ImmutableMap.of("warehouse", warehouse)); + catalog.initialize(catalogName, ImmutableMap.of("warehouse", warehouse)); return catalog; } - @Override - public void catalogCleanup() throws IOException { - HadoopCatalog hadoopCatalog = (HadoopCatalog) catalog; - List tables = hadoopCatalog.listTables(Namespace.of(testName.getMethodName())); - for (TableIdentifier identifier : tables) { - hadoopCatalog.dropTable(identifier); - } - hadoopCatalog.close(); - } - @Override public Map managedIcebergConfig(String tableId) { return ImmutableMap.builder() diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java index 4c3a620bebc3..30e10e80270d 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/HiveCatalogIT.java @@ -17,14 +17,11 @@ */ package org.apache.beam.sdk.io.iceberg.catalog; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.beam.sdk.io.iceberg.catalog.hiveutils.HiveMetastoreExtension; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.CatalogUtil; @@ -41,15 +38,10 @@ */ public class HiveCatalogIT extends IcebergCatalogBaseIT { private static HiveMetastoreExtension hiveMetastoreExtension; - private long salt = System.nanoTime(); - - private String testDb() { - return "test_db_" + testName.getMethodName(); - } @Override - public String tableId() { - return String.format("%s.%s", testDb(), "test_table" + "_" + salt); + public String type() { + return "hive"; } @BeforeClass @@ -65,36 +57,17 @@ public static void tearDown() throws Exception { } } - @Override - public void catalogSetup() throws Exception { - String dbPath = hiveMetastoreExtension.metastore().getDatabasePath(testDb()); - Database db = new Database(testDb(), "description", dbPath, Maps.newHashMap()); - hiveMetastoreExtension.metastoreClient().createDatabase(db); - } - @Override public Catalog createCatalog() { - salt += System.nanoTime(); return CatalogUtil.loadCatalog( HiveCatalog.class.getName(), - "hive_" + catalogName, + catalogName, ImmutableMap.of( CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS, String.valueOf(TimeUnit.SECONDS.toMillis(10))), hiveMetastoreExtension.hiveConf()); } - @Override - public void catalogCleanup() throws Exception { - if (hiveMetastoreExtension != null) { - List tables = hiveMetastoreExtension.metastoreClient().getAllTables(testDb()); - for (String table : tables) { - hiveMetastoreExtension.metastoreClient().dropTable(testDb(), table, true, false); - } - hiveMetastoreExtension.metastoreClient().dropDatabase(testDb()); - } - } - @Override public Map managedIcebergConfig(String tableId) { String metastoreUri = hiveMetastoreExtension.hiveConf().getVar(HiveConf.ConfVars.METASTOREURIS); @@ -102,6 +75,7 @@ public Map managedIcebergConfig(String tableId) { Map confProperties = ImmutableMap.builder() .put(HiveConf.ConfVars.METASTOREURIS.varname, metastoreUri) + .put(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, warehouse) .build(); return ImmutableMap.builder() diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java index e4b91fa4014a..cfb8acf4ee17 100644 --- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java +++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/IcebergCatalogBaseIT.java @@ -26,7 +26,9 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; import com.google.api.services.storage.model.StorageObject; import java.io.IOException; @@ -64,11 +66,13 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.RowFilter; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.iceberg.AppendFiles; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.FileScanTask; @@ -77,6 +81,8 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.data.IdentityPartitionConverters; import org.apache.iceberg.data.Record; @@ -126,7 +132,7 @@ * *
    *
  • {@link #catalogSetup()} - *
  • {@link #catalogCleanup()} + *
  • {@link #catalogCleanup(List)} *
* *

1,000 records are used for each test by default. You can change this by overriding {@link @@ -139,16 +145,33 @@ public abstract class IcebergCatalogBaseIT implements Serializable { public abstract Map managedIcebergConfig(String tableId); - public void catalogSetup() throws Exception {} + public abstract String type(); - public void catalogCleanup() throws Exception {} + public void catalogSetup() { + ((SupportsNamespaces) catalog).createNamespace(Namespace.of(namespace())); + } + + public void catalogCleanup(List namespaces) throws IOException { + for (Namespace namespace : namespaces) { + for (TableIdentifier identifier : catalog.listTables(namespace)) { + catalog.dropTable(identifier); + } + if (catalog instanceof SupportsNamespaces) { + ((SupportsNamespaces) catalog).dropNamespace(namespace); + } + } + } public Integer numRecords() { return OPTIONS.getRunner().equals(DirectRunner.class) ? 10 : 1000; } + public String namespace() { + return catalogName + "_" + testName.getMethodName(); + } + public String tableId() { - return testName.getMethodName() + ".test_table"; + return namespace() + ".test_table"; } public static String warehouse(Class testClass) { @@ -157,32 +180,23 @@ public static String warehouse(Class testClass) TestPipeline.testingPipelineOptions().getTempLocation(), testClass.getSimpleName(), RANDOM); } - public String catalogName = "test_catalog_" + System.nanoTime(); + public String catalogName = type() + "_test_catalog_" + System.currentTimeMillis(); @Before public void setUp() throws Exception { - catalogName += System.nanoTime(); OPTIONS.as(DirectOptions.class).setTargetParallelism(1); - warehouse = - String.format( - "%s/%s/%s", - TestPipeline.testingPipelineOptions().getTempLocation(), - getClass().getSimpleName(), - RANDOM); warehouse = warehouse(getClass()); - catalogSetup(); + namespacesToCleanup.add(namespace()); catalog = createCatalog(); + catalogSetup(); Thread.sleep(SETUP_TEARDOWN_SLEEP_MS); } @After public void cleanUp() throws Exception { - try { - catalogCleanup(); - Thread.sleep(SETUP_TEARDOWN_SLEEP_MS); - } catch (Exception e) { - LOG.warn("Catalog cleanup failed.", e); - } + catalogCleanup(namespacesToCleanup.stream().map(Namespace::of).collect(Collectors.toList())); + LOG.info("Successfully cleaned up namespaces: {}", namespacesToCleanup); + Thread.sleep(SETUP_TEARDOWN_SLEEP_MS); try { GcsUtil gcsUtil = OPTIONS.as(GcsOptions.class).getGcsUtil(); @@ -213,6 +227,7 @@ public void cleanUp() throws Exception { } protected static String warehouse; + protected List namespacesToCleanup = new ArrayList<>(); public Catalog catalog; protected static final GcpOptions OPTIONS = TestPipeline.testingPipelineOptions().as(GcpOptions.class); @@ -851,6 +866,107 @@ public void testStreamToPartitionedDynamicDestinations() throws IOException { writeToDynamicDestinations(null, true, true); } + @Test + public void testWriteToDynamicNamespaces() throws IOException { + // run this test only on catalogs that support namespace management + assumeTrue(catalog instanceof SupportsNamespaces); + + String tableIdentifierTemplate = namespace() + "_{modulo_5}.table_{bool_field}"; + Map writeConfig = new HashMap<>(managedIcebergConfig(tableIdentifierTemplate)); + // override with table template + writeConfig.put("table", tableIdentifierTemplate); + + Namespace namespace0 = Namespace.of(namespace() + "_0"); + Namespace namespace1 = Namespace.of(namespace() + "_1"); + Namespace namespace2 = Namespace.of(namespace() + "_2"); + Namespace namespace3 = Namespace.of(namespace() + "_3"); + Namespace namespace4 = Namespace.of(namespace() + "_4"); + + TableIdentifier tableId0true = TableIdentifier.of(namespace0, "table_true"); + TableIdentifier tableId0false = TableIdentifier.of(namespace0, "table_false"); + TableIdentifier tableId1true = TableIdentifier.of(namespace1, "table_true"); + TableIdentifier tableId1false = TableIdentifier.of(namespace1, "table_false"); + TableIdentifier tableId2true = TableIdentifier.of(namespace2, "table_true"); + TableIdentifier tableId2false = TableIdentifier.of(namespace2, "table_false"); + TableIdentifier tableId3true = TableIdentifier.of(namespace3, "table_true"); + TableIdentifier tableId3false = TableIdentifier.of(namespace3, "table_false"); + TableIdentifier tableId4true = TableIdentifier.of(namespace4, "table_true"); + TableIdentifier tableId4false = TableIdentifier.of(namespace4, "table_false"); + + List namespaces = + Arrays.asList(namespace0, namespace1, namespace2, namespace3, namespace4); + SupportsNamespaces sN = (SupportsNamespaces) catalog; + // assert namespace don't exist beforehand + namespaces.forEach(n -> assertFalse(sN.namespaceExists(n))); + + pipeline + .apply(Create.of(inputRows)) + .setRowSchema(BEAM_SCHEMA) + .apply(Managed.write(ICEBERG).withConfig(writeConfig)); + pipeline.run().waitUntilFinish(); + + // assert namespace were created + namespaces.forEach(n -> assertTrue(sN.namespaceExists(n))); + + Table table0true = catalog.loadTable(tableId0true); + Table table0false = catalog.loadTable(tableId0false); + Table table1true = catalog.loadTable(tableId1true); + Table table1false = catalog.loadTable(tableId1false); + Table table2true = catalog.loadTable(tableId2true); + Table table2false = catalog.loadTable(tableId2false); + Table table3true = catalog.loadTable(tableId3true); + Table table3false = catalog.loadTable(tableId3false); + Table table4true = catalog.loadTable(tableId4true); + Table table4false = catalog.loadTable(tableId4false); + + for (Table t : + Arrays.asList( + table0true, + table0false, + table1true, + table1false, + table2true, + table2false, + table3true, + table3false, + table4true, + table4false)) { + assertTrue(t.schema().sameSchema(ICEBERG_SCHEMA)); + } + + // Read back and check records are correct + Map, List> results = + ImmutableMap., List>builder() + .put(KV.of(0L, true), readRecords(table0true)) + .put(KV.of(0L, false), readRecords(table0false)) + .put(KV.of(1L, true), readRecords(table1true)) + .put(KV.of(1L, false), readRecords(table1false)) + .put(KV.of(2L, true), readRecords(table2true)) + .put(KV.of(2L, false), readRecords(table2false)) + .put(KV.of(3L, true), readRecords(table3true)) + .put(KV.of(3L, false), readRecords(table3false)) + .put(KV.of(4L, true), readRecords(table4true)) + .put(KV.of(4L, false), readRecords(table4false)) + .build(); + + for (Map.Entry, List> entry : results.entrySet()) { + long modulo = entry.getKey().getKey(); + boolean bool = entry.getKey().getValue(); + List records = entry.getValue(); + Stream expectedRecords = + inputRows.stream() + .filter( + rec -> + checkStateNotNull(rec.getInt64("modulo_5")) == modulo + && checkStateNotNull(rec.getBoolean("bool_field")) == bool) + .map(RECORD_FUNC::apply); + + assertThat(records, containsInAnyOrder(expectedRecords.toArray())); + } + + namespaces.stream().map(Namespace::toString).forEach(namespacesToCleanup::add); + } + public void runReadBetween(boolean useSnapshotBoundary, boolean streaming) throws Exception { Table table = catalog.createTable(TableIdentifier.parse(tableId()), ICEBERG_SCHEMA);