createWriteOperation(PipelineOptions options);
+ /**
+ * {@inheritDoc}
+ *
+ * By default, does not register any display data. Implementors may override this method
+ * to provide their own display metadata.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
+
/**
* A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
*
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
index 1eee771d1e18..8d87b1627138 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java
@@ -20,6 +20,8 @@
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData;
import org.joda.time.Instant;
import java.io.IOException;
@@ -52,7 +54,7 @@
* @param Type of elements read by the source.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
-public abstract class Source implements Serializable {
+public abstract class Source implements Serializable, HasDisplayData {
/**
* Checks that this source is valid, before it can be used in a pipeline.
*
@@ -66,6 +68,16 @@ public abstract class Source implements Serializable {
*/
public abstract Coder getDefaultOutputCoder();
+ /**
+ * {@inheritDoc}
+ *
+ * By default, does not register any display data. Implementors may override this method
+ * to provide their own display metadata.
+ */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ }
+
/**
* The interface that readers of custom input sources must implement.
*
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
index 0980d48b69da..6989bb36af44 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java
@@ -28,6 +28,7 @@
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -339,6 +340,14 @@ public PCollection apply(PInput input) {
return pcol;
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("compressionType", compressionType.toString());
+ if (filepattern != null) {
+ builder.add("filePattern", filepattern);
+ }
+ }
+
@Override
protected Coder getDefaultOutputCoder() {
return coder;
@@ -632,6 +641,21 @@ public PDone apply(PCollection input) {
filenamePrefix, filenameSuffix, shardTemplate, coder)));
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ if (filenamePrefix != null) {
+ builder.add("fileNamePrefix", filenamePrefix);
+ }
+
+ if (filenameSuffix != null && !filenameSuffix.isEmpty()) {
+ builder.add("fileNameSuffix", filenameSuffix);
+ }
+
+ if (numShards != 0) {
+ builder.add("numShards", numShards);
+ }
+ }
+
/**
* Returns the current shard name template string.
*/
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
index bc8ebb88f691..7d2e6532e770 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java
@@ -29,6 +29,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.View;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -78,6 +79,13 @@ public PDone apply(PCollection input) {
return createWrite(input, sink.createWriteOperation(options));
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder
+ .add("sink", sink.getClass())
+ .include(sink);
+ }
+
/**
* Returns the {@link Sink} associated with this PTransform.
*/
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
index 9ba76c5e5923..34494c7ef355 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java
@@ -21,6 +21,7 @@
import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation;
import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.Preconditions;
@@ -220,6 +221,21 @@ public void validate(PipelineOptions options) {
public XmlWriteOperation createWriteOperation(PipelineOptions options) {
return new XmlWriteOperation<>(this);
}
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ if (baseOutputFilename != null) {
+ builder.add("filenamePrefix", baseOutputFilename);
+ }
+
+ if (rootElementName != null) {
+ builder.add("rootElement", rootElementName);
+ }
+
+ if (classToBind != null) {
+ builder.add("recordClass", classToBind);
+ }
+ }
}
/**
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
index eae5e8bac884..d359cf13d67d 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java
@@ -21,6 +21,7 @@
import com.google.cloud.dataflow.sdk.coders.JAXBCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.Preconditions;
import org.codehaus.stax2.XMLInputFactory2;
@@ -217,6 +218,28 @@ public void validate() {
recordClass, "recordClass is null. Use builder method withRecordClass() to set this.");
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("filePattern", getFileOrPatternSpec());
+
+ if (rootElement != null) {
+ builder.add("rootElement", rootElement);
+ }
+
+ if (recordElement != null) {
+ builder.add("recordElement", recordElement);
+ }
+
+ if (recordClass != null) {
+ builder.add("recordClass", recordClass);
+ }
+
+ long minBundleSize = getMinBundleSize();
+ if (minBundleSize != DEFAULT_MIN_BUNDLE_SIZE) {
+ builder.add("minBundleSize", minBundleSize);
+ }
+ }
+
@Override
public Coder getDefaultOutputCoder() {
return JAXBCoder.of(recordClass);
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
index 01a42ce9cf3b..041db5366505 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java
@@ -40,6 +40,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PBegin;
@@ -260,6 +261,19 @@ public void validate(PBegin input) {
}
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("tableId", tableId);
+
+ if (options != null) {
+ builder.add("options", options.toString());
+ }
+
+ if (filter != null) {
+ builder.add("rowFilter", filter.toString());
+ }
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(Read.class)
@@ -427,6 +441,15 @@ Write withBigtableService(BigtableService bigtableService) {
return new Write(options, tableId, bigtableService);
}
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("tableId", tableId);
+
+ if (options != null) {
+ builder.add("options", options.toString());
+ }
+ }
+
@Override
public String toString() {
return MoreObjects.toStringHelper(Write.class)
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
index ec4f08bc561e..48836e9c7c2e 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java
@@ -76,12 +76,12 @@ public void leaveCompositeTransform(TransformTreeNode node) {
public void visitTransform(TransformTreeNode node) {
toFinalize.removeAll(node.getInput().expand());
AppliedPTransform, ?, ?> appliedTransform = getAppliedTransform(node);
+ stepNames.put(appliedTransform, genStepName());
if (node.getInput().expand().isEmpty()) {
rootTransforms.add(appliedTransform);
} else {
for (PValue value : node.getInput().expand()) {
valueToConsumers.get(value).add(appliedTransform);
- stepNames.put(appliedTransform, genStepName());
}
}
}
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
index bfdf73e69fe8..63ec7fcde12c 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java
@@ -88,7 +88,7 @@ public Map asMap() {
public String toString() {
StringBuilder builder = new StringBuilder();
boolean isFirstLine = true;
- for (Map.Entry entry : entries.entrySet()) {
+ for (Item entry : entries.values()) {
if (isFirstLine) {
isFirstLine = false;
} else {
@@ -107,13 +107,18 @@ public String toString() {
*/
public interface Builder {
/**
- * Include display metadata from the specified subcomponent. For example, a {@link ParDo}
+ * Register display metadata from the specified subcomponent. For example, a {@link ParDo}
* transform includes display metadata from the encapsulated {@link DoFn}.
- *
- * @return A builder instance to continue to build in a fluent-style.
*/
Builder include(HasDisplayData subComponent);
+ /**
+ * Register display metadata from the specified subcomponent, using the specified namespace.
+ * For example, a {@link ParDo} transform includes display metadata from the encapsulated
+ * {@link DoFn}.
+ */
+ Builder include(HasDisplayData subComponent, Class> namespace);
+
/**
* Register the given string display metadata. The metadata item will be registered with type
* {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from
@@ -135,6 +140,13 @@ public interface Builder {
*/
ItemBuilder add(String key, double value);
+ /**
+ * Register the given floating point display metadata. The metadata item will be registered with
+ * type {@link DisplayData.Type#BOOLEAN}, and is identified by the specified key and namespace
+ * from the current transform or component.
+ */
+ ItemBuilder add(String key, boolean value);
+
/**
* Register the given timestamp display metadata. The metadata item will be registered with type
* {@link DisplayData.Type#TIMESTAMP}, and is identified by the specified key and namespace from
@@ -287,7 +299,35 @@ public String getLinkUrl() {
@Override
public String toString() {
- return getValue();
+ return String.format("%s:%s=%s", ns, key, value);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof Item) {
+ Item that = (Item) obj;
+ return Objects.equals(this.ns, that.ns)
+ && Objects.equals(this.key, that.key)
+ && Objects.equals(this.type, that.type)
+ && Objects.equals(this.value, that.value)
+ && Objects.equals(this.shortValue, that.shortValue)
+ && Objects.equals(this.label, that.label)
+ && Objects.equals(this.url, that.url);
+ }
+
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ this.ns,
+ this.key,
+ this.type,
+ this.value,
+ this.shortValue,
+ this.label,
+ this.url);
}
private Item withLabel(String label) {
@@ -313,8 +353,12 @@ public static class Identifier {
private final String ns;
private final String key;
- static Identifier of(Class> namespace, String key) {
- return new Identifier(namespace.getName(), key);
+ public static Identifier of(Class> namespace, String key) {
+ return of(namespace.getName(), key);
+ }
+
+ public static Identifier of(String namespace, String key) {
+ return new Identifier(namespace, key);
}
private Identifier(String ns, String key) {
@@ -355,7 +399,7 @@ public String toString() {
/**
* Display metadata type.
*/
- enum Type {
+ public enum Type {
STRING {
@Override
FormattedItemValue format(Object value) {
@@ -374,6 +418,12 @@ FormattedItemValue format(Object value) {
return new FormattedItemValue(Double.toString((Double) value));
}
},
+ BOOLEAN() {
+ @Override
+ FormattedItemValue format(Object value) {
+ return new FormattedItemValue(Boolean.toString((boolean) value));
+ }
+ },
TIMESTAMP() {
@Override
FormattedItemValue format(Object value) {
@@ -403,7 +453,7 @@ FormattedItemValue format(Object value) {
abstract FormattedItemValue format(Object value);
}
- private static class FormattedItemValue {
+ static class FormattedItemValue {
private final String shortValue;
private final String longValue;
@@ -416,11 +466,11 @@ private FormattedItemValue(String longValue, String shortValue) {
this.shortValue = shortValue;
}
- private String getLongValue () {
+ String getLongValue() {
return this.longValue;
}
- private String getShortValue() {
+ String getShortValue() {
return this.shortValue;
}
}
@@ -446,11 +496,17 @@ private static InternalBuilder forRoot(HasDisplayData instance) {
@Override
public Builder include(HasDisplayData subComponent) {
+ checkNotNull(subComponent);
+ return include(subComponent, subComponent.getClass());
+ }
+
+ @Override
+ public Builder include(HasDisplayData subComponent, Class> namespace) {
checkNotNull(subComponent);
boolean newComponent = visited.add(subComponent);
if (newComponent) {
Class prevNs = this.latestNs;
- this.latestNs = subComponent.getClass();
+ this.latestNs = namespace;
subComponent.populateDisplayData(this);
this.latestNs = prevNs;
}
@@ -474,6 +530,11 @@ public ItemBuilder add(String key, double value) {
return addItem(key, Type.FLOAT, value);
}
+ @Override
+ public ItemBuilder add(String key, boolean value) {
+ return addItem(key, Type.BOOLEAN, value);
+ }
+
@Override
public ItemBuilder add(String key, Instant value) {
checkNotNull(value);
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
index fd77f9658fd2..8b1a8bc90cd2 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java
@@ -281,7 +281,7 @@ protected Trigger(@Nullable List> subTriggers) {
/**
- * Called immediately after an element is first incorporated into a window.
+ * Called every time an element is incorporated into a window.
*/
public abstract void onElement(OnElementContext c) throws Exception;
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
index f0a9b1a1512d..c71be1919f62 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java
@@ -17,6 +17,7 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -29,6 +30,7 @@
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.base.MoreObjects;
@@ -222,6 +224,35 @@ public void testAvroSinkWrite() throws Exception {
}
}
+ @Test
+ public void testReadDisplayData() {
+ AvroIO.Read.Bound> read = AvroIO.Read
+ .from("foo.*")
+ .withSchema(GenericClass.class);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+ assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+ }
+
+ @Test
+ public void testWriteDisplayData() {
+ AvroIO.Write.Bound> write = AvroIO.Write
+ .to("foo")
+ .withSuffix("bar")
+ .withSchema(GenericClass.class)
+ .withNumShards(100);
+
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("fileNamePrefix", "foo"));
+ assertThat(displayData, hasDisplayItem("fileNameSuffix", "bar"));
+ assertThat(displayData, hasDisplayItem("schema", GenericClass.class));
+ assertThat(displayData, hasDisplayItem("numShards", 100));
+ }
+
// TODO: for Write only, test withSuffix, withNumShards,
// withShardNameTemplate and withoutSharding.
}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java
index 775f5c3539bc..db26546d12dd 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java
@@ -17,6 +17,7 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -29,6 +30,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.MoreObjects;
import org.apache.avro.Schema;
@@ -507,6 +509,20 @@ public void testSeekerFindAllLocations() {
}
}
+ @Test
+ public void testDisplayData() {
+ AvroSource source = AvroSource
+ .from("foobar.txt")
+ .withSchema(Bird.class)
+ .withMinBundleSize(1234);
+
+ DisplayData displayData = DisplayData.from(source);
+ assertThat(displayData, hasDisplayItem("filePattern", "foobar.txt"));
+ assertThat(displayData,
+ hasDisplayItem("schema", ReflectData.get().getSchema(Bird.class).toString()));
+ assertThat(displayData, hasDisplayItem("minBundleSize", 1234));
+ }
+
/**
* Class that will encode to a fixed size: 16 bytes.
*
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
index 51c65563790a..12829c79fe5e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java
@@ -17,8 +17,10 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import com.google.api.client.util.Data;
import com.google.api.services.bigquery.model.TableReference;
@@ -34,6 +36,7 @@
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import org.hamcrest.Matchers;
@@ -222,6 +225,22 @@ public void testBuildSourceWithTableAndFlatten() {
p.run();
}
+ @Test
+ public void testBuildSourceDisplayData() {
+ String tableSpec = "project:dataset.tableid";
+
+ BigQueryIO.Read.Bound read = BigQueryIO.Read
+ .from(tableSpec)
+ .fromQuery("myQuery")
+ .withoutResultFlattening();
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("tableSpec", tableSpec));
+ assertThat(displayData, hasDisplayItem("query", "myQuery"));
+ assertThat(displayData, hasDisplayItem("flattenResults", false));
+ }
+
@Test
public void testBuildSink() {
BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
@@ -334,6 +353,27 @@ public void testBuildSinkWithWriteDispositionEmpty() {
null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
}
+ @Test
+ public void testBuildSinkDisplayData() {
+ String tableSpec = "project:dataset.table";
+ TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2");
+
+ BigQueryIO.Write.Bound write = BigQueryIO.Write
+ .to(tableSpec)
+ .withSchema(schema)
+ .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
+ .withWriteDisposition(WriteDisposition.WRITE_APPEND);
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("tableSpec", tableSpec));
+ assertThat(displayData, hasDisplayItem("schema", schema.toString()));
+ assertThat(displayData,
+ hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString()));
+ assertThat(displayData,
+ hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString()));
+ }
+
private void testWriteValidatesDataset(boolean streaming) {
BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class);
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java
index 7cac67a20069..86413eeb0721 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java
@@ -17,6 +17,7 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@@ -29,6 +30,7 @@
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -38,13 +40,14 @@
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/** Unit tests for {@link BoundedReadFromUnboundedSource}. */
@RunWith(JUnit4.class)
-public class BoundedReadFromUnboundedSourceTest {
+public class BoundedReadFromUnboundedSourceTest implements Serializable{
private static final int NUM_RECORDS = 100;
private static List finalizeTracker = null;
@@ -66,6 +69,19 @@ public void testTimeBound() throws Exception {
test(false, true);
}
+ @Test
+ public void testForwardsDisplayData() {
+ TestCountingSource src = new TestCountingSource(1234) {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+
+ BoundedReadFromUnboundedSource> read = Read.from(src).withMaxNumRecords(5);
+ assertThat(DisplayData.from(read), includes(src));
+ }
+
private static class Checker
implements SerializableFunction>, Void> {
private final boolean dedup;
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
index 3de0513d18f8..a68a02ba8a0c 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java
@@ -17,9 +17,13 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import com.google.cloud.dataflow.sdk.Pipeline;
@@ -32,6 +36,7 @@
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.SourceTestUtils;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.io.Files;
import com.google.common.primitives.Bytes;
@@ -331,6 +336,26 @@ public void testCompressedReadMultipleFiles() throws Exception {
p.run();
}
+ @Test
+ public void testDisplayData() {
+ ByteSource inputSource = new ByteSource("foobar.txt", 1) {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+
+ CompressedSource> compressedSource = CompressedSource.from(inputSource);
+ CompressedSource> gzipSource = compressedSource.withDecompression(CompressionMode.GZIP);
+
+ DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource);
+ DisplayData gzipDisplayData = DisplayData.from(gzipSource);
+
+ assertThat(compressedSourceDisplayData, hasDisplayItem(hasKey("compressionMode")));
+ assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString()));
+ assertThat(compressedSourceDisplayData, includes(inputSource));
+ }
+
/**
* Generate byte array of given size.
*/
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
index 5a7c2fbac390..65b28ac981d0 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java
@@ -18,6 +18,7 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -30,9 +31,11 @@
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.Max;
import com.google.cloud.dataflow.sdk.transforms.Min;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -75,6 +78,13 @@ public void testBoundedInput() {
p.run();
}
+ @Test
+ public void testBoundedDisplayData() {
+ PTransform, ?> input = CountingInput.upTo(1234);
+ DisplayData displayData = DisplayData.from(input);
+ assertThat(displayData, hasDisplayItem("upTo", 1234));
+ }
+
@Test
@Category(RunnableOnService.class)
public void testUnboundedInput() {
@@ -138,6 +148,28 @@ public void testUnboundedInputTimestamps() {
p.run();
}
+ @Test
+ public void testUnboundedDisplayData() {
+ Duration maxReadTime = Duration.standardHours(5);
+ SerializableFunction timestampFn = new SerializableFunction() {
+ @Override
+ public Instant apply(Long input) {
+ return Instant.now();
+ }
+ };
+
+ PTransform, ?> input = CountingInput.unbounded()
+ .withMaxNumRecords(1234)
+ .withMaxReadTime(maxReadTime)
+ .withTimestampFn(timestampFn);
+
+ DisplayData displayData = DisplayData.from(input);
+
+ assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass()));
+ }
+
/**
* A timestamp function that uses the given value as the timestamp. Because the input values will
* not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java
index 41967409d0b8..891895d7c3f9 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java
@@ -18,6 +18,7 @@
package com.google.cloud.dataflow.sdk.io;
import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -55,6 +56,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.TestCredential;
import com.google.common.collect.Lists;
@@ -194,6 +196,22 @@ public void testSourceValidationSucceedsNamespace() throws Exception {
source.validate();
}
+ @Test
+ public void testSourceDipslayData() {
+ DatastoreIO.Source source = DatastoreIO.source()
+ .withDataset(DATASET)
+ .withQuery(QUERY)
+ .withHost(HOST)
+ .withNamespace(NAMESPACE);
+
+ DisplayData displayData = DisplayData.from(source);
+
+ assertThat(displayData, hasDisplayItem("dataset", DATASET));
+ assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+ assertThat(displayData, hasDisplayItem("host", HOST));
+ assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+ }
+
@Test
public void testSinkDoesNotAllowNullHost() throws Exception {
thrown.expect(NullPointerException.class);
@@ -226,6 +244,18 @@ public void testSinkValidationSucceedsWithDataset() throws Exception {
sink.validate(testPipelineOptions(null));
}
+ @Test
+ public void testSinkDipslayData() {
+ DatastoreIO.Sink sink = DatastoreIO.sink()
+ .withDataset(DATASET)
+ .withHost(HOST);
+
+ DisplayData displayData = DisplayData.from(sink);
+
+ assertThat(displayData, hasDisplayItem("dataset", DATASET));
+ assertThat(displayData, hasDisplayItem("host", HOST));
+ }
+
@Test
public void testQuerySplitBasic() throws Exception {
KindExpression mykind = KindExpression.newBuilder().setName("mykind").build();
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java
index f7d81fc50d21..94f9b1b4a2e4 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java
@@ -17,12 +17,16 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
import com.google.api.client.testing.http.FixedClock;
import com.google.api.client.util.Clock;
import com.google.api.services.pubsub.model.PubsubMessage;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
@@ -231,4 +235,42 @@ public void timestampLabelRfc3339WithTooLargeYearThrowsError() {
// Year 10000 out of range.
parseTimestamp("10000-10-29T23:41:41.123999Z");
}
+
+ @Test
+ public void testReadDisplayData() {
+ String topic = "projects/project/topics/topic";
+ String subscription = "projects/project/subscriptions/subscription";
+ Duration maxReadTime = Duration.standardMinutes(5);
+ PubsubIO.Read.Bound read = PubsubIO.Read
+ .topic(topic)
+ .subscription(subscription)
+ .timestampLabel("myTimestamp")
+ .idLabel("myId")
+ .maxNumRecords(1234)
+ .maxReadTime(maxReadTime);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("subscription", subscription));
+ assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+ assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+ }
+
+ @Test
+ public void testWriteDisplayData() {
+ String topic = "projects/project/topics/topic";
+ PubsubIO.Write.Bound> write = PubsubIO.Write
+ .topic(topic)
+ .timestampLabel("myTimestamp")
+ .idLabel("myId");
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("topic", topic));
+ assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp"));
+ assertThat(displayData, hasDisplayItem("idLabel", "myId"));
+ }
}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java
index 0564072ca6e4..c2965f5a5ef3 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java
@@ -17,10 +17,16 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
+import static org.junit.Assert.assertThat;
+
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
+import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -28,6 +34,7 @@
import org.junit.runners.JUnit4;
import java.io.IOException;
+import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
@@ -36,9 +43,9 @@
* Tests for {@link Read}.
*/
@RunWith(JUnit4.class)
-public class ReadTest {
+public class ReadTest implements Serializable{
@Rule
- public ExpectedException thrown = ExpectedException.none();
+ public transient ExpectedException thrown = ExpectedException.none();
@Test
public void failsWhenCustomBoundedSourceIsNotSerializable() {
@@ -62,6 +69,38 @@ public void succeedsWhenCustomUnboundedSourceIsSerializable() {
Read.from(new SerializableUnboundedSource());
}
+ @Test
+ public void testDisplayData() {
+ SerializableBoundedSource boundedSource = new SerializableBoundedSource() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ SerializableUnboundedSource unboundedSource = new SerializableUnboundedSource() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ Duration maxReadTime = Duration.standardMinutes(2345);
+
+ Read.Bounded bounded = Read.from(boundedSource);
+ BoundedReadFromUnboundedSource unbounded = Read.from(unboundedSource)
+ .withMaxNumRecords(1234)
+ .withMaxReadTime(maxReadTime);
+
+ DisplayData boundedDisplayData = DisplayData.from(bounded);
+ assertThat(boundedDisplayData, hasDisplayItem("source", boundedSource.getClass()));
+ assertThat(boundedDisplayData, includes(boundedSource));
+
+ DisplayData unboundedDisplayData = DisplayData.from(unbounded);
+ assertThat(unboundedDisplayData, hasDisplayItem("source", unboundedSource.getClass()));
+ assertThat(unboundedDisplayData, includes(unboundedSource));
+ assertThat(unboundedDisplayData, hasDisplayItem("maxRecords", 1234));
+ assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime));
+ }
+
private abstract static class CustomBoundedSource extends BoundedSource {
@Override
public List extends BoundedSource> splitIntoBundles(
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
index 0bd6ce76274e..d167d9f4fae9 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java
@@ -21,6 +21,7 @@
import static com.google.cloud.dataflow.sdk.TestUtils.LINES_ARRAY;
import static com.google.cloud.dataflow.sdk.TestUtils.NO_INTS_ARRAY;
import static com.google.cloud.dataflow.sdk.TestUtils.NO_LINES_ARRAY;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -41,6 +42,7 @@
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.TestCredential;
@@ -198,6 +200,18 @@ public void testReadNamed() throws Exception {
}
}
+ @Test
+ public void testReadDisplayData() {
+ TextIO.Read.Bound> read = TextIO.Read
+ .from("foo.*")
+ .withCompressionType(CompressionType.BZIP2);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("filePattern", "foo.*"));
+ assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString()));
+ }
+
void runTestWrite(T[] elems, Coder coder) throws Exception {
File tmpFile = tmpFolder.newFile("file.txt");
String filename = tmpFile.getPath();
@@ -285,6 +299,21 @@ public void testWriteNamed() {
}
}
+ @Test
+ public void testWriteDisplayData() {
+ TextIO.Write.Bound> write = TextIO.Write
+ .to("foo")
+ .withSuffix("bar")
+ .withNumShards(100);
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("fileNamePrefix", "foo"));
+ assertThat(displayData, hasDisplayItem("fileNameSuffix", "bar"));
+ assertThat(displayData, hasDisplayItem("numShards", 100));
+
+ }
+
@Test
public void testUnsupportedFilePattern() throws IOException {
File outFolder = tmpFolder.newFolder();
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java
index da487729632f..671dee88bced 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java
@@ -17,6 +17,8 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
@@ -35,6 +37,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -91,6 +94,21 @@ public void testWriteWindowed() {
runWrite(inputs, /* windowed */ true);
}
+ @Test
+ public void testDisplayData() {
+ TestSink sink = new TestSink() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ Write.Bound write = Write.to(sink);
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
+ assertThat(displayData, includes(sink));
+ }
+
/**
* Performs a Write transform and verifies the Write transform calls the appropriate methods on
* a test sink in the correct order, as well as verifies that the elements of a PCollection are
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java
index 3ba720b341b7..24c6d510e4d1 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java
@@ -17,13 +17,16 @@
*/
package com.google.cloud.dataflow.sdk.io;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import com.google.cloud.dataflow.sdk.io.XmlSink.XmlWriteOperation;
import com.google.cloud.dataflow.sdk.io.XmlSink.XmlWriter;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.collect.Lists;
import org.junit.Rule;
@@ -161,6 +164,19 @@ public void testCreateWriter() throws Exception {
assertNotNull(writer.marshaller);
}
+ @Test
+ public void testDisplayData() {
+ XmlSink.Bound sink = XmlSink.write()
+ .toFilenamePrefix("foobar")
+ .withRootElement("bird")
+ .ofRecordClass(Integer.class);
+
+ DisplayData displayData = DisplayData.from(sink);
+ assertThat(displayData, hasDisplayItem("filenamePrefix", "foobar"));
+ assertThat(displayData, hasDisplayItem("rootElement", "bird"));
+ assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
+ }
+
/**
* Write a bundle with an XmlWriter and verify the output is expected.
*/
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java
index 41e28870783e..e11d435a8f01 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java
@@ -20,6 +20,7 @@
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.containsString;
@@ -33,6 +34,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.common.collect.ImmutableList;
@@ -822,4 +824,23 @@ public void testReadXMLFilePattern() throws IOException {
DataflowAssert.that(output).containsInAnyOrder(expectedResults);
p.run();
}
+
+ @Test
+ public void testDisplayData() {
+
+
+ XmlSource> source = XmlSource
+ .from("foo.xml")
+ .withRootElement("bird")
+ .withRecordElement("cat")
+ .withMinBundleSize(1234)
+ .withRecordClass(Integer.class);
+ DisplayData displayData = DisplayData.from(source);
+
+ assertThat(displayData, hasDisplayItem("filePattern", "foo.xml"));
+ assertThat(displayData, hasDisplayItem("rootElement", "bird"));
+ assertThat(displayData, hasDisplayItem("recordElement", "cat"));
+ assertThat(displayData, hasDisplayItem("recordClass", Integer.class));
+ assertThat(displayData, hasDisplayItem("minBundleSize", 1234));
+ }
}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java
index b8ac24356126..bb63041e4b0a 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java
@@ -21,6 +21,8 @@
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.hasSize;
@@ -46,6 +48,7 @@
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;
@@ -404,6 +407,26 @@ public void testReadingWithFilterAndSubSplits() throws Exception {
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
+ @Test
+ public void testReadingDisplayData() {
+ RowFilter rowFilter = RowFilter.newBuilder()
+ .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
+ .build();
+
+ BigtableIO.Read read = BigtableIO.read()
+ .withBigtableOptions(BIGTABLE_OPTIONS)
+ .withTableId("fooTable")
+ .withRowFilter(rowFilter);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+
+ // BigtableIO adds user-agent to options; assert only on key and not value.
+ assertThat(displayData, hasDisplayItem(hasKey("options")));
+ }
+
/** Tests that a record gets written to the service and messages are logged. */
@Test
public void testWriting() throws Exception {
@@ -460,6 +483,19 @@ public void testWritingFailsBadElement() throws Exception {
p.run();
}
+ @Test
+ public void testWritingDisplayData() {
+ BigtableIO.Write write = BigtableIO.write()
+ .withTableId("fooTable")
+ .withBigtableOptions(BIGTABLE_OPTIONS);
+
+ DisplayData displayData = DisplayData.from(write);
+
+ assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
+ // BigtableIO adds user-agent to options; assert only on key and not value.
+ assertThat(displayData, hasDisplayItem(hasKey("options")));
+ }
+
////////////////////////////////////////////////////////////////////////////////////////////
private static final String COLUMN_FAMILY_NAME = "family";
private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
index bea6fe1bd6f5..905f58f8714e 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java
@@ -176,6 +176,43 @@ public PDone apply(PInput input) {
assertThat(visitor.getUnfinalizedPValues(), emptyIterable());
}
+ @Test
+ public void getStepNamesContainsAllTransforms() {
+ PCollection created = p.apply(Create.of("1", "2", "3"));
+ PCollection transformed =
+ created.apply(
+ ParDo.of(
+ new DoFn() {
+ @Override
+ public void processElement(DoFn.ProcessContext c)
+ throws Exception {
+ c.output(Integer.toString(c.element().length()));
+ }
+ }));
+ PDone finished =
+ transformed.apply(
+ new PTransform() {
+ @Override
+ public PDone apply(PInput input) {
+ return PDone.in(input.getPipeline());
+ }
+ });
+
+ p.traverseTopologically(visitor);
+ assertThat(
+ visitor.getStepNames(),
+ Matchers., String>hasEntry(
+ created.getProducingTransformInternal(), "s0"));
+ assertThat(
+ visitor.getStepNames(),
+ Matchers., String>hasEntry(
+ transformed.getProducingTransformInternal(), "s1"));
+ assertThat(
+ visitor.getStepNames(),
+ Matchers., String>hasEntry(
+ finished.getProducingTransformInternal(), "s2"));
+ }
+
@Test
public void traverseMultipleTimesThrows() {
p.apply(Create.of(1, 2, 3));
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
index fde2cb43bb20..e1faf1b8744f 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java
@@ -73,7 +73,6 @@
import java.util.Collection;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest {
private PCollection> downstream;
private PCollectionView> view;
private PCollection unbounded;
-
+ private Collection> rootTransforms;
+ private Map>> valueToConsumers;
@Before
public void setup() {
@@ -103,32 +103,20 @@ public void setup() {
downstream = created.apply(WithKeys.of("foo"));
view = created.apply(View.asIterable());
unbounded = p.apply(CountingInput.unbounded());
- Collection> rootTransforms =
- ImmutableList.>of(
- created.getProducingTransformInternal(), unbounded.getProducingTransformInternal());
- Map>> valueToConsumers = new HashMap<>();
- valueToConsumers.put(
- created,
- ImmutableList.>of(
- downstream.getProducingTransformInternal(), view.getProducingTransformInternal()));
- valueToConsumers.put(unbounded, ImmutableList.>of());
- valueToConsumers.put(downstream, ImmutableList.>of());
- valueToConsumers.put(view, ImmutableList.>of());
-
- Map, String> stepNames = new HashMap<>();
- stepNames.put(created.getProducingTransformInternal(), "s1");
- stepNames.put(downstream.getProducingTransformInternal(), "s2");
- stepNames.put(view.getProducingTransformInternal(), "s3");
- stepNames.put(unbounded.getProducingTransformInternal(), "s4");
-
- Collection> views = ImmutableList.>of(view);
- context = InProcessEvaluationContext.create(
+
+ ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor();
+ p.traverseTopologically(cVis);
+ rootTransforms = cVis.getRootTransforms();
+ valueToConsumers = cVis.getValueToConsumers();
+
+ context =
+ InProcessEvaluationContext.create(
runner.getPipelineOptions(),
InProcessBundleFactory.create(),
rootTransforms,
valueToConsumers,
- stepNames,
- views);
+ cVis.getStepNames(),
+ cVis.getViews());
}
@Test
@@ -495,16 +483,14 @@ public void isDoneWithPartiallyDone() {
null,
ImmutableList.of(),
StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build());
- context.handleResult(
- committedBundle,
- ImmutableList.of(),
- StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build());
assertThat(context.isDone(), is(false));
- context.handleResult(
- committedBundle,
- ImmutableList.of(),
- StepTransformResult.withoutHold(view.getProducingTransformInternal()).build());
+ for (AppliedPTransform, ?, ?> consumers : valueToConsumers.get(created)) {
+ context.handleResult(
+ committedBundle,
+ ImmutableList.of(),
+ StepTransformResult.withoutHold(consumers).build());
+ }
assertThat(context.isDone(), is(true));
}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java
index 385bc42c76d9..d540b4b628a9 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java
@@ -17,13 +17,19 @@
*/
package com.google.cloud.dataflow.sdk.transforms.display;
+import static org.hamcrest.Matchers.allOf;
+
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Item;
+import com.google.common.collect.Sets;
+import org.hamcrest.CustomTypeSafeMatcher;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import java.util.Collection;
@@ -43,6 +49,71 @@ public static Matcher hasDisplayItem() {
return hasDisplayItem(Matchers.any(DisplayData.Item.class));
}
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and String value.
+ */
+ public static Matcher hasDisplayItem(String key, String value) {
+ return hasDisplayItem(key, DisplayData.Type.STRING, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Boolean value.
+ */
+ public static Matcher hasDisplayItem(String key, Boolean value) {
+ return hasDisplayItem(key, DisplayData.Type.BOOLEAN, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Duration value.
+ */
+ public static Matcher hasDisplayItem(String key, Duration value) {
+ return hasDisplayItem(key, DisplayData.Type.DURATION, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Float value.
+ */
+ public static Matcher hasDisplayItem(String key, double value) {
+ return hasDisplayItem(key, DisplayData.Type.FLOAT, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Integer value.
+ */
+ public static Matcher hasDisplayItem(String key, long value) {
+ return hasDisplayItem(key, DisplayData.Type.INTEGER, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Class value.
+ */
+ public static Matcher hasDisplayItem(String key, Class> value) {
+ return hasDisplayItem(key, DisplayData.Type.JAVA_CLASS, value);
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains an item with the
+ * specified key and Timestamp value.
+ */
+ public static Matcher hasDisplayItem(String key, Instant value) {
+ return hasDisplayItem(key, DisplayData.Type.TIMESTAMP, value);
+ }
+
+ private static Matcher hasDisplayItem(
+ String key, DisplayData.Type type, Object value) {
+ DisplayData.FormattedItemValue formattedValue = type.format(value);
+ return hasDisplayItem(allOf(
+ hasKey(key),
+ hasType(type),
+ hasValue(formattedValue.getLongValue())));
+ }
+
/**
* Creates a matcher that matches if the examined {@link DisplayData} contains any item
* matching the specified {@code itemMatcher}.
@@ -69,13 +140,93 @@ protected boolean matchesSafely(DisplayData data, Description mismatchDescriptio
Collection- items = data.items();
boolean isMatch = Matchers.hasItem(itemMatcher).matches(items);
if (!isMatch) {
- mismatchDescription.appendText("found " + items.size() + " non-matching items");
+ mismatchDescription.appendText("found " + items.size() + " non-matching item(s):\n");
+ mismatchDescription.appendValue(data);
}
return isMatch;
}
}
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains all display data
+ * registered from the specified subcomponent.
+ */
+ public static Matcher includes(final HasDisplayData subComponent) {
+ return includes(subComponent, subComponent.getClass());
+ }
+
+ /**
+ * Create a matcher that matches if the examined {@link DisplayData} contains all display data
+ * registered from the specified subcomponent and namespace.
+ */
+ public static Matcher includes(
+ final HasDisplayData subComponent, final Class extends HasDisplayData> namespace) {
+ return new CustomTypeSafeMatcher("includes subcomponent") {
+ @Override
+ protected boolean matchesSafely(DisplayData displayData) {
+ DisplayData subComponentData = DisplayData.from(subComponent);
+ if (subComponentData.items().size() == 0) {
+ throw new UnsupportedOperationException("subComponent contains no display data; " +
+ "cannot verify whether it is included");
+ }
+
+ DisplayDataComparision comparison = checkSubset(displayData, subComponentData, namespace);
+ return comparison.missingItems.isEmpty();
+ }
+
+
+ @Override
+ protected void describeMismatchSafely(
+ DisplayData displayData, Description mismatchDescription) {
+ DisplayData subComponentDisplayData = DisplayData.from(subComponent);
+ DisplayDataComparision comparison = checkSubset(
+ displayData, subComponentDisplayData, subComponent.getClass());
+
+ mismatchDescription
+ .appendText("did not include:\n")
+ .appendValue(comparison.missingItems)
+ .appendText("\nNon-matching items:\n")
+ .appendValue(comparison.unmatchedItems);
+ }
+
+ private DisplayDataComparision checkSubset(
+ DisplayData displayData, DisplayData included, Class> namespace) {
+ DisplayDataComparision comparison = new DisplayDataComparision(displayData.items());
+ for (Item item : included.items()) {
+ Item matchedItem = displayData.asMap().get(
+ DisplayData.Identifier.of(namespace, item.getKey()));
+
+ if (matchedItem != null) {
+ comparison.matched(matchedItem);
+ } else {
+ comparison.missing(item);
+ }
+ }
+
+ return comparison;
+ }
+
+ class DisplayDataComparision {
+ Collection missingItems;
+ Collection unmatchedItems;
+
+ DisplayDataComparision(Collection
- superset) {
+ missingItems = Sets.newHashSet();
+ unmatchedItems = Sets.newHashSet(superset);
+ }
+
+ void matched(Item supersetItem) {
+ unmatchedItems.remove(supersetItem);
+ }
+
+ void missing(Item subsetItem) {
+ missingItems.add(subsetItem);
+ }
+ }
+ };
+ }
+
/**
* Creates a matcher that matches if the examined {@link DisplayData.Item} contains a key
* with the specified value.
@@ -96,4 +247,32 @@ protected String featureValueOf(DisplayData.Item actual) {
}
};
}
+
+ public static Matcher hasType(DisplayData.Type type) {
+ return hasType(Matchers.is(type));
+ }
+
+ public static Matcher hasType(Matcher typeMatcher) {
+ return new FeatureMatcher(
+ typeMatcher, "with type", "type") {
+ @Override
+ protected DisplayData.Type featureValueOf(DisplayData.Item actual) {
+ return actual.getType();
+ }
+ };
+ }
+
+ public static Matcher hasValue(String value) {
+ return hasValue(Matchers.is(value));
+ }
+
+ public static Matcher hasValue(Matcher valueMatcher) {
+ return new FeatureMatcher(
+ valueMatcher, "with value", "value") {
+ @Override
+ protected String featureValueOf(DisplayData.Item actual) {
+ return actual.getValue();
+ }
+ };
+ }
}
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java
index 3ade92339f53..1b43ff7ea02c 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java
@@ -19,11 +19,13 @@
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasType;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.core.StringStartsWith.startsWith;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder;
@@ -46,7 +48,7 @@ public void testHasDisplayItem() {
Matcher matcher = hasDisplayItem();
assertFalse(matcher.matches(DisplayData.none()));
- assertTrue(matcher.matches(createDisplayDataWithItem("foo", "bar")));
+ assertThat(createDisplayDataWithItem("foo", "bar"), matcher);
}
@Test
@@ -59,15 +61,68 @@ public void testHasDisplayItemDescription() {
matcher.describeMismatch(DisplayData.none(), mismatchDesc);
assertThat(desc.toString(), startsWith("display data with item: "));
- assertThat(mismatchDesc.toString(), containsString("found 0 non-matching items"));
+ assertThat(mismatchDesc.toString(), containsString("found 0 non-matching item(s)"));
}
@Test
public void testHasKey() {
Matcher matcher = hasDisplayItem(hasKey("foo"));
- assertTrue(matcher.matches(createDisplayDataWithItem("foo", "bar")));
assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar")));
+
+ assertThat(createDisplayDataWithItem("foo", "bar"), matcher);
+ }
+
+ @Test
+ public void testHasType() {
+ Matcher matcher = hasDisplayItem(hasType(DisplayData.Type.JAVA_CLASS));
+
+ DisplayData data = DisplayData.from(new PTransform, PCollection>() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", DisplayDataMatchersTest.class);
+ }
+ });
+
+ assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar")));
+ assertThat(data, matcher);
+ }
+
+ @Test
+ public void testHasValue() {
+ Matcher matcher = hasDisplayItem(hasValue("bar"));
+
+ assertFalse(matcher.matches(createDisplayDataWithItem("foo", "baz")));
+ assertThat(createDisplayDataWithItem("foo", "bar"), matcher);
+ }
+
+ @Test
+ public void testIncludes() {
+ final HasDisplayData subComponent = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ HasDisplayData hasSubcomponent = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder
+ .include(subComponent)
+ .add("foo2", "bar2");
+ }
+ };
+ HasDisplayData sameKeyDifferentNamespace = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ Matcher matcher = includes(subComponent);
+
+ assertFalse(matcher.matches(DisplayData.from(sameKeyDifferentNamespace)));
+ assertThat(DisplayData.from(hasSubcomponent), matcher);
+ assertThat(DisplayData.from(subComponent), matcher);
}
private DisplayData createDisplayDataWithItem(final String key, final String value) {
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
index 397e1021244f..4d75e267197a 100644
--- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java
@@ -19,6 +19,9 @@
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasType;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasValue;
+import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.everyItem;
@@ -92,7 +95,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
.include(subComponent2)
.add("MinSproggles", 200)
.withLabel("Mimimum Required Sproggles")
- .add("LazerOrientation", "NORTH")
+ .add("FireLazers", true)
.add("TimeBomb", Instant.now().plus(Duration.standardDays(1)))
.add("FilterLogic", subComponent1.getClass())
.add("ServiceUrl", "google.com/fizzbang")
@@ -127,12 +130,12 @@ public void testCanBuild() {
DisplayData.from(new HasDisplayData() {
@Override
public void populateDisplayData(DisplayData.Builder builder) {
- builder.add("Foo", "bar");
+ builder.add("foo", "bar");
}
});
assertThat(data.items(), hasSize(1));
- assertThat(data, hasDisplayItem(hasKey("Foo")));
+ assertThat(data, hasDisplayItem("foo", "bar"));
}
@Test
@@ -148,7 +151,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
Map map = data.asMap();
assertEquals(map.size(), 1);
- assertThat(data, hasDisplayItem(hasKey("foo")));
+ assertThat(data, hasDisplayItem("foo", "bar"));
assertEquals(map.values(), data.items());
}
@@ -164,8 +167,8 @@ public void testItemProperties() {
allOf(
hasNamespace(Matchers.>is(ConcreteComponent.class)),
hasKey("now"),
- hasType(is(DisplayData.Type.TIMESTAMP)),
- hasValue(is(ISO_FORMATTER.print(value))),
+ hasType(DisplayData.Type.TIMESTAMP),
+ hasValue(ISO_FORMATTER.print(value)),
hasShortValue(nullValue(String.class)),
hasLabel(is("the current instant")),
hasUrl(is("http://time.gov"))));
@@ -219,14 +222,35 @@ public void populateDisplayData(DisplayData.Builder builder) {
}
});
- assertThat(
- data,
- hasDisplayItem(
- allOf(
- hasKey("foo"),
- hasNamespace(Matchers.>is(subComponent.getClass())))));
+ assertThat(data, includes(subComponent));
+ }
+
+ @Test
+ public void testIncludesNamespaceOverride() {
+ final HasDisplayData subComponent = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+
+ final HasDisplayData namespaceOverride = new HasDisplayData(){
+ @Override
+ public void populateDisplayData(Builder builder) {
+ }
+ };
+
+ DisplayData data = DisplayData.from(new HasDisplayData() {
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.include(subComponent, namespaceOverride.getClass());
+ }
+ });
+
+ assertThat(data, includes(subComponent, namespaceOverride.getClass()));
}
+
@Test
public void testIdentifierEquality() {
new EqualsTester()
@@ -238,6 +262,33 @@ public void testIdentifierEquality() {
.testEquals();
}
+ @Test
+ public void testItemEquality() {
+ HasDisplayData component1 = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+ HasDisplayData component2 = new HasDisplayData() {
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add("foo", "bar");
+ }
+ };
+
+ DisplayData component1DisplayData1 = DisplayData.from(component1);
+ DisplayData component1DisplayData2 = DisplayData.from(component1);
+ DisplayData component2DisplayData = DisplayData.from(component2);
+
+ new EqualsTester()
+ .addEqualityGroup(
+ component1DisplayData1.items().toArray()[0],
+ component1DisplayData2.items().toArray()[0])
+ .addEqualityGroup(component2DisplayData.items().toArray()[0])
+ .testEquals();
+ }
+
@Test
public void testAnonymousClassNamespace() {
DisplayData data =
@@ -404,6 +455,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add("string", "foobar")
.add("integer", 123)
.add("float", 3.14)
+ .add("boolean", true)
.add("java_class", DisplayDataTest.class)
.add("timestamp", Instant.now())
.add("duration", Duration.standardHours(1));
@@ -412,18 +464,19 @@ public void populateDisplayData(DisplayData.Builder builder) {
Collection
- items = data.items();
assertThat(
- items, hasItem(allOf(hasKey("string"), hasType(is(DisplayData.Type.STRING)))));
+ items, hasItem(allOf(hasKey("string"), hasType(DisplayData.Type.STRING))));
assertThat(
- items, hasItem(allOf(hasKey("integer"), hasType(is(DisplayData.Type.INTEGER)))));
- assertThat(items, hasItem(allOf(hasKey("float"), hasType(is(DisplayData.Type.FLOAT)))));
+ items, hasItem(allOf(hasKey("integer"), hasType(DisplayData.Type.INTEGER))));
+ assertThat(items, hasItem(allOf(hasKey("float"), hasType(DisplayData.Type.FLOAT))));
+ assertThat(items, hasItem(allOf(hasKey("boolean"), hasType(DisplayData.Type.BOOLEAN))));
assertThat(
items,
- hasItem(allOf(hasKey("java_class"), hasType(is(DisplayData.Type.JAVA_CLASS)))));
+ hasItem(allOf(hasKey("java_class"), hasType(DisplayData.Type.JAVA_CLASS))));
assertThat(
items,
- hasItem(allOf(hasKey("timestamp"), hasType(is(DisplayData.Type.TIMESTAMP)))));
+ hasItem(allOf(hasKey("timestamp"), hasType(DisplayData.Type.TIMESTAMP))));
assertThat(
- items, hasItem(allOf(hasKey("duration"), hasType(is(DisplayData.Type.DURATION)))));
+ items, hasItem(allOf(hasKey("duration"), hasType(DisplayData.Type.DURATION))));
}
@Test
@@ -438,6 +491,7 @@ public void populateDisplayData(DisplayData.Builder builder) {
.add("string", "foobar")
.add("integer", 123)
.add("float", 3.14)
+ .add("boolean", true)
.add("java_class", DisplayDataTest.class)
.add("timestamp", now)
.add("duration", oneHour);
@@ -445,17 +499,13 @@ public void populateDisplayData(DisplayData.Builder builder) {
};
DisplayData data = DisplayData.from(component);
- Collection
- items = data.items();
- assertThat(items, hasItem(allOf(hasKey("string"), hasValue(is("foobar")))));
- assertThat(items, hasItem(allOf(hasKey("integer"), hasValue(is("123")))));
- assertThat(items, hasItem(allOf(hasKey("float"), hasValue(is("3.14")))));
- assertThat(items, hasItem(allOf(hasKey("java_class"),
- hasValue(is(DisplayDataTest.class.getName())),
- hasShortValue(is(DisplayDataTest.class.getSimpleName())))));
- assertThat(items, hasItem(allOf(hasKey("timestamp"),
- hasValue(is(ISO_FORMATTER.print(now))))));
- assertThat(items, hasItem(allOf(hasKey("duration"),
- hasValue(is(Long.toString(oneHour.getMillis()))))));
+ assertThat(data, hasDisplayItem("string", "foobar"));
+ assertThat(data, hasDisplayItem("integer", 123));
+ assertThat(data, hasDisplayItem("float", 3.14));
+ assertThat(data, hasDisplayItem("boolean", true));
+ assertThat(data, hasDisplayItem("java_class", DisplayDataTest.class));
+ assertThat(data, hasDisplayItem("timestamp", now));
+ assertThat(data, hasDisplayItem("duration", oneHour));
}
@Test
@@ -582,16 +632,6 @@ protected Class> featureValueOf(DisplayData.Item actual) {
};
}
- private static Matcher hasType(Matcher typeMatcher) {
- return new FeatureMatcher(
- typeMatcher, "display item with type", "type") {
- @Override
- protected DisplayData.Type featureValueOf(DisplayData.Item actual) {
- return actual.getType();
- }
- };
- }
-
private static Matcher hasLabel(Matcher labelMatcher) {
return new FeatureMatcher(
labelMatcher, "display item with label", "label") {
@@ -612,16 +652,6 @@ protected String featureValueOf(DisplayData.Item actual) {
};
}
- private static Matcher hasValue(Matcher valueMatcher) {
- return new FeatureMatcher(
- valueMatcher, "display item with value", "value") {
- @Override
- protected String featureValueOf(DisplayData.Item actual) {
- return actual.getValue();
- }
- };
- }
-
private static Matcher hasShortValue(Matcher valueStringMatcher) {
return new FeatureMatcher(
valueStringMatcher, "display item with short value", "short value") {