From 27d5825afb2248ea6a30c93b2c112f6f24f700b3 Mon Sep 17 00:00:00 2001 From: Davor Bonaci Date: Mon, 4 Apr 2016 18:07:14 -0700 Subject: [PATCH 1/5] Add coveralls plugin to the Maven build process No binding to the lifecycle. --- pom.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pom.xml b/pom.xml index 3cae1a45d65a..a198724a17fc 100644 --- a/pom.xml +++ b/pom.xml @@ -240,6 +240,13 @@ + + + org.eluder.coveralls + coveralls-maven-plugin + 4.1.0 + + org.apache.maven.plugins maven-surefire-plugin From de2787ab5eea0a6c15c21502e54afa2b9ca9edbd Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 5 Apr 2016 15:11:11 +0200 Subject: [PATCH 2/5] Fix misleading Javadoc on Trigger.onElement() --- .../google/cloud/dataflow/sdk/transforms/windowing/Trigger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java index fd77f9658fd2..8b1a8bc90cd2 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/Trigger.java @@ -281,7 +281,7 @@ protected Trigger(@Nullable List> subTriggers) { /** - * Called immediately after an element is first incorporated into a window. + * Called every time an element is incorporated into a window. */ public abstract void onElement(OnElementContext c) throws Exception; From 014a9a5a2241b4f780efea30c14496961c55041d Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Mon, 28 Mar 2016 13:38:35 -0700 Subject: [PATCH 3/5] DisplayData tweaks based on transform usage. * Add boolean-valued display data. * Implement equality for DislpayData.Item * Add ability to override namespace for included subcomponents. * Additional Matchers for testing display data * Update DisplayData inner class privacy --- .../sdk/transforms/display/DisplayData.java | 85 ++++++-- .../display/DisplayDataMatchers.java | 181 +++++++++++++++++- .../display/DisplayDataMatchersTest.java | 63 +++++- .../transforms/display/DisplayDataTest.java | 128 ++++++++----- 4 files changed, 391 insertions(+), 66 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java index bfdf73e69fe8..63ec7fcde12c 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayData.java @@ -88,7 +88,7 @@ public Map asMap() { public String toString() { StringBuilder builder = new StringBuilder(); boolean isFirstLine = true; - for (Map.Entry entry : entries.entrySet()) { + for (Item entry : entries.values()) { if (isFirstLine) { isFirstLine = false; } else { @@ -107,13 +107,18 @@ public String toString() { */ public interface Builder { /** - * Include display metadata from the specified subcomponent. For example, a {@link ParDo} + * Register display metadata from the specified subcomponent. For example, a {@link ParDo} * transform includes display metadata from the encapsulated {@link DoFn}. - * - * @return A builder instance to continue to build in a fluent-style. */ Builder include(HasDisplayData subComponent); + /** + * Register display metadata from the specified subcomponent, using the specified namespace. + * For example, a {@link ParDo} transform includes display metadata from the encapsulated + * {@link DoFn}. + */ + Builder include(HasDisplayData subComponent, Class namespace); + /** * Register the given string display metadata. The metadata item will be registered with type * {@link DisplayData.Type#STRING}, and is identified by the specified key and namespace from @@ -135,6 +140,13 @@ public interface Builder { */ ItemBuilder add(String key, double value); + /** + * Register the given floating point display metadata. The metadata item will be registered with + * type {@link DisplayData.Type#BOOLEAN}, and is identified by the specified key and namespace + * from the current transform or component. + */ + ItemBuilder add(String key, boolean value); + /** * Register the given timestamp display metadata. The metadata item will be registered with type * {@link DisplayData.Type#TIMESTAMP}, and is identified by the specified key and namespace from @@ -287,7 +299,35 @@ public String getLinkUrl() { @Override public String toString() { - return getValue(); + return String.format("%s:%s=%s", ns, key, value); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof Item) { + Item that = (Item) obj; + return Objects.equals(this.ns, that.ns) + && Objects.equals(this.key, that.key) + && Objects.equals(this.type, that.type) + && Objects.equals(this.value, that.value) + && Objects.equals(this.shortValue, that.shortValue) + && Objects.equals(this.label, that.label) + && Objects.equals(this.url, that.url); + } + + return false; + } + + @Override + public int hashCode() { + return Objects.hash( + this.ns, + this.key, + this.type, + this.value, + this.shortValue, + this.label, + this.url); } private Item withLabel(String label) { @@ -313,8 +353,12 @@ public static class Identifier { private final String ns; private final String key; - static Identifier of(Class namespace, String key) { - return new Identifier(namespace.getName(), key); + public static Identifier of(Class namespace, String key) { + return of(namespace.getName(), key); + } + + public static Identifier of(String namespace, String key) { + return new Identifier(namespace, key); } private Identifier(String ns, String key) { @@ -355,7 +399,7 @@ public String toString() { /** * Display metadata type. */ - enum Type { + public enum Type { STRING { @Override FormattedItemValue format(Object value) { @@ -374,6 +418,12 @@ FormattedItemValue format(Object value) { return new FormattedItemValue(Double.toString((Double) value)); } }, + BOOLEAN() { + @Override + FormattedItemValue format(Object value) { + return new FormattedItemValue(Boolean.toString((boolean) value)); + } + }, TIMESTAMP() { @Override FormattedItemValue format(Object value) { @@ -403,7 +453,7 @@ FormattedItemValue format(Object value) { abstract FormattedItemValue format(Object value); } - private static class FormattedItemValue { + static class FormattedItemValue { private final String shortValue; private final String longValue; @@ -416,11 +466,11 @@ private FormattedItemValue(String longValue, String shortValue) { this.shortValue = shortValue; } - private String getLongValue () { + String getLongValue() { return this.longValue; } - private String getShortValue() { + String getShortValue() { return this.shortValue; } } @@ -446,11 +496,17 @@ private static InternalBuilder forRoot(HasDisplayData instance) { @Override public Builder include(HasDisplayData subComponent) { + checkNotNull(subComponent); + return include(subComponent, subComponent.getClass()); + } + + @Override + public Builder include(HasDisplayData subComponent, Class namespace) { checkNotNull(subComponent); boolean newComponent = visited.add(subComponent); if (newComponent) { Class prevNs = this.latestNs; - this.latestNs = subComponent.getClass(); + this.latestNs = namespace; subComponent.populateDisplayData(this); this.latestNs = prevNs; } @@ -474,6 +530,11 @@ public ItemBuilder add(String key, double value) { return addItem(key, Type.FLOAT, value); } + @Override + public ItemBuilder add(String key, boolean value) { + return addItem(key, Type.BOOLEAN, value); + } + @Override public ItemBuilder add(String key, Instant value) { checkNotNull(value); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java index 385bc42c76d9..d540b4b628a9 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchers.java @@ -17,13 +17,19 @@ */ package com.google.cloud.dataflow.sdk.transforms.display; +import static org.hamcrest.Matchers.allOf; + import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Item; +import com.google.common.collect.Sets; +import org.hamcrest.CustomTypeSafeMatcher; import org.hamcrest.Description; import org.hamcrest.FeatureMatcher; import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.joda.time.Duration; +import org.joda.time.Instant; import java.util.Collection; @@ -43,6 +49,71 @@ public static Matcher hasDisplayItem() { return hasDisplayItem(Matchers.any(DisplayData.Item.class)); } + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and String value. + */ + public static Matcher hasDisplayItem(String key, String value) { + return hasDisplayItem(key, DisplayData.Type.STRING, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Boolean value. + */ + public static Matcher hasDisplayItem(String key, Boolean value) { + return hasDisplayItem(key, DisplayData.Type.BOOLEAN, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Duration value. + */ + public static Matcher hasDisplayItem(String key, Duration value) { + return hasDisplayItem(key, DisplayData.Type.DURATION, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Float value. + */ + public static Matcher hasDisplayItem(String key, double value) { + return hasDisplayItem(key, DisplayData.Type.FLOAT, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Integer value. + */ + public static Matcher hasDisplayItem(String key, long value) { + return hasDisplayItem(key, DisplayData.Type.INTEGER, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Class value. + */ + public static Matcher hasDisplayItem(String key, Class value) { + return hasDisplayItem(key, DisplayData.Type.JAVA_CLASS, value); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains an item with the + * specified key and Timestamp value. + */ + public static Matcher hasDisplayItem(String key, Instant value) { + return hasDisplayItem(key, DisplayData.Type.TIMESTAMP, value); + } + + private static Matcher hasDisplayItem( + String key, DisplayData.Type type, Object value) { + DisplayData.FormattedItemValue formattedValue = type.format(value); + return hasDisplayItem(allOf( + hasKey(key), + hasType(type), + hasValue(formattedValue.getLongValue()))); + } + /** * Creates a matcher that matches if the examined {@link DisplayData} contains any item * matching the specified {@code itemMatcher}. @@ -69,13 +140,93 @@ protected boolean matchesSafely(DisplayData data, Description mismatchDescriptio Collection items = data.items(); boolean isMatch = Matchers.hasItem(itemMatcher).matches(items); if (!isMatch) { - mismatchDescription.appendText("found " + items.size() + " non-matching items"); + mismatchDescription.appendText("found " + items.size() + " non-matching item(s):\n"); + mismatchDescription.appendValue(data); } return isMatch; } } + /** + * Create a matcher that matches if the examined {@link DisplayData} contains all display data + * registered from the specified subcomponent. + */ + public static Matcher includes(final HasDisplayData subComponent) { + return includes(subComponent, subComponent.getClass()); + } + + /** + * Create a matcher that matches if the examined {@link DisplayData} contains all display data + * registered from the specified subcomponent and namespace. + */ + public static Matcher includes( + final HasDisplayData subComponent, final Class namespace) { + return new CustomTypeSafeMatcher("includes subcomponent") { + @Override + protected boolean matchesSafely(DisplayData displayData) { + DisplayData subComponentData = DisplayData.from(subComponent); + if (subComponentData.items().size() == 0) { + throw new UnsupportedOperationException("subComponent contains no display data; " + + "cannot verify whether it is included"); + } + + DisplayDataComparision comparison = checkSubset(displayData, subComponentData, namespace); + return comparison.missingItems.isEmpty(); + } + + + @Override + protected void describeMismatchSafely( + DisplayData displayData, Description mismatchDescription) { + DisplayData subComponentDisplayData = DisplayData.from(subComponent); + DisplayDataComparision comparison = checkSubset( + displayData, subComponentDisplayData, subComponent.getClass()); + + mismatchDescription + .appendText("did not include:\n") + .appendValue(comparison.missingItems) + .appendText("\nNon-matching items:\n") + .appendValue(comparison.unmatchedItems); + } + + private DisplayDataComparision checkSubset( + DisplayData displayData, DisplayData included, Class namespace) { + DisplayDataComparision comparison = new DisplayDataComparision(displayData.items()); + for (Item item : included.items()) { + Item matchedItem = displayData.asMap().get( + DisplayData.Identifier.of(namespace, item.getKey())); + + if (matchedItem != null) { + comparison.matched(matchedItem); + } else { + comparison.missing(item); + } + } + + return comparison; + } + + class DisplayDataComparision { + Collection missingItems; + Collection unmatchedItems; + + DisplayDataComparision(Collection superset) { + missingItems = Sets.newHashSet(); + unmatchedItems = Sets.newHashSet(superset); + } + + void matched(Item supersetItem) { + unmatchedItems.remove(supersetItem); + } + + void missing(Item subsetItem) { + missingItems.add(subsetItem); + } + } + }; + } + /** * Creates a matcher that matches if the examined {@link DisplayData.Item} contains a key * with the specified value. @@ -96,4 +247,32 @@ protected String featureValueOf(DisplayData.Item actual) { } }; } + + public static Matcher hasType(DisplayData.Type type) { + return hasType(Matchers.is(type)); + } + + public static Matcher hasType(Matcher typeMatcher) { + return new FeatureMatcher( + typeMatcher, "with type", "type") { + @Override + protected DisplayData.Type featureValueOf(DisplayData.Item actual) { + return actual.getType(); + } + }; + } + + public static Matcher hasValue(String value) { + return hasValue(Matchers.is(value)); + } + + public static Matcher hasValue(Matcher valueMatcher) { + return new FeatureMatcher( + valueMatcher, "with value", "value") { + @Override + protected String featureValueOf(DisplayData.Item actual) { + return actual.getValue(); + } + }; + } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java index 3ade92339f53..1b43ff7ea02c 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataMatchersTest.java @@ -19,11 +19,13 @@ import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasType; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.core.StringStartsWith.startsWith; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.display.DisplayData.Builder; @@ -46,7 +48,7 @@ public void testHasDisplayItem() { Matcher matcher = hasDisplayItem(); assertFalse(matcher.matches(DisplayData.none())); - assertTrue(matcher.matches(createDisplayDataWithItem("foo", "bar"))); + assertThat(createDisplayDataWithItem("foo", "bar"), matcher); } @Test @@ -59,15 +61,68 @@ public void testHasDisplayItemDescription() { matcher.describeMismatch(DisplayData.none(), mismatchDesc); assertThat(desc.toString(), startsWith("display data with item: ")); - assertThat(mismatchDesc.toString(), containsString("found 0 non-matching items")); + assertThat(mismatchDesc.toString(), containsString("found 0 non-matching item(s)")); } @Test public void testHasKey() { Matcher matcher = hasDisplayItem(hasKey("foo")); - assertTrue(matcher.matches(createDisplayDataWithItem("foo", "bar"))); assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar"))); + + assertThat(createDisplayDataWithItem("foo", "bar"), matcher); + } + + @Test + public void testHasType() { + Matcher matcher = hasDisplayItem(hasType(DisplayData.Type.JAVA_CLASS)); + + DisplayData data = DisplayData.from(new PTransform, PCollection>() { + @Override + public void populateDisplayData(Builder builder) { + builder.add("foo", DisplayDataMatchersTest.class); + } + }); + + assertFalse(matcher.matches(createDisplayDataWithItem("fooz", "bar"))); + assertThat(data, matcher); + } + + @Test + public void testHasValue() { + Matcher matcher = hasDisplayItem(hasValue("bar")); + + assertFalse(matcher.matches(createDisplayDataWithItem("foo", "baz"))); + assertThat(createDisplayDataWithItem("foo", "bar"), matcher); + } + + @Test + public void testIncludes() { + final HasDisplayData subComponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add("foo", "bar"); + } + }; + HasDisplayData hasSubcomponent = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder + .include(subComponent) + .add("foo2", "bar2"); + } + }; + HasDisplayData sameKeyDifferentNamespace = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add("foo", "bar"); + } + }; + Matcher matcher = includes(subComponent); + + assertFalse(matcher.matches(DisplayData.from(sameKeyDifferentNamespace))); + assertThat(DisplayData.from(hasSubcomponent), matcher); + assertThat(DisplayData.from(subComponent), matcher); } private DisplayData createDisplayDataWithItem(final String key, final String value) { diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java index 397e1021244f..4d75e267197a 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/transforms/display/DisplayDataTest.java @@ -19,6 +19,9 @@ import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasType; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasValue; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.everyItem; @@ -92,7 +95,7 @@ public void populateDisplayData(DisplayData.Builder builder) { .include(subComponent2) .add("MinSproggles", 200) .withLabel("Mimimum Required Sproggles") - .add("LazerOrientation", "NORTH") + .add("FireLazers", true) .add("TimeBomb", Instant.now().plus(Duration.standardDays(1))) .add("FilterLogic", subComponent1.getClass()) .add("ServiceUrl", "google.com/fizzbang") @@ -127,12 +130,12 @@ public void testCanBuild() { DisplayData.from(new HasDisplayData() { @Override public void populateDisplayData(DisplayData.Builder builder) { - builder.add("Foo", "bar"); + builder.add("foo", "bar"); } }); assertThat(data.items(), hasSize(1)); - assertThat(data, hasDisplayItem(hasKey("Foo"))); + assertThat(data, hasDisplayItem("foo", "bar")); } @Test @@ -148,7 +151,7 @@ public void populateDisplayData(DisplayData.Builder builder) { Map map = data.asMap(); assertEquals(map.size(), 1); - assertThat(data, hasDisplayItem(hasKey("foo"))); + assertThat(data, hasDisplayItem("foo", "bar")); assertEquals(map.values(), data.items()); } @@ -164,8 +167,8 @@ public void testItemProperties() { allOf( hasNamespace(Matchers.>is(ConcreteComponent.class)), hasKey("now"), - hasType(is(DisplayData.Type.TIMESTAMP)), - hasValue(is(ISO_FORMATTER.print(value))), + hasType(DisplayData.Type.TIMESTAMP), + hasValue(ISO_FORMATTER.print(value)), hasShortValue(nullValue(String.class)), hasLabel(is("the current instant")), hasUrl(is("http://time.gov")))); @@ -219,14 +222,35 @@ public void populateDisplayData(DisplayData.Builder builder) { } }); - assertThat( - data, - hasDisplayItem( - allOf( - hasKey("foo"), - hasNamespace(Matchers.>is(subComponent.getClass()))))); + assertThat(data, includes(subComponent)); + } + + @Test + public void testIncludesNamespaceOverride() { + final HasDisplayData subComponent = new HasDisplayData() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + final HasDisplayData namespaceOverride = new HasDisplayData(){ + @Override + public void populateDisplayData(Builder builder) { + } + }; + + DisplayData data = DisplayData.from(new HasDisplayData() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.include(subComponent, namespaceOverride.getClass()); + } + }); + + assertThat(data, includes(subComponent, namespaceOverride.getClass())); } + @Test public void testIdentifierEquality() { new EqualsTester() @@ -238,6 +262,33 @@ public void testIdentifierEquality() { .testEquals(); } + @Test + public void testItemEquality() { + HasDisplayData component1 = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add("foo", "bar"); + } + }; + HasDisplayData component2 = new HasDisplayData() { + @Override + public void populateDisplayData(Builder builder) { + builder.add("foo", "bar"); + } + }; + + DisplayData component1DisplayData1 = DisplayData.from(component1); + DisplayData component1DisplayData2 = DisplayData.from(component1); + DisplayData component2DisplayData = DisplayData.from(component2); + + new EqualsTester() + .addEqualityGroup( + component1DisplayData1.items().toArray()[0], + component1DisplayData2.items().toArray()[0]) + .addEqualityGroup(component2DisplayData.items().toArray()[0]) + .testEquals(); + } + @Test public void testAnonymousClassNamespace() { DisplayData data = @@ -404,6 +455,7 @@ public void populateDisplayData(DisplayData.Builder builder) { .add("string", "foobar") .add("integer", 123) .add("float", 3.14) + .add("boolean", true) .add("java_class", DisplayDataTest.class) .add("timestamp", Instant.now()) .add("duration", Duration.standardHours(1)); @@ -412,18 +464,19 @@ public void populateDisplayData(DisplayData.Builder builder) { Collection items = data.items(); assertThat( - items, hasItem(allOf(hasKey("string"), hasType(is(DisplayData.Type.STRING))))); + items, hasItem(allOf(hasKey("string"), hasType(DisplayData.Type.STRING)))); assertThat( - items, hasItem(allOf(hasKey("integer"), hasType(is(DisplayData.Type.INTEGER))))); - assertThat(items, hasItem(allOf(hasKey("float"), hasType(is(DisplayData.Type.FLOAT))))); + items, hasItem(allOf(hasKey("integer"), hasType(DisplayData.Type.INTEGER)))); + assertThat(items, hasItem(allOf(hasKey("float"), hasType(DisplayData.Type.FLOAT)))); + assertThat(items, hasItem(allOf(hasKey("boolean"), hasType(DisplayData.Type.BOOLEAN)))); assertThat( items, - hasItem(allOf(hasKey("java_class"), hasType(is(DisplayData.Type.JAVA_CLASS))))); + hasItem(allOf(hasKey("java_class"), hasType(DisplayData.Type.JAVA_CLASS)))); assertThat( items, - hasItem(allOf(hasKey("timestamp"), hasType(is(DisplayData.Type.TIMESTAMP))))); + hasItem(allOf(hasKey("timestamp"), hasType(DisplayData.Type.TIMESTAMP)))); assertThat( - items, hasItem(allOf(hasKey("duration"), hasType(is(DisplayData.Type.DURATION))))); + items, hasItem(allOf(hasKey("duration"), hasType(DisplayData.Type.DURATION)))); } @Test @@ -438,6 +491,7 @@ public void populateDisplayData(DisplayData.Builder builder) { .add("string", "foobar") .add("integer", 123) .add("float", 3.14) + .add("boolean", true) .add("java_class", DisplayDataTest.class) .add("timestamp", now) .add("duration", oneHour); @@ -445,17 +499,13 @@ public void populateDisplayData(DisplayData.Builder builder) { }; DisplayData data = DisplayData.from(component); - Collection items = data.items(); - assertThat(items, hasItem(allOf(hasKey("string"), hasValue(is("foobar"))))); - assertThat(items, hasItem(allOf(hasKey("integer"), hasValue(is("123"))))); - assertThat(items, hasItem(allOf(hasKey("float"), hasValue(is("3.14"))))); - assertThat(items, hasItem(allOf(hasKey("java_class"), - hasValue(is(DisplayDataTest.class.getName())), - hasShortValue(is(DisplayDataTest.class.getSimpleName()))))); - assertThat(items, hasItem(allOf(hasKey("timestamp"), - hasValue(is(ISO_FORMATTER.print(now)))))); - assertThat(items, hasItem(allOf(hasKey("duration"), - hasValue(is(Long.toString(oneHour.getMillis())))))); + assertThat(data, hasDisplayItem("string", "foobar")); + assertThat(data, hasDisplayItem("integer", 123)); + assertThat(data, hasDisplayItem("float", 3.14)); + assertThat(data, hasDisplayItem("boolean", true)); + assertThat(data, hasDisplayItem("java_class", DisplayDataTest.class)); + assertThat(data, hasDisplayItem("timestamp", now)); + assertThat(data, hasDisplayItem("duration", oneHour)); } @Test @@ -582,16 +632,6 @@ protected Class featureValueOf(DisplayData.Item actual) { }; } - private static Matcher hasType(Matcher typeMatcher) { - return new FeatureMatcher( - typeMatcher, "display item with type", "type") { - @Override - protected DisplayData.Type featureValueOf(DisplayData.Item actual) { - return actual.getType(); - } - }; - } - private static Matcher hasLabel(Matcher labelMatcher) { return new FeatureMatcher( labelMatcher, "display item with label", "label") { @@ -612,16 +652,6 @@ protected String featureValueOf(DisplayData.Item actual) { }; } - private static Matcher hasValue(Matcher valueMatcher) { - return new FeatureMatcher( - valueMatcher, "display item with value", "value") { - @Override - protected String featureValueOf(DisplayData.Item actual) { - return actual.getValue(); - } - }; - } - private static Matcher hasShortValue(Matcher valueStringMatcher) { return new FeatureMatcher( valueStringMatcher, "display item with short value", "short value") { From 5888df7b3d6183d389ce3141de321be25256fc2f Mon Sep 17 00:00:00 2001 From: Thomas Groh Date: Mon, 4 Apr 2016 11:10:18 -0700 Subject: [PATCH 4/5] Give root transforms step names Fix a bug where steps would only be given step names if they were a non-root node. Use the ConsumerTrackingPipelineVisitor in the InProcessEvaluationContext test to handle runner-expanded transforms --- .../ConsumerTrackingPipelineVisitor.java | 2 +- .../ConsumerTrackingPipelineVisitorTest.java | 37 ++++++++++++++ .../InProcessEvaluationContextTest.java | 50 +++++++------------ 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java index ec4f08bc561e..48836e9c7c2e 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitor.java @@ -76,12 +76,12 @@ public void leaveCompositeTransform(TransformTreeNode node) { public void visitTransform(TransformTreeNode node) { toFinalize.removeAll(node.getInput().expand()); AppliedPTransform appliedTransform = getAppliedTransform(node); + stepNames.put(appliedTransform, genStepName()); if (node.getInput().expand().isEmpty()) { rootTransforms.add(appliedTransform); } else { for (PValue value : node.getInput().expand()) { valueToConsumers.get(value).add(appliedTransform); - stepNames.put(appliedTransform, genStepName()); } } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java index bea6fe1bd6f5..905f58f8714e 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/ConsumerTrackingPipelineVisitorTest.java @@ -176,6 +176,43 @@ public PDone apply(PInput input) { assertThat(visitor.getUnfinalizedPValues(), emptyIterable()); } + @Test + public void getStepNamesContainsAllTransforms() { + PCollection created = p.apply(Create.of("1", "2", "3")); + PCollection transformed = + created.apply( + ParDo.of( + new DoFn() { + @Override + public void processElement(DoFn.ProcessContext c) + throws Exception { + c.output(Integer.toString(c.element().length())); + } + })); + PDone finished = + transformed.apply( + new PTransform() { + @Override + public PDone apply(PInput input) { + return PDone.in(input.getPipeline()); + } + }); + + p.traverseTopologically(visitor); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + created.getProducingTransformInternal(), "s0")); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + transformed.getProducingTransformInternal(), "s1")); + assertThat( + visitor.getStepNames(), + Matchers., String>hasEntry( + finished.getProducingTransformInternal(), "s2")); + } + @Test public void traverseMultipleTimesThrows() { p.apply(Create.of(1, 2, 3)); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java index fde2cb43bb20..e1faf1b8744f 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/InProcessEvaluationContextTest.java @@ -73,7 +73,6 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -90,7 +89,8 @@ public class InProcessEvaluationContextTest { private PCollection> downstream; private PCollectionView> view; private PCollection unbounded; - + private Collection> rootTransforms; + private Map>> valueToConsumers; @Before public void setup() { @@ -103,32 +103,20 @@ public void setup() { downstream = created.apply(WithKeys.of("foo")); view = created.apply(View.asIterable()); unbounded = p.apply(CountingInput.unbounded()); - Collection> rootTransforms = - ImmutableList.>of( - created.getProducingTransformInternal(), unbounded.getProducingTransformInternal()); - Map>> valueToConsumers = new HashMap<>(); - valueToConsumers.put( - created, - ImmutableList.>of( - downstream.getProducingTransformInternal(), view.getProducingTransformInternal())); - valueToConsumers.put(unbounded, ImmutableList.>of()); - valueToConsumers.put(downstream, ImmutableList.>of()); - valueToConsumers.put(view, ImmutableList.>of()); - - Map, String> stepNames = new HashMap<>(); - stepNames.put(created.getProducingTransformInternal(), "s1"); - stepNames.put(downstream.getProducingTransformInternal(), "s2"); - stepNames.put(view.getProducingTransformInternal(), "s3"); - stepNames.put(unbounded.getProducingTransformInternal(), "s4"); - - Collection> views = ImmutableList.>of(view); - context = InProcessEvaluationContext.create( + + ConsumerTrackingPipelineVisitor cVis = new ConsumerTrackingPipelineVisitor(); + p.traverseTopologically(cVis); + rootTransforms = cVis.getRootTransforms(); + valueToConsumers = cVis.getValueToConsumers(); + + context = + InProcessEvaluationContext.create( runner.getPipelineOptions(), InProcessBundleFactory.create(), rootTransforms, valueToConsumers, - stepNames, - views); + cVis.getStepNames(), + cVis.getViews()); } @Test @@ -495,16 +483,14 @@ public void isDoneWithPartiallyDone() { null, ImmutableList.of(), StepTransformResult.withoutHold(unbounded.getProducingTransformInternal()).build()); - context.handleResult( - committedBundle, - ImmutableList.of(), - StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build()); assertThat(context.isDone(), is(false)); - context.handleResult( - committedBundle, - ImmutableList.of(), - StepTransformResult.withoutHold(view.getProducingTransformInternal()).build()); + for (AppliedPTransform consumers : valueToConsumers.get(created)) { + context.handleResult( + committedBundle, + ImmutableList.of(), + StepTransformResult.withoutHold(consumers).build()); + } assertThat(context.isDone(), is(true)); } From 06fd5ff7f6704e88c55905c47c05654f42a202a1 Mon Sep 17 00:00:00 2001 From: Scott Wegner Date: Tue, 5 Apr 2016 10:37:03 -0700 Subject: [PATCH 5/5] Add DisplayData for IO transforms --- .../google/cloud/dataflow/sdk/io/AvroIO.java | 25 +++++++++++ .../cloud/dataflow/sdk/io/AvroSource.java | 15 +++++++ .../cloud/dataflow/sdk/io/BigQueryIO.java | 31 +++++++++++++ .../io/BoundedReadFromUnboundedSource.java | 16 +++++++ .../dataflow/sdk/io/CompressedSource.java | 8 ++++ .../cloud/dataflow/sdk/io/CountingInput.java | 19 ++++++++ .../cloud/dataflow/sdk/io/DatastoreIO.java | 31 +++++++++++++ .../cloud/dataflow/sdk/io/PubsubIO.java | 43 +++++++++++++++++++ .../google/cloud/dataflow/sdk/io/Read.java | 15 +++++++ .../google/cloud/dataflow/sdk/io/Sink.java | 14 +++++- .../google/cloud/dataflow/sdk/io/Source.java | 14 +++++- .../google/cloud/dataflow/sdk/io/TextIO.java | 24 +++++++++++ .../google/cloud/dataflow/sdk/io/Write.java | 8 ++++ .../google/cloud/dataflow/sdk/io/XmlSink.java | 16 +++++++ .../cloud/dataflow/sdk/io/XmlSource.java | 23 ++++++++++ .../dataflow/sdk/io/bigtable/BigtableIO.java | 23 ++++++++++ .../cloud/dataflow/sdk/io/AvroIOTest.java | 31 +++++++++++++ .../cloud/dataflow/sdk/io/AvroSourceTest.java | 16 +++++++ .../cloud/dataflow/sdk/io/BigQueryIOTest.java | 40 +++++++++++++++++ .../BoundedReadFromUnboundedSourceTest.java | 18 +++++++- .../dataflow/sdk/io/CompressedSourceTest.java | 25 +++++++++++ .../dataflow/sdk/io/CountingInputTest.java | 32 ++++++++++++++ .../dataflow/sdk/io/DatastoreIOTest.java | 30 +++++++++++++ .../cloud/dataflow/sdk/io/PubsubIOTest.java | 42 ++++++++++++++++++ .../cloud/dataflow/sdk/io/ReadTest.java | 43 ++++++++++++++++++- .../cloud/dataflow/sdk/io/TextIOTest.java | 29 +++++++++++++ .../cloud/dataflow/sdk/io/WriteTest.java | 18 ++++++++ .../cloud/dataflow/sdk/io/XmlSinkTest.java | 16 +++++++ .../cloud/dataflow/sdk/io/XmlSourceTest.java | 21 +++++++++ .../sdk/io/bigtable/BigtableIOTest.java | 36 ++++++++++++++++ 30 files changed, 717 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java index 145804d2f82e..8a9e6132bcfa 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroIO.java @@ -26,6 +26,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -325,6 +326,14 @@ public PCollection apply(PInput input) { return pcol; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("schema", type); + if (filepattern != null) { + builder.add("filePattern", filepattern); + } + } + @Override protected Coder getDefaultOutputCoder() { return AvroCoder.of(type, schema); @@ -678,6 +687,22 @@ public PDone apply(PCollection input) { filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema)))); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("schema", type); + if (filenamePrefix != null) { + builder.add("fileNamePrefix", filenamePrefix); + } + + if (filenameSuffix != null && !filenameSuffix.isEmpty()) { + builder.add("fileNameSuffix", filenameSuffix); + } + + if (numShards != 0) { + builder.add("numShards", numShards); + } + } + /** * Returns the current shard name template string. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroSource.java index 5a4d2a8615ed..3a06b12ae817 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/AvroSource.java @@ -21,6 +21,7 @@ import com.google.cloud.dataflow.sdk.coders.AvroCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.AvroUtils; import com.google.cloud.dataflow.sdk.util.AvroUtils.AvroMetadata; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -293,6 +294,20 @@ public AvroCoder getDefaultOutputCoder() { return coder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("filePattern", getFileOrPatternSpec()); + + if (readSchemaString != null) { + builder.add("schema", readSchemaString); + } + + long minBundleSize = getMinBundleSize(); + if (minBundleSize != DEFAULT_MIN_BUNDLE_SIZE) { + builder.add("minBundleSize", minBundleSize); + } + } + public String getSchema() { return readSchemaString; } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java index ae0ce4d8b321..29a13a705ece 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java @@ -44,6 +44,7 @@ import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; import com.google.cloud.dataflow.sdk.transforms.Sum; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow; import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter; import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator; @@ -501,6 +502,21 @@ protected Coder getDefaultOutputCoder() { return TableRowJsonCoder.of(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (table != null) { + builder.add("tableSpec", toTableSpec(table)); + } + + if (query != null) { + builder.add("query", query); + } + + if (flattenResults != null) { + builder.add("flattenResults", flattenResults); + } + } + static { DirectPipelineRunner.registerDefaultTransformEvaluator( Bound.class, new DirectPipelineRunner.TransformEvaluator() { @@ -970,6 +986,21 @@ protected Coder getDefaultOutputCoder() { return VoidCoder.of(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (table != null) { + builder.add("tableSpec", toTableSpec(table)); + } + + if (schema != null) { + builder.add("schema", schema.toString()); + } + + builder + .add("createDisposition", createDisposition.toString()) + .add("writeDisposition", writeDisposition.toString()); + } + static { DirectPipelineRunner.registerDefaultTransformEvaluator( Bound.class, new DirectPipelineRunner.TransformEvaluator() { diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java index 2bad6cfba2db..2f8c03d1a6e9 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSource.java @@ -26,6 +26,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff; import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -107,6 +108,21 @@ public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + .include(source); + + if (maxNumRecords != Long.MAX_VALUE) { + builder.add("maxRecords", maxNumRecords); + } + + if (maxReadTime != null) { + builder.add("maxReadTime", maxReadTime); + } + } + private static class UnboundedToBoundedSourceAdapter extends BoundedSource> { private final UnboundedSource source; diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java index 15e6e29decea..a759bb2cb573 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CompressedSource.java @@ -20,6 +20,7 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.common.base.Preconditions; import com.google.common.io.ByteStreams; import com.google.common.primitives.Ints; @@ -319,6 +320,13 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio return sourceDelegate.producesSortedKeys(options); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .include(sourceDelegate) + .add("compressionMode", channelFactory.toString()); + } + /** * Returns the delegate source's default output coder. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java index 892682dfbaab..ace7b16cec98 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/CountingInput.java @@ -24,6 +24,7 @@ import com.google.cloud.dataflow.sdk.io.Read.Unbounded; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.PBegin; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded; @@ -113,6 +114,11 @@ private BoundedCountingInput(long numElements) { public PCollection apply(PBegin begin) { return begin.apply(Read.from(CountingSource.upTo(numElements))); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("upTo", numElements); + } } /** @@ -220,5 +226,18 @@ public PCollection apply(PBegin begin) { read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get())); } } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("timestampFn", timestampFn.getClass()); + + if (maxReadTime.isPresent()) { + builder.add("maxReadTime", maxReadTime.get()); + } + + if (maxNumRecords.isPresent()) { + builder.add("maxRecords", maxNumRecords.get()); + } + } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java index 91285856993f..b8010575d3ee 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/DatastoreIO.java @@ -57,6 +57,7 @@ import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions; import com.google.cloud.dataflow.sdk.options.GcpOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -397,6 +398,25 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception { return getPropertyMap(entity).get("entity_bytes").getIntegerValue(); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (host != DEFAULT_HOST) { + builder.add("host", host); + } + + if (datasetId != null) { + builder.add("dataset", datasetId); + } + + if (query != null) { + builder.add("query", query.toString()); + } + + if (namespace != null) { + builder.add("namespace", namespace); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(getClass()) @@ -598,6 +618,17 @@ public void validate(PipelineOptions options) { public DatastoreWriteOperation createWriteOperation(PipelineOptions options) { return new DatastoreWriteOperation(this); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (host != DEFAULT_HOST) { + builder.add("host", host); + } + + if (datasetId != null) { + builder.add("dataset", datasetId); + } + } } /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java index b7f2afef5f03..87f699ebf24f 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/PubsubIO.java @@ -41,6 +41,7 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark; import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.util.Transport; @@ -689,6 +690,33 @@ public PCollection apply(PInput input) { } } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (topic != null) { + builder.add("topic", topic.asPath()); + } + + if (subscription != null) { + builder.add("subscription", subscription.asPath()); + } + + if (timestampLabel != null) { + builder.add("timestampLabel", timestampLabel); + } + + if (idLabel != null) { + builder.add("idLabel", idLabel); + } + + if (maxNumRecords != 0) { + builder.add("maxRecords", maxNumRecords); + } + + if (maxReadTime != null) { + builder.add("maxReadTime", maxReadTime); + } + } + @Override protected Coder getDefaultOutputCoder() { return coder; @@ -974,6 +1002,21 @@ public PDone apply(PCollection input) { return PDone.in(input.getPipeline()); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (topic != null) { + builder.add("topic", topic.asPath()); + } + + if (timestampLabel != null) { + builder.add("timestampLabel", timestampLabel); + } + + if (idLabel != null) { + builder.add("idLabel", idLabel); + } + } + @Override protected Coder getDefaultOutputCoder() { return VoidCoder.of(); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java index fb103eeccf37..e4725bf6e28d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Read.java @@ -22,6 +22,7 @@ import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.SerializableUtils; import com.google.cloud.dataflow.sdk.util.WindowedValue; import com.google.cloud.dataflow.sdk.util.WindowingStrategy; @@ -144,6 +145,13 @@ public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + .include(source); + } + static { registerDefaultTransformEvaluator(); } @@ -250,5 +258,12 @@ public final PCollection apply(PInput input) { public String getKindString() { return "Read(" + approximateSimpleName(source.getClass()) + ")"; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("source", source.getClass()) + .include(source); + } } } diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java index 4f5ae2c47a87..270bd31b7fdd 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Sink.java @@ -20,6 +20,8 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData; import com.google.cloud.dataflow.sdk.values.PCollection; import java.io.Serializable; @@ -121,7 +123,7 @@ * @param the type that will be written to the Sink. */ @Experimental(Experimental.Kind.SOURCE_SINK) -public abstract class Sink implements Serializable { +public abstract class Sink implements Serializable, HasDisplayData { /** * Ensures that the sink is valid and can be written to before the write operation begins. One * should use {@link com.google.common.base.Preconditions} to implement this method. @@ -133,6 +135,16 @@ public abstract class Sink implements Serializable { */ public abstract WriteOperation createWriteOperation(PipelineOptions options); + /** + * {@inheritDoc} + * + *

By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + /** * A {@link WriteOperation} defines the process of a parallel write of objects to a Sink. * diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java index 1eee771d1e18..8d87b1627138 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Source.java @@ -20,6 +20,8 @@ import com.google.cloud.dataflow.sdk.annotations.Experimental; import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import com.google.cloud.dataflow.sdk.transforms.display.HasDisplayData; import org.joda.time.Instant; import java.io.IOException; @@ -52,7 +54,7 @@ * @param Type of elements read by the source. */ @Experimental(Experimental.Kind.SOURCE_SINK) -public abstract class Source implements Serializable { +public abstract class Source implements Serializable, HasDisplayData { /** * Checks that this source is valid, before it can be used in a pipeline. * @@ -66,6 +68,16 @@ public abstract class Source implements Serializable { */ public abstract Coder getDefaultOutputCoder(); + /** + * {@inheritDoc} + * + *

By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + /** * The interface that readers of custom input sources must implement. * diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java index 0980d48b69da..6989bb36af44 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/TextIO.java @@ -28,6 +28,7 @@ import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner; import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.util.MimeTypes; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -339,6 +340,14 @@ public PCollection apply(PInput input) { return pcol; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("compressionType", compressionType.toString()); + if (filepattern != null) { + builder.add("filePattern", filepattern); + } + } + @Override protected Coder getDefaultOutputCoder() { return coder; @@ -632,6 +641,21 @@ public PDone apply(PCollection input) { filenamePrefix, filenameSuffix, shardTemplate, coder))); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (filenamePrefix != null) { + builder.add("fileNamePrefix", filenamePrefix); + } + + if (filenameSuffix != null && !filenameSuffix.isEmpty()) { + builder.add("fileNameSuffix", filenameSuffix); + } + + if (numShards != 0) { + builder.add("numShards", numShards); + } + } + /** * Returns the current shard name template string. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java index bc8ebb88f691..7d2e6532e770 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/Write.java @@ -29,6 +29,7 @@ import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -78,6 +79,13 @@ public PDone apply(PCollection input) { return createWrite(input, sink.createWriteOperation(options)); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("sink", sink.getClass()) + .include(sink); + } + /** * Returns the {@link Sink} associated with this PTransform. */ diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java index 9ba76c5e5923..34494c7ef355 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSink.java @@ -21,6 +21,7 @@ import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriteOperation; import com.google.cloud.dataflow.sdk.io.FileBasedSink.FileBasedWriter; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.Preconditions; @@ -220,6 +221,21 @@ public void validate(PipelineOptions options) { public XmlWriteOperation createWriteOperation(PipelineOptions options) { return new XmlWriteOperation<>(this); } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + if (baseOutputFilename != null) { + builder.add("filenamePrefix", baseOutputFilename); + } + + if (rootElementName != null) { + builder.add("rootElement", rootElementName); + } + + if (classToBind != null) { + builder.add("recordClass", classToBind); + } + } } /** diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java index eae5e8bac884..d359cf13d67d 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/XmlSource.java @@ -21,6 +21,7 @@ import com.google.cloud.dataflow.sdk.coders.JAXBCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.common.base.Preconditions; import org.codehaus.stax2.XMLInputFactory2; @@ -217,6 +218,28 @@ public void validate() { recordClass, "recordClass is null. Use builder method withRecordClass() to set this."); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("filePattern", getFileOrPatternSpec()); + + if (rootElement != null) { + builder.add("rootElement", rootElement); + } + + if (recordElement != null) { + builder.add("recordElement", recordElement); + } + + if (recordClass != null) { + builder.add("recordClass", recordClass); + } + + long minBundleSize = getMinBundleSize(); + if (minBundleSize != DEFAULT_MIN_BUNDLE_SIZE) { + builder.add("minBundleSize", minBundleSize); + } + } + @Override public Coder getDefaultOutputCoder() { return JAXBCoder.of(recordClass); diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java index 01a42ce9cf3b..041db5366505 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIO.java @@ -40,6 +40,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.runners.PipelineRunner; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.DataflowReleaseInfo; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PBegin; @@ -260,6 +261,19 @@ public void validate(PBegin input) { } } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("tableId", tableId); + + if (options != null) { + builder.add("options", options.toString()); + } + + if (filter != null) { + builder.add("rowFilter", filter.toString()); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(Read.class) @@ -427,6 +441,15 @@ Write withBigtableService(BigtableService bigtableService) { return new Write(options, tableId, bigtableService); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("tableId", tableId); + + if (options != null) { + builder.add("options", options.toString()); + } + } + @Override public String toString() { return MoreObjects.toStringHelper(Write.class) diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java index f0a9b1a1512d..c71be1919f62 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroIOTest.java @@ -17,6 +17,7 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -29,6 +30,7 @@ import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.IOChannelUtils; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.base.MoreObjects; @@ -222,6 +224,35 @@ public void testAvroSinkWrite() throws Exception { } } + @Test + public void testReadDisplayData() { + AvroIO.Read.Bound read = AvroIO.Read + .from("foo.*") + .withSchema(GenericClass.class); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); + } + + @Test + public void testWriteDisplayData() { + AvroIO.Write.Bound write = AvroIO.Write + .to("foo") + .withSuffix("bar") + .withSchema(GenericClass.class) + .withNumShards(100); + + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("fileNamePrefix", "foo")); + assertThat(displayData, hasDisplayItem("fileNameSuffix", "bar")); + assertThat(displayData, hasDisplayItem("schema", GenericClass.class)); + assertThat(displayData, hasDisplayItem("numShards", 100)); + } + // TODO: for Write only, test withSuffix, withNumShards, // withShardNameTemplate and withoutSharding. } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java index 775f5c3539bc..db26546d12dd 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/AvroSourceTest.java @@ -17,6 +17,7 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -29,6 +30,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.SourceTestUtils; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.common.base.MoreObjects; import org.apache.avro.Schema; @@ -507,6 +509,20 @@ public void testSeekerFindAllLocations() { } } + @Test + public void testDisplayData() { + AvroSource source = AvroSource + .from("foobar.txt") + .withSchema(Bird.class) + .withMinBundleSize(1234); + + DisplayData displayData = DisplayData.from(source); + assertThat(displayData, hasDisplayItem("filePattern", "foobar.txt")); + assertThat(displayData, + hasDisplayItem("schema", ReflectData.get().getSchema(Bird.class).toString())); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } + /** * Class that will encode to a fixed size: 16 bytes. * diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java index 51c65563790a..12829c79fe5e 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BigQueryIOTest.java @@ -17,8 +17,10 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThat; import com.google.api.client.util.Data; import com.google.api.services.bigquery.model.TableReference; @@ -34,6 +36,7 @@ import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.CoderUtils; import org.hamcrest.Matchers; @@ -222,6 +225,22 @@ public void testBuildSourceWithTableAndFlatten() { p.run(); } + @Test + public void testBuildSourceDisplayData() { + String tableSpec = "project:dataset.tableid"; + + BigQueryIO.Read.Bound read = BigQueryIO.Read + .from(tableSpec) + .fromQuery("myQuery") + .withoutResultFlattening(); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("tableSpec", tableSpec)); + assertThat(displayData, hasDisplayItem("query", "myQuery")); + assertThat(displayData, hasDisplayItem("flattenResults", false)); + } + @Test public void testBuildSink() { BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") @@ -334,6 +353,27 @@ public void testBuildSinkWithWriteDispositionEmpty() { null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); } + @Test + public void testBuildSinkDisplayData() { + String tableSpec = "project:dataset.table"; + TableSchema schema = new TableSchema().set("col1", "type1").set("col2", "type2"); + + BigQueryIO.Write.Bound write = BigQueryIO.Write + .to(tableSpec) + .withSchema(schema) + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withWriteDisposition(WriteDisposition.WRITE_APPEND); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("tableSpec", tableSpec)); + assertThat(displayData, hasDisplayItem("schema", schema.toString())); + assertThat(displayData, + hasDisplayItem("createDisposition", CreateDisposition.CREATE_IF_NEEDED.toString())); + assertThat(displayData, + hasDisplayItem("writeDisposition", WriteDisposition.WRITE_APPEND.toString())); + } + private void testWriteValidatesDataset(boolean streaming) { BigQueryOptions options = PipelineOptionsFactory.as(BigQueryOptions.class); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java index 7cac67a20069..86413eeb0721 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/BoundedReadFromUnboundedSourceTest.java @@ -17,6 +17,7 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -29,6 +30,7 @@ import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -38,13 +40,14 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import java.io.Serializable; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** Unit tests for {@link BoundedReadFromUnboundedSource}. */ @RunWith(JUnit4.class) -public class BoundedReadFromUnboundedSourceTest { +public class BoundedReadFromUnboundedSourceTest implements Serializable{ private static final int NUM_RECORDS = 100; private static List finalizeTracker = null; @@ -66,6 +69,19 @@ public void testTimeBound() throws Exception { test(false, true); } + @Test + public void testForwardsDisplayData() { + TestCountingSource src = new TestCountingSource(1234) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + BoundedReadFromUnboundedSource> read = Read.from(src).withMaxNumRecords(5); + assertThat(DisplayData.from(read), includes(src)); + } + private static class Checker implements SerializableFunction>, Void> { private final boolean dedup; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java index 3de0513d18f8..a68a02ba8a0c 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CompressedSourceTest.java @@ -17,9 +17,13 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.cloud.dataflow.sdk.Pipeline; @@ -32,6 +36,7 @@ import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.SourceTestUtils; import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.io.Files; import com.google.common.primitives.Bytes; @@ -331,6 +336,26 @@ public void testCompressedReadMultipleFiles() throws Exception { p.run(); } + @Test + public void testDisplayData() { + ByteSource inputSource = new ByteSource("foobar.txt", 1) { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + + CompressedSource compressedSource = CompressedSource.from(inputSource); + CompressedSource gzipSource = compressedSource.withDecompression(CompressionMode.GZIP); + + DisplayData compressedSourceDisplayData = DisplayData.from(compressedSource); + DisplayData gzipDisplayData = DisplayData.from(gzipSource); + + assertThat(compressedSourceDisplayData, hasDisplayItem(hasKey("compressionMode"))); + assertThat(gzipDisplayData, hasDisplayItem("compressionMode", CompressionMode.GZIP.toString())); + assertThat(compressedSourceDisplayData, includes(inputSource)); + } + /** * Generate byte array of given size. */ diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java index 5a7c2fbac390..65b28ac981d0 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/CountingInputTest.java @@ -18,6 +18,7 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; @@ -30,9 +31,11 @@ import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.Max; import com.google.cloud.dataflow.sdk.transforms.Min; +import com.google.cloud.dataflow.sdk.transforms.PTransform; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.PCollection; import org.joda.time.Duration; @@ -75,6 +78,13 @@ public void testBoundedInput() { p.run(); } + @Test + public void testBoundedDisplayData() { + PTransform input = CountingInput.upTo(1234); + DisplayData displayData = DisplayData.from(input); + assertThat(displayData, hasDisplayItem("upTo", 1234)); + } + @Test @Category(RunnableOnService.class) public void testUnboundedInput() { @@ -138,6 +148,28 @@ public void testUnboundedInputTimestamps() { p.run(); } + @Test + public void testUnboundedDisplayData() { + Duration maxReadTime = Duration.standardHours(5); + SerializableFunction timestampFn = new SerializableFunction() { + @Override + public Instant apply(Long input) { + return Instant.now(); + } + }; + + PTransform input = CountingInput.unbounded() + .withMaxNumRecords(1234) + .withMaxReadTime(maxReadTime) + .withTimestampFn(timestampFn); + + DisplayData displayData = DisplayData.from(input); + + assertThat(displayData, hasDisplayItem("maxRecords", 1234)); + assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); + assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass())); + } + /** * A timestamp function that uses the given value as the timestamp. Because the input values will * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java index 41967409d0b8..891895d7c3f9 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/DatastoreIOTest.java @@ -18,6 +18,7 @@ package com.google.cloud.dataflow.sdk.io; import static com.google.api.services.datastore.client.DatastoreHelper.makeKey; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo; @@ -55,6 +56,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.TestCredential; import com.google.common.collect.Lists; @@ -194,6 +196,22 @@ public void testSourceValidationSucceedsNamespace() throws Exception { source.validate(); } + @Test + public void testSourceDipslayData() { + DatastoreIO.Source source = DatastoreIO.source() + .withDataset(DATASET) + .withQuery(QUERY) + .withHost(HOST) + .withNamespace(NAMESPACE); + + DisplayData displayData = DisplayData.from(source); + + assertThat(displayData, hasDisplayItem("dataset", DATASET)); + assertThat(displayData, hasDisplayItem("query", QUERY.toString())); + assertThat(displayData, hasDisplayItem("host", HOST)); + assertThat(displayData, hasDisplayItem("namespace", NAMESPACE)); + } + @Test public void testSinkDoesNotAllowNullHost() throws Exception { thrown.expect(NullPointerException.class); @@ -226,6 +244,18 @@ public void testSinkValidationSucceedsWithDataset() throws Exception { sink.validate(testPipelineOptions(null)); } + @Test + public void testSinkDipslayData() { + DatastoreIO.Sink sink = DatastoreIO.sink() + .withDataset(DATASET) + .withHost(HOST); + + DisplayData displayData = DisplayData.from(sink); + + assertThat(displayData, hasDisplayItem("dataset", DATASET)); + assertThat(displayData, hasDisplayItem("host", HOST)); + } + @Test public void testQuerySplitBasic() throws Exception { KindExpression mykind = KindExpression.newBuilder().setName("mykind").build(); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java index f7d81fc50d21..94f9b1b4a2e4 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/PubsubIOTest.java @@ -17,12 +17,16 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import com.google.api.client.testing.http.FixedClock; import com.google.api.client.util.Clock; import com.google.api.services.pubsub.model.PubsubMessage; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -231,4 +235,42 @@ public void timestampLabelRfc3339WithTooLargeYearThrowsError() { // Year 10000 out of range. parseTimestamp("10000-10-29T23:41:41.123999Z"); } + + @Test + public void testReadDisplayData() { + String topic = "projects/project/topics/topic"; + String subscription = "projects/project/subscriptions/subscription"; + Duration maxReadTime = Duration.standardMinutes(5); + PubsubIO.Read.Bound read = PubsubIO.Read + .topic(topic) + .subscription(subscription) + .timestampLabel("myTimestamp") + .idLabel("myId") + .maxNumRecords(1234) + .maxReadTime(maxReadTime); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("topic", topic)); + assertThat(displayData, hasDisplayItem("subscription", subscription)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + assertThat(displayData, hasDisplayItem("maxRecords", 1234)); + assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); + } + + @Test + public void testWriteDisplayData() { + String topic = "projects/project/topics/topic"; + PubsubIO.Write.Bound write = PubsubIO.Write + .topic(topic) + .timestampLabel("myTimestamp") + .idLabel("myId"); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("topic", topic)); + assertThat(displayData, hasDisplayItem("timestampLabel", "myTimestamp")); + assertThat(displayData, hasDisplayItem("idLabel", "myId")); + } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java index 0564072ca6e4..c2965f5a5ef3 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/ReadTest.java @@ -17,10 +17,16 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; +import static org.junit.Assert.assertThat; + import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.io.UnboundedSource.CheckpointMark; import com.google.cloud.dataflow.sdk.options.PipelineOptions; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; +import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -28,6 +34,7 @@ import org.junit.runners.JUnit4; import java.io.IOException; +import java.io.Serializable; import java.util.List; import javax.annotation.Nullable; @@ -36,9 +43,9 @@ * Tests for {@link Read}. */ @RunWith(JUnit4.class) -public class ReadTest { +public class ReadTest implements Serializable{ @Rule - public ExpectedException thrown = ExpectedException.none(); + public transient ExpectedException thrown = ExpectedException.none(); @Test public void failsWhenCustomBoundedSourceIsNotSerializable() { @@ -62,6 +69,38 @@ public void succeedsWhenCustomUnboundedSourceIsSerializable() { Read.from(new SerializableUnboundedSource()); } + @Test + public void testDisplayData() { + SerializableBoundedSource boundedSource = new SerializableBoundedSource() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + SerializableUnboundedSource unboundedSource = new SerializableUnboundedSource() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + Duration maxReadTime = Duration.standardMinutes(2345); + + Read.Bounded bounded = Read.from(boundedSource); + BoundedReadFromUnboundedSource unbounded = Read.from(unboundedSource) + .withMaxNumRecords(1234) + .withMaxReadTime(maxReadTime); + + DisplayData boundedDisplayData = DisplayData.from(bounded); + assertThat(boundedDisplayData, hasDisplayItem("source", boundedSource.getClass())); + assertThat(boundedDisplayData, includes(boundedSource)); + + DisplayData unboundedDisplayData = DisplayData.from(unbounded); + assertThat(unboundedDisplayData, hasDisplayItem("source", unboundedSource.getClass())); + assertThat(unboundedDisplayData, includes(unboundedSource)); + assertThat(unboundedDisplayData, hasDisplayItem("maxRecords", 1234)); + assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime)); + } + private abstract static class CustomBoundedSource extends BoundedSource { @Override public List> splitIntoBundles( diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java index 0bd6ce76274e..d167d9f4fae9 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/TextIOTest.java @@ -21,6 +21,7 @@ import static com.google.cloud.dataflow.sdk.TestUtils.LINES_ARRAY; import static com.google.cloud.dataflow.sdk.TestUtils.NO_INTS_ARRAY; import static com.google.cloud.dataflow.sdk.TestUtils.NO_LINES_ARRAY; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -41,6 +42,7 @@ import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.util.CoderUtils; import com.google.cloud.dataflow.sdk.util.GcsUtil; import com.google.cloud.dataflow.sdk.util.TestCredential; @@ -198,6 +200,18 @@ public void testReadNamed() throws Exception { } } + @Test + public void testReadDisplayData() { + TextIO.Read.Bound read = TextIO.Read + .from("foo.*") + .withCompressionType(CompressionType.BZIP2); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.*")); + assertThat(displayData, hasDisplayItem("compressionType", CompressionType.BZIP2.toString())); + } + void runTestWrite(T[] elems, Coder coder) throws Exception { File tmpFile = tmpFolder.newFile("file.txt"); String filename = tmpFile.getPath(); @@ -285,6 +299,21 @@ public void testWriteNamed() { } } + @Test + public void testWriteDisplayData() { + TextIO.Write.Bound write = TextIO.Write + .to("foo") + .withSuffix("bar") + .withNumShards(100); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("fileNamePrefix", "foo")); + assertThat(displayData, hasDisplayItem("fileNameSuffix", "bar")); + assertThat(displayData, hasDisplayItem("numShards", 100)); + + } + @Test public void testUnsupportedFilePattern() throws IOException { File outFolder = tmpFolder.newFolder(); diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java index da487729632f..671dee88bced 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/WriteTest.java @@ -17,6 +17,8 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.includes; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; @@ -35,6 +37,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -91,6 +94,21 @@ public void testWriteWindowed() { runWrite(inputs, /* windowed */ true); } + @Test + public void testDisplayData() { + TestSink sink = new TestSink() { + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("foo", "bar"); + } + }; + Write.Bound write = Write.to(sink); + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("sink", sink.getClass())); + assertThat(displayData, includes(sink)); + } + /** * Performs a Write transform and verifies the Write transform calls the appropriate methods on * a test sink in the correct order, as well as verifies that the elements of a PCollection are diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java index 3ba720b341b7..24c6d510e4d1 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSinkTest.java @@ -17,13 +17,16 @@ */ package com.google.cloud.dataflow.sdk.io; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; import com.google.cloud.dataflow.sdk.io.XmlSink.XmlWriteOperation; import com.google.cloud.dataflow.sdk.io.XmlSink.XmlWriter; import com.google.cloud.dataflow.sdk.options.PipelineOptions; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.common.collect.Lists; import org.junit.Rule; @@ -161,6 +164,19 @@ public void testCreateWriter() throws Exception { assertNotNull(writer.marshaller); } + @Test + public void testDisplayData() { + XmlSink.Bound sink = XmlSink.write() + .toFilenamePrefix("foobar") + .withRootElement("bird") + .ofRecordClass(Integer.class); + + DisplayData displayData = DisplayData.from(sink); + assertThat(displayData, hasDisplayItem("filenamePrefix", "foobar")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + } + /** * Write a bundle with an XmlWriter and verify the output is expected. */ diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java index 41e28870783e..e11d435a8f01 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/XmlSourceTest.java @@ -20,6 +20,7 @@ import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -33,6 +34,7 @@ import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.testing.DataflowAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.common.collect.ImmutableList; @@ -822,4 +824,23 @@ public void testReadXMLFilePattern() throws IOException { DataflowAssert.that(output).containsInAnyOrder(expectedResults); p.run(); } + + @Test + public void testDisplayData() { + + + XmlSource source = XmlSource + .from("foo.xml") + .withRootElement("bird") + .withRecordElement("cat") + .withMinBundleSize(1234) + .withRecordClass(Integer.class); + DisplayData displayData = DisplayData.from(source); + + assertThat(displayData, hasDisplayItem("filePattern", "foo.xml")); + assertThat(displayData, hasDisplayItem("rootElement", "bird")); + assertThat(displayData, hasDisplayItem("recordElement", "cat")); + assertThat(displayData, hasDisplayItem("recordClass", Integer.class)); + assertThat(displayData, hasDisplayItem("minBundleSize", 1234)); + } } diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java index b8ac24356126..bb63041e4b0a 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/io/bigtable/BigtableIOTest.java @@ -21,6 +21,8 @@ import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive; import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static com.google.cloud.dataflow.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static com.google.cloud.dataflow.sdk.transforms.display.DisplayDataMatchers.hasKey; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Verify.verifyNotNull; import static org.hamcrest.Matchers.hasSize; @@ -46,6 +48,7 @@ import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.display.DisplayData; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; import com.google.cloud.dataflow.sdk.values.TypeDescriptor; @@ -404,6 +407,26 @@ public void testReadingWithFilterAndSubSplits() throws Exception { assertSourcesEqualReferenceSource(source, splits, null /* options */); } + @Test + public void testReadingDisplayData() { + RowFilter rowFilter = RowFilter.newBuilder() + .setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")) + .build(); + + BigtableIO.Read read = BigtableIO.read() + .withBigtableOptions(BIGTABLE_OPTIONS) + .withTableId("fooTable") + .withRowFilter(rowFilter); + + DisplayData displayData = DisplayData.from(read); + + assertThat(displayData, hasDisplayItem("tableId", "fooTable")); + assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString())); + + // BigtableIO adds user-agent to options; assert only on key and not value. + assertThat(displayData, hasDisplayItem(hasKey("options"))); + } + /** Tests that a record gets written to the service and messages are logged. */ @Test public void testWriting() throws Exception { @@ -460,6 +483,19 @@ public void testWritingFailsBadElement() throws Exception { p.run(); } + @Test + public void testWritingDisplayData() { + BigtableIO.Write write = BigtableIO.write() + .withTableId("fooTable") + .withBigtableOptions(BIGTABLE_OPTIONS); + + DisplayData displayData = DisplayData.from(write); + + assertThat(displayData, hasDisplayItem("tableId", "fooTable")); + // BigtableIO adds user-agent to options; assert only on key and not value. + assertThat(displayData, hasDisplayItem(hasKey("options"))); + } + //////////////////////////////////////////////////////////////////////////////////////////// private static final String COLUMN_FAMILY_NAME = "family"; private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");