Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@
</executions>
</plugin>

<!-- Report jacoco coverage to coveralls.io -->
<plugin>
<groupId>org.eluder.coveralls</groupId>
<artifactId>coveralls-maven-plugin</artifactId>
<version>4.1.0</version>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -325,6 +326,14 @@ public PCollection<T> apply(PInput input) {
return pcol;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("schema", type);
if (filepattern != null) {
builder.add("filePattern", filepattern);
}
}

@Override
protected Coder<T> getDefaultOutputCoder() {
return AvroCoder.of(type, schema);
Expand Down Expand Up @@ -678,6 +687,22 @@ public PDone apply(PCollection<T> input) {
filenamePrefix, filenameSuffix, shardTemplate, AvroCoder.of(type, schema))));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("schema", type);
if (filenamePrefix != null) {
builder.add("fileNamePrefix", filenamePrefix);
}

if (filenameSuffix != null && !filenameSuffix.isEmpty()) {
builder.add("fileNameSuffix", filenameSuffix);
}

if (numShards != 0) {
builder.add("numShards", numShards);
}
}

/**
* Returns the current shard name template string.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.AvroUtils;
import com.google.cloud.dataflow.sdk.util.AvroUtils.AvroMetadata;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -293,6 +294,20 @@ public AvroCoder<T> getDefaultOutputCoder() {
return coder;
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("filePattern", getFileOrPatternSpec());

if (readSchemaString != null) {
builder.add("schema", readSchemaString);
}

long minBundleSize = getMinBundleSize();
if (minBundleSize != DEFAULT_MIN_BUNDLE_SIZE) {
builder.add("minBundleSize", minBundleSize);
}
}

public String getSchema() {
return readSchemaString;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.Sum;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.util.BigQueryTableInserter;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
Expand Down Expand Up @@ -501,6 +502,21 @@ protected Coder<TableRow> getDefaultOutputCoder() {
return TableRowJsonCoder.of();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (table != null) {
builder.add("tableSpec", toTableSpec(table));
}

if (query != null) {
builder.add("query", query);
}

if (flattenResults != null) {
builder.add("flattenResults", flattenResults);
}
}

static {
DirectPipelineRunner.registerDefaultTransformEvaluator(
Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
Expand Down Expand Up @@ -970,6 +986,21 @@ protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (table != null) {
builder.add("tableSpec", toTableSpec(table));
}

if (schema != null) {
builder.add("schema", schema.toString());
}

builder
.add("createDisposition", createDisposition.toString())
.add("writeDisposition", writeDisposition.toString());
}

static {
DirectPipelineRunner.registerDefaultTransformEvaluator(
Bound.class, new DirectPipelineRunner.TransformEvaluator<Bound>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.IntervalBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -107,6 +108,21 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("source", source.getClass())
.include(source);

if (maxNumRecords != Long.MAX_VALUE) {
builder.add("maxRecords", maxNumRecords);
}

if (maxReadTime != null) {
builder.add("maxReadTime", maxReadTime);
}
}

private static class UnboundedToBoundedSourceAdapter<T>
extends BoundedSource<ValueWithRecordId<T>> {
private final UnboundedSource<T, ?> source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.dataflow.sdk.annotations.Experimental;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
Expand Down Expand Up @@ -319,6 +320,13 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio
return sourceDelegate.producesSortedKeys(options);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.include(sourceDelegate)
.add("compressionMode", channelFactory.toString());
}

/**
* Returns the delegate source's default output coder.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.cloud.dataflow.sdk.io.Read.Unbounded;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
Expand Down Expand Up @@ -113,6 +114,11 @@ private BoundedCountingInput(long numElements) {
public PCollection<Long> apply(PBegin begin) {
return begin.apply(Read.from(CountingSource.upTo(numElements)));
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("upTo", numElements);
}
}

/**
Expand Down Expand Up @@ -220,5 +226,18 @@ public PCollection<Long> apply(PBegin begin) {
read.withMaxReadTime(maxReadTime.get()).withMaxNumRecords(maxNumRecords.get()));
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder.add("timestampFn", timestampFn.getClass());

if (maxReadTime.isPresent()) {
builder.add("maxReadTime", maxReadTime.get());
}

if (maxNumRecords.isPresent()) {
builder.add("maxRecords", maxNumRecords.get());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.cloud.dataflow.sdk.options.DataflowPipelineWorkerPoolOptions;
import com.google.cloud.dataflow.sdk.options.GcpOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.RetryHttpRequestInitializer;
import com.google.cloud.dataflow.sdk.values.PCollection;
Expand Down Expand Up @@ -397,6 +398,25 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (host != DEFAULT_HOST) {
builder.add("host", host);
}

if (datasetId != null) {
builder.add("dataset", datasetId);
}

if (query != null) {
builder.add("query", query.toString());
}

if (namespace != null) {
builder.add("namespace", namespace);
}
}

@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
Expand Down Expand Up @@ -598,6 +618,17 @@ public void validate(PipelineOptions options) {
public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
return new DatastoreWriteOperation(this);
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (host != DEFAULT_HOST) {
builder.add("host", host);
}

if (datasetId != null) {
builder.add("dataset", datasetId);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.Transport;
Expand Down Expand Up @@ -689,6 +690,33 @@ public PCollection<T> apply(PInput input) {
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (topic != null) {
builder.add("topic", topic.asPath());
}

if (subscription != null) {
builder.add("subscription", subscription.asPath());
}

if (timestampLabel != null) {
builder.add("timestampLabel", timestampLabel);
}

if (idLabel != null) {
builder.add("idLabel", idLabel);
}

if (maxNumRecords != 0) {
builder.add("maxRecords", maxNumRecords);
}

if (maxReadTime != null) {
builder.add("maxReadTime", maxReadTime);
}
}

@Override
protected Coder<T> getDefaultOutputCoder() {
return coder;
Expand Down Expand Up @@ -974,6 +1002,21 @@ public PDone apply(PCollection<T> input) {
return PDone.in(input.getPipeline());
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
if (topic != null) {
builder.add("topic", topic.asPath());
}

if (timestampLabel != null) {
builder.add("timestampLabel", timestampLabel);
}

if (idLabel != null) {
builder.add("idLabel", idLabel);
}
}

@Override
protected Coder<Void> getDefaultOutputCoder() {
return VoidCoder.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.display.DisplayData;
import com.google.cloud.dataflow.sdk.util.SerializableUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
Expand Down Expand Up @@ -144,6 +145,13 @@ public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("source", source.getClass())
.include(source);
}

static {
registerDefaultTransformEvaluator();
}
Expand Down Expand Up @@ -250,5 +258,12 @@ public final PCollection<T> apply(PInput input) {
public String getKindString() {
return "Read(" + approximateSimpleName(source.getClass()) + ")";
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
builder
.add("source", source.getClass())
.include(source);
}
}
}
Loading