diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java index 79aa85facc9c..22290300117b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ClassLoaderFileSystem.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResolveOptions; import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; @@ -85,7 +86,9 @@ protected void copy( @Override protected void rename( - List srcResourceIds, List destResourceIds) + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { throw new UnsupportedOperationException("Read-only filesystem."); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java index 59c64278449d..f735ec350bcf 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java @@ -775,7 +775,10 @@ final void moveToOutputFiles( dstFiles, StandardMoveOptions.IGNORE_MISSING_FILES, StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); - removeTemporaryFiles(srcFiles); + + // The rename ensures that the source files are deleted. However we may still need to clean + // up the directory or orphaned files. + removeTemporaryFiles(Collections.emptyList()); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java index 34ba69df358c..ba2c8251da87 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystem.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.io.fs.ResourceId; /** @@ -113,6 +114,9 @@ protected abstract void copy(List srcResourceIds, List * * @param srcResourceIds the references of the source resources * @param destResourceIds the references of the destination resources + * @param moveOptions move options specifying handling of error conditions + * @throws UnsupportedOperationException if move options are specified and not supported by the + * FileSystem * @throws FileNotFoundException if the source resources are missing. When rename throws, the * state of the resources is unknown but safe: for every (source, destination) pair of * resources, the following are possible: a) source exists, b) destination exists, c) source @@ -121,7 +125,10 @@ protected abstract void copy(List srcResourceIds, List * resource. */ protected abstract void rename( - List srcResourceIds, List destResourceIds) throws IOException; + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) + throws IOException; /** * Deletes a collection of resources. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index 03365b5936d3..cacd47705252 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -52,7 +52,6 @@ import org.apache.beam.sdk.io.fs.ResourceId; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.common.ReflectHelpers; -import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner; @@ -273,25 +272,11 @@ public static ReadableByteChannel open(ResourceId resourceId) throws IOException public static void copy( List srcResourceIds, List destResourceIds, MoveOptions... moveOptions) throws IOException { - validateSrcDestLists(srcResourceIds, destResourceIds); - if (srcResourceIds.isEmpty()) { - // Short-circuit. - return; - } - - List srcToCopy = srcResourceIds; - List destToCopy = destResourceIds; - if (Sets.newHashSet(moveOptions) - .contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) { - KV, List> existings = - filterMissingFiles(srcResourceIds, destResourceIds); - srcToCopy = existings.getKey(); - destToCopy = existings.getValue(); + FilterResult filtered = filterFiles(srcResourceIds, destResourceIds, moveOptions); + if (!filtered.resultSources.isEmpty()) { + getFileSystemInternal(filtered.resultSources.iterator().next().getScheme()) + .copy(filtered.resultSources, filtered.resultDestinations); } - if (srcToCopy.isEmpty()) { - return; - } - getFileSystemInternal(srcToCopy.iterator().next().getScheme()).copy(srcToCopy, destToCopy); } /** @@ -304,32 +289,31 @@ public static void copy( * *

It doesn't support renaming globs. * + *

Src files will be removed, even if the copy is skipped due to specified move options. + * * @param srcResourceIds the references of the source resources * @param destResourceIds the references of the destination resources */ public static void rename( List srcResourceIds, List destResourceIds, MoveOptions... moveOptions) throws IOException { - Set moveOptionSet = Sets.newHashSet(moveOptions); - validateSrcDestLists(srcResourceIds, destResourceIds); - if (srcResourceIds.isEmpty()) { - // Short-circuit. - return; - } - - List srcToRename = srcResourceIds; - List destToRename = destResourceIds; - if (moveOptionSet.size() > 0) { - KV, List> existings = - filterFiles(srcResourceIds, destResourceIds, moveOptions); - srcToRename = existings.getKey(); - destToRename = existings.getValue(); + FilterResult filtered = filterFiles(srcResourceIds, destResourceIds, moveOptions); + if (!filtered.resultSources.isEmpty()) { + try { + getFileSystemInternal(filtered.resultSources.iterator().next().getScheme()) + .rename(filtered.resultSources, filtered.resultDestinations, moveOptions); + } catch (UnsupportedOperationException e) { + // Some file systems do not yet support specifying the move options. We handle the move + // options above with filtering so specifying them is just an optimization for error + // handling and it is safe to rename without specifying them. + getFileSystemInternal(filtered.resultSources.iterator().next().getScheme()) + .rename(filtered.resultSources, filtered.resultDestinations); + } } - if (srcToRename.isEmpty()) { - return; + if (!filtered.filteredExistingSrcs.isEmpty()) { + getFileSystemInternal(filtered.filteredExistingSrcs.iterator().next().getScheme()) + .delete(filtered.filteredExistingSrcs); } - getFileSystemInternal(srcToRename.iterator().next().getScheme()) - .rename(srcToRename, destToRename); } /** @@ -391,50 +375,64 @@ public ResourceId apply(@Nonnull Metadata input) { .delete(resourceIdsToDelete); } - private static KV, List> filterMissingFiles( - List srcResourceIds, List destResourceIds) throws IOException { - return filterFiles(srcResourceIds, destResourceIds, StandardMoveOptions.IGNORE_MISSING_FILES); - } + private static class FilterResult { + public List resultSources = new ArrayList(); + public List resultDestinations = new ArrayList(); + public List filteredExistingSrcs = new ArrayList(); + }; - private static KV, List> filterFiles( + private static FilterResult filterFiles( List srcResourceIds, List destResourceIds, MoveOptions... moveOptions) throws IOException { - Set moveOptionSet = Sets.newHashSet(moveOptions); - Boolean ignoreMissingSrc = moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES); - Boolean skipExistingDest = - moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); validateSrcDestLists(srcResourceIds, destResourceIds); - if (srcResourceIds.isEmpty()) { - // Short-circuit. - return KV.of(Collections.emptyList(), Collections.emptyList()); + FilterResult result = new FilterResult(); + if (moveOptions.length == 0 || srcResourceIds.isEmpty()) { + // Nothing will be filtered. + result.resultSources = srcResourceIds; + result.resultDestinations = destResourceIds; + return result; } + Set moveOptionSet = Sets.newHashSet(moveOptions); + final boolean ignoreMissingSrc = + moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES); + final boolean skipExistingDest = + moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + final int size = srcResourceIds.size(); - List srcToHandle = new ArrayList<>(); - List destToHandle = new ArrayList<>(); - - List matchSrcResults = matchResources(srcResourceIds); - List matchDestResults = new ArrayList<>(); + // Match necessary srcs and dests with a single match call. + List matchResources = new ArrayList<>(); + if (ignoreMissingSrc) { + matchResources.addAll(srcResourceIds); + } if (skipExistingDest) { - matchDestResults = matchResources(destResourceIds); + matchResources.addAll(destResourceIds); } - - for (int i = 0; i < matchSrcResults.size(); ++i) { - if (matchSrcResults.get(i).status().equals(Status.NOT_FOUND) && ignoreMissingSrc) { - // If the source is not found, and we are ignoring found source files, then we skip it. + List matchResults = matchResources(matchResources); + List matchSrcResults = ignoreMissingSrc ? matchResults.subList(0, size) : null; + List matchDestResults = + skipExistingDest + ? matchResults.subList(matchResults.size() - size, matchResults.size()) + : null; + + for (int i = 0; i < size; ++i) { + if (matchSrcResults != null && matchSrcResults.get(i).status().equals(Status.NOT_FOUND)) { + // If the source is not found, and we are ignoring missing source files, then we skip it. continue; } - if (skipExistingDest + if (matchDestResults != null && matchDestResults.get(i).status().equals(Status.OK) && checksumMatch( matchDestResults.get(i).metadata().get(0), matchSrcResults.get(i).metadata().get(0))) { - // If the destination exists, and we are skipping when destinations exist, then we skip. + // If the destination exists, and we are skipping when destinations exist, then we skip + // the copy but note that the source exists in case it should be deleted. + result.filteredExistingSrcs.add(srcResourceIds.get(i)); continue; } - srcToHandle.add(srcResourceIds.get(i)); - destToHandle.add(destResourceIds.get(i)); + result.resultSources.add(srcResourceIds.get(i)); + result.resultDestinations.add(destResourceIds.get(i)); } - return KV.of(srcToHandle, destToHandle); + return result; } private static boolean checksumMatch(MatchResult.Metadata first, MatchResult.Metadata second) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java index 7404f78b799a..042e6c52cc5e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/LocalFileSystem.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -162,8 +163,14 @@ protected void copy(List srcResourceIds, List } @Override - protected void rename(List srcResourceIds, List destResourceIds) + protected void rename( + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { + if (moveOptions.length > 0) { + throw new UnsupportedOperationException("Support for move options is not yet implemented."); + } checkArgument( srcResourceIds.size() == destResourceIds.size(), "Number of source files %s must equal number of destination files %s", diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java index 9b5ee53cc258..6c3942890451 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/storage/GcsFileSystem.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.metrics.Counter; import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; @@ -69,14 +70,22 @@ class GcsFileSystem extends FileSystem { /** Number of copy operations performed. */ private Counter numCopies; + /** Number of renames operations performed. */ + private Counter numRenames; + /** Time spent performing copies. */ private Counter copyTimeMsec; + /** Time spent performing renames. */ + private Counter renameTimeMsec; + GcsFileSystem(GcsOptions options) { this.options = checkNotNull(options, "options"); if (options.getGcsPerformanceMetrics()) { numCopies = Metrics.counter(GcsFileSystem.class, "num_copies"); copyTimeMsec = Metrics.counter(GcsFileSystem.class, "copy_time_msec"); + numRenames = Metrics.counter(GcsFileSystem.class, "num_renames"); + renameTimeMsec = Metrics.counter(GcsFileSystem.class, "rename_time_msec"); } } @@ -142,10 +151,20 @@ protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException } @Override - protected void rename(List srcResourceIds, List destResourceIds) + protected void rename( + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { - copy(srcResourceIds, destResourceIds); - delete(srcResourceIds); + Stopwatch stopwatch = Stopwatch.createStarted(); + options + .getGcsUtil() + .rename(toFilenames(srcResourceIds), toFilenames(destResourceIds), moveOptions); + stopwatch.stop(); + if (options.getGcsPerformanceMetrics()) { + numRenames.inc(srcResourceIds.size()); + renameTimeMsec.inc(stopwatch.elapsed(TimeUnit.MILLISECONDS)); + } } @Override diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index 97974f95ca00..f6a67034636c 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -26,6 +26,7 @@ import com.google.api.client.googleapis.batch.json.JsonBatchCallback; import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest; import com.google.api.client.http.HttpHeaders; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.client.http.HttpStatusCodes; @@ -60,6 +61,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Set; import java.util.concurrent.CompletionStage; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -67,10 +69,13 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.DefaultValueFactory; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.util.FluentBackoff; @@ -78,6 +83,7 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -89,6 +95,7 @@ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402) }) public class GcsUtil { + /** * This is a {@link DefaultValueFactory} able to create a {@link GcsUtil} using any transport * flags specified on the {@link PipelineOptions}. @@ -153,6 +160,8 @@ public static GcsUtil create( /** Client for the GCS API. */ private Storage storageClient; + private Supplier batchRequestSupplier; + private final HttpRequestInitializer httpRequestInitializer; /** Buffer size for GCS uploads (in bytes). */ private final @Nullable Integer uploadBufferSizeBytes; @@ -208,6 +217,32 @@ private GcsUtil( .build(); googleCloudStorage = new GoogleCloudStorageImpl(googleCloudStorageOptions, storageClient, credentials); + this.batchRequestSupplier = + () -> { + // Capture reference to this so that the most recent storageClient and initializer + // are used. + GcsUtil util = this; + return new BatchInterface() { + final BatchRequest batch = util.storageClient.batch(util.httpRequestInitializer); + + @Override + public void queue( + AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) + throws IOException { + request.queue(batch, cb); + } + + @Override + public void execute() throws IOException { + batch.execute(); + } + + @Override + public int size() { + return batch.size(); + } + }; + }; } // Use this only for testing purposes. @@ -215,6 +250,11 @@ protected void setStorageClient(Storage storageClient) { this.storageClient = storageClient; } + // Use this only for testing purposes. + protected void setBatchRequestSupplier(Supplier supplier) { + this.batchRequestSupplier = supplier; + } + /** * Expands a pattern into matched paths. The pattern path may contain globs, which are expanded in * the result. For patterns that only match a single object, we ensure that the object exists. @@ -559,13 +599,13 @@ public boolean shouldRetry(IOException e) { Thread.currentThread().interrupt(); throw new IOException( String.format( - "Error while attempting to create bucket gs://%s for rproject %s", + "Error while attempting to create bucket gs://%s for project %s", bucket.getName(), projectId), e); } } - private static void executeBatches(List batches) throws IOException { + private static void executeBatches(List batches) throws IOException { ExecutorService executor = MoreExecutors.listeningDecorator( new ThreadPoolExecutor( @@ -576,7 +616,7 @@ private static void executeBatches(List batches) throws IOExceptio new LinkedBlockingQueue<>())); List> futures = new ArrayList<>(); - for (final BatchRequest batch : batches) { + for (final BatchInterface batch : batches) { futures.add(MoreFutures.runAsync(() -> batch.execute(), executor)); } @@ -596,20 +636,20 @@ private static void executeBatches(List batches) throws IOExceptio } /** - * Makes get {@link BatchRequest BatchRequests}. + * Makes get {@link BatchInterface BatchInterfaces}. * * @param paths {@link GcsPath GcsPaths}. * @param results mutable {@link List} for return values. - * @return {@link BatchRequest BatchRequests} to execute. + * @return {@link BatchInterface BatchInterfaces} to execute. * @throws IOException */ @VisibleForTesting - List makeGetBatches( + List makeGetBatches( Collection paths, List results) throws IOException { - List batches = new ArrayList<>(); + List batches = new ArrayList<>(); for (List filesToGet : Lists.partition(Lists.newArrayList(paths), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = createBatchRequest(); + BatchInterface batch = batchRequestSupplier.get(); for (GcsPath path : filesToGet) { results.add(enqueueGetFileSize(path, batch)); } @@ -619,35 +659,85 @@ List makeGetBatches( } /** - * Wrapper for RewriteRequest that supports multiple calls. + * Wrapper for rewriting that supports multiple calls as well as possibly deleting the source + * file. * *

Usage: create, enqueue(), and execute batch. Then, check getReadyToEnqueue() if another * round of enqueue() and execute is required. Repeat until getReadyToEnqueue() returns false. */ class RewriteOp extends JsonBatchCallback { - private GcsPath from; - private GcsPath to; + private final GcsPath from; + private final GcsPath to; + private final boolean deleteSource; + private final boolean ignoreMissingSource; private boolean readyToEnqueue; + private boolean performDelete; + private GoogleJsonError lastError; @VisibleForTesting Storage.Objects.Rewrite rewriteRequest; public boolean getReadyToEnqueue() { return readyToEnqueue; } - public void enqueue(BatchRequest batch) throws IOException { + public GoogleJsonError getLastError() { + return lastError; + } + + public GcsPath getFrom() { + return from; + } + + public GcsPath getTo() { + return to; + } + + public void enqueue(BatchInterface batch) throws IOException { if (!readyToEnqueue) { throw new IOException( String.format( "Invalid state for Rewrite, from=%s, to=%s, readyToEnqueue=%s", from, to, readyToEnqueue)); } - rewriteRequest.queue(batch, this); - readyToEnqueue = false; + if (performDelete) { + Storage.Objects.Delete deleteRequest = + storageClient.objects().delete(from.getBucket(), from.getObject()); + batch.queue( + deleteRequest, + new JsonBatchCallback() { + @Override + public void onSuccess(Void obj, HttpHeaders responseHeaders) { + LOG.debug("Successfully deleted {} after moving to {}", from, to); + readyToEnqueue = false; + lastError = null; + } + + @Override + public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) + throws IOException { + if (e.getCode() == 404) { + LOG.info( + "Ignoring failed deletion of moved file {} which already does not exist: {}", + from, + e); + readyToEnqueue = false; + lastError = null; + } else { + readyToEnqueue = true; + lastError = e; + } + } + }); + } else { + batch.queue(rewriteRequest, this); + } } - public RewriteOp(GcsPath from, GcsPath to) throws IOException { + public RewriteOp(GcsPath from, GcsPath to, boolean deleteSource, boolean ignoreMissingSource) + throws IOException { this.from = from; this.to = to; + this.deleteSource = deleteSource; + this.ignoreMissingSource = ignoreMissingSource; rewriteRequest = storageClient .objects() @@ -661,9 +751,14 @@ public RewriteOp(GcsPath from, GcsPath to) throws IOException { @Override public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeaders) throws IOException { + lastError = null; if (rewriteResponse.getDone()) { - LOG.debug("Rewrite done: {} to {}", from, to); - readyToEnqueue = false; + if (deleteSource) { + readyToEnqueue = true; + performDelete = true; + } else { + readyToEnqueue = false; + } } else { LOG.debug( "Rewrite progress: {} of {} bytes, {} to {}", @@ -681,21 +776,90 @@ public void onSuccess(RewriteResponse rewriteResponse, HttpHeaders responseHeade @Override public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOException { - readyToEnqueue = false; - throw new IOException(String.format("Error trying to rewrite %s to %s: %s", from, to, e)); + if (e.getCode() == HttpStatusCodes.STATUS_CODE_NOT_FOUND) { + if (ignoreMissingSource) { + // Treat a missing source as a successful rewrite. + readyToEnqueue = false; + lastError = null; + } else { + throw new FileNotFoundException(from.toString()); + } + } else { + lastError = e; + readyToEnqueue = true; + } } } public void copy(Iterable srcFilenames, Iterable destFilenames) throws IOException { - LinkedList rewrites = makeRewriteOps(srcFilenames, destFilenames); - while (rewrites.size() > 0) { - executeBatches(makeCopyBatches(rewrites)); + rewriteHelper(srcFilenames, destFilenames, false, false); + } + + public void rename( + Iterable srcFilenames, Iterable destFilenames, MoveOptions... moveOptions) + throws IOException { + // Rename is implemented as a rewrite followed by deleting the source. If the new object is in + // the same location, the copy is a metadata-only operation. + Set moveOptionSet = Sets.newHashSet(moveOptions); + final boolean ignoreMissingSrc = + moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES); + rewriteHelper(srcFilenames, destFilenames, true, ignoreMissingSrc); + } + + private void rewriteHelper( + Iterable srcFilenames, + Iterable destFilenames, + boolean deleteSource, + boolean ignoreMissingSource) + throws IOException { + LinkedList rewrites = + makeRewriteOps(srcFilenames, destFilenames, deleteSource, ignoreMissingSource); + org.apache.beam.sdk.util.BackOff backoff = BACKOFF_FACTORY.backoff(); + while (true) { + List batches = makeRewriteBatches(rewrites); // Removes completed rewrite ops. + if (batches.isEmpty()) { + break; + } + RewriteOp sampleErrorOp = + rewrites.stream().filter(op -> op.getLastError() != null).findFirst().orElse(null); + if (sampleErrorOp != null) { + long backOffMillis = backoff.nextBackOffMillis(); + if (backOffMillis == org.apache.beam.sdk.util.BackOff.STOP) { + throw new IOException( + String.format( + "Error completing file copies with retries, sample: from %s to %s due to %s", + sampleErrorOp.getFrom().toString(), + sampleErrorOp.getTo().toString(), + sampleErrorOp.getLastError())); + } + LOG.warn( + "Retrying with backoff unsuccessful copy requests, sample request: from {} to {} due to {}", + sampleErrorOp.getFrom(), + sampleErrorOp.getTo(), + sampleErrorOp.getLastError()); + try { + Thread.sleep(backOffMillis); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException( + String.format( + "Interrupted backoff of file copies with retries, sample: from %s to %s due to %s", + sampleErrorOp.getFrom().toString(), + sampleErrorOp.getTo().toString(), + sampleErrorOp.getLastError())); + } + } + executeBatches(batches); } } LinkedList makeRewriteOps( - Iterable srcFilenames, Iterable destFilenames) throws IOException { + Iterable srcFilenames, + Iterable destFilenames, + boolean deleteSource, + boolean ignoreMissingSource) + throws IOException { List srcList = Lists.newArrayList(srcFilenames); List destList = Lists.newArrayList(destFilenames); checkArgument( @@ -707,14 +871,14 @@ LinkedList makeRewriteOps( for (int i = 0; i < srcList.size(); i++) { final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i)); final GcsPath destPath = GcsPath.fromUri(destList.get(i)); - rewrites.addLast(new RewriteOp(sourcePath, destPath)); + rewrites.addLast(new RewriteOp(sourcePath, destPath, deleteSource, ignoreMissingSource)); } return rewrites; } - List makeCopyBatches(LinkedList rewrites) throws IOException { - List batches = new ArrayList<>(); - BatchRequest batch = createBatchRequest(); + List makeRewriteBatches(LinkedList rewrites) throws IOException { + List batches = new ArrayList<>(); + BatchInterface batch = batchRequestSupplier.get(); Iterator it = rewrites.iterator(); while (it.hasNext()) { RewriteOp rewrite = it.next(); @@ -726,7 +890,7 @@ List makeCopyBatches(LinkedList rewrites) throws IOExce if (batch.size() >= MAX_REQUESTS_PER_BATCH) { batches.add(batch); - batch = createBatchRequest(); + batch = batchRequestSupplier.get(); } } if (batch.size() > 0) { @@ -735,11 +899,11 @@ List makeCopyBatches(LinkedList rewrites) throws IOExce return batches; } - List makeRemoveBatches(Collection filenames) throws IOException { - List batches = new ArrayList<>(); + List makeRemoveBatches(Collection filenames) throws IOException { + List batches = new ArrayList<>(); for (List filesToDelete : Lists.partition(Lists.newArrayList(filenames), MAX_REQUESTS_PER_BATCH)) { - BatchRequest batch = createBatchRequest(); + BatchInterface batch = batchRequestSupplier.get(); for (String file : filesToDelete) { enqueueDelete(GcsPath.fromUri(file), batch); } @@ -749,17 +913,19 @@ List makeRemoveBatches(Collection filenames) throws IOExce } public void remove(Collection filenames) throws IOException { + // TODO(BEAM-8268): It would be better to add per-file retries and backoff + // instead of failing everything if a single operation fails. executeBatches(makeRemoveBatches(filenames)); } - private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchRequest batch) + private StorageObjectOrIOException[] enqueueGetFileSize(final GcsPath path, BatchInterface batch) throws IOException { final StorageObjectOrIOException[] ret = new StorageObjectOrIOException[1]; Storage.Objects.Get getRequest = storageClient.objects().get(path.getBucket(), path.getObject()); - getRequest.queue( - batch, + batch.queue( + getRequest, new JsonBatchCallback() { @Override public void onSuccess(StorageObject response, HttpHeaders httpHeaders) @@ -806,11 +972,11 @@ public static StorageObjectOrIOException create(IOException ioException) { } } - private void enqueueDelete(final GcsPath file, BatchRequest batch) throws IOException { + private void enqueueDelete(final GcsPath file, BatchInterface batch) throws IOException { Storage.Objects.Delete deleteRequest = storageClient.objects().delete(file.getBucket(), file.getObject()); - deleteRequest.queue( - batch, + batch.queue( + deleteRequest, new JsonBatchCallback() { @Override public void onSuccess(Void obj, HttpHeaders responseHeaders) { @@ -829,7 +995,13 @@ public void onFailure(GoogleJsonError e, HttpHeaders responseHeaders) throws IOE }); } - private BatchRequest createBatchRequest() { - return storageClient.batch(httpRequestInitializer); + @VisibleForTesting + interface BatchInterface { + void queue(AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) + throws IOException; + + void execute() throws IOException; + + int size(); } } diff --git a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java index 0923ca37ae29..2890e72da262 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/test/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilTest.java @@ -26,13 +26,18 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.api.client.googleapis.batch.BatchRequest; +import com.google.api.client.googleapis.batch.json.JsonBatchCallback; +import com.google.api.client.googleapis.json.GoogleJsonError; import com.google.api.client.googleapis.json.GoogleJsonError.ErrorInfo; import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest; import com.google.api.client.http.HttpRequest; import com.google.api.client.http.HttpResponse; import com.google.api.client.http.HttpStatusCodes; @@ -51,6 +56,7 @@ import com.google.api.services.storage.Storage; import com.google.api.services.storage.model.Bucket; import com.google.api.services.storage.model.Objects; +import com.google.api.services.storage.model.RewriteResponse; import com.google.api.services.storage.model.StorageObject; import com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadOptions; import java.io.ByteArrayInputStream; @@ -71,11 +77,14 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.beam.sdk.extensions.gcp.auth.TestCredential; import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.BatchInterface; import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.RewriteOp; import org.apache.beam.sdk.extensions.gcp.util.GcsUtil.StorageObjectOrIOException; import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath; +import org.apache.beam.sdk.io.fs.MoveOptions.StandardMoveOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.util.FluentBackoff; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; @@ -826,9 +835,9 @@ private static List makeGcsPaths(String s, int n) { return ret.build(); } - private static int sumBatchSizes(List batches) { + private static int sumBatchSizes(List batches) { int ret = 0; - for (BatchRequest b : batches) { + for (BatchInterface b : batches) { ret += b.size(); assertThat(b.size(), greaterThan(0)); } @@ -841,7 +850,7 @@ public void testMakeRewriteOps() throws IOException { GcsUtil gcsUtil = gcsOptions.getGcsUtil(); LinkedList rewrites = - gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1)); + gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1), false, false); assertEquals(1, rewrites.size()); RewriteOp rewrite = rewrites.pop(); @@ -861,7 +870,7 @@ public void testMakeRewriteOpsWithOptions() throws IOException { gcsUtil.maxBytesRewrittenPerCall = 1337L; LinkedList rewrites = - gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1)); + gcsUtil.makeRewriteOps(makeStrings("s", 1), makeStrings("d", 1), false, false); assertEquals(1, rewrites.size()); RewriteOp rewrite = rewrites.pop(); @@ -871,26 +880,27 @@ public void testMakeRewriteOpsWithOptions() throws IOException { } @Test - public void testMakeCopyBatches() throws IOException { + public void testMakeRewriteBatches() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List batches = - gcsUtil.makeCopyBatches(gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3))); + List batches = + gcsUtil.makeRewriteBatches( + gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 3), false, false)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); // 1 batch of files fits in 1 batch batches = - gcsUtil.makeCopyBatches( - gcsUtil.makeRewriteOps(makeStrings("s", 100), makeStrings("d", 100))); + gcsUtil.makeRewriteBatches( + gcsUtil.makeRewriteOps(makeStrings("s", 100), makeStrings("d", 100), false, false)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(100)); // A little more than 5 batches of files fits in 6 batches batches = - gcsUtil.makeCopyBatches( - gcsUtil.makeRewriteOps(makeStrings("s", 501), makeStrings("d", 501))); + gcsUtil.makeRewriteBatches( + gcsUtil.makeRewriteOps(makeStrings("s", 501), makeStrings("d", 501), false, false)); assertThat(batches.size(), equalTo(6)); assertThat(sumBatchSizes(batches), equalTo(501)); } @@ -901,7 +911,142 @@ public void testMakeRewriteOpsInvalid() throws IOException { thrown.expect(IllegalArgumentException.class); thrown.expectMessage("Number of source files 3"); - gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1)); + gcsUtil.makeRewriteOps(makeStrings("s", 3), makeStrings("d", 1), false, false); + } + + private class FakeBatcher implements BatchInterface { + ArrayList> requests = new ArrayList<>(); + + @Override + public void queue(AbstractGoogleJsonClientRequest request, JsonBatchCallback cb) { + assertNotNull(request); + assertNotNull(cb); + requests.add( + () -> { + try { + try { + T result = request.execute(); + cb.onSuccess(result, null); + } catch (FileNotFoundException e) { + GoogleJsonError error = new GoogleJsonError(); + error.setCode(HttpStatusCodes.STATUS_CODE_NOT_FOUND); + cb.onFailure(error, null); + } catch (Exception e) { + System.out.println("Propagating exception as server error " + e); + e.printStackTrace(); + GoogleJsonError error = new GoogleJsonError(); + error.setCode(HttpStatusCodes.STATUS_CODE_SERVER_ERROR); + cb.onFailure(error, null); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + } + + @Override + public void execute() throws IOException { + RuntimeException lastException = null; + for (Supplier request : requests) { + try { + request.get(); + } catch (RuntimeException e) { + lastException = e; + } + } + if (lastException != null) { + throw lastException; + } + } + + @Override + public int size() { + return requests.size(); + } + } + + @Test + public void testRename() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher()); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class); + Storage.Objects.Delete mockStorageDelete1 = Mockito.mock(Storage.Objects.Delete.class); + Storage.Objects.Delete mockStorageDelete2 = Mockito.mock(Storage.Objects.Delete.class); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null)) + .thenReturn(mockStorageRewrite); + when(mockStorageRewrite.execute()) + .thenThrow(new SocketTimeoutException("SocketException")) + .thenReturn(new RewriteResponse().setDone(true)); + when(mockStorageObjects.delete("bucket", "s0")) + .thenReturn(mockStorageDelete1) + .thenReturn(mockStorageDelete2); + + when(mockStorageDelete1.execute()).thenThrow(new SocketTimeoutException("SocketException")); + + gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1)); + verify(mockStorageRewrite, times(2)).execute(); + verify(mockStorageDelete1, times(1)).execute(); + verify(mockStorageDelete2, times(1)).execute(); + } + + @Test + public void testRenameIgnoringMissing() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher()); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Rewrite mockStorageRewrite1 = Mockito.mock(Storage.Objects.Rewrite.class); + Storage.Objects.Rewrite mockStorageRewrite2 = Mockito.mock(Storage.Objects.Rewrite.class); + Storage.Objects.Delete mockStorageDelete = Mockito.mock(Storage.Objects.Delete.class); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null)) + .thenReturn(mockStorageRewrite1); + when(mockStorageRewrite1.execute()).thenThrow(new FileNotFoundException()); + when(mockStorageObjects.rewrite("bucket", "s1", "bucket", "d1", null)) + .thenReturn(mockStorageRewrite2); + when(mockStorageRewrite2.execute()).thenReturn(new RewriteResponse().setDone(true)); + when(mockStorageObjects.delete("bucket", "s1")).thenReturn(mockStorageDelete); + + gcsUtil.rename( + makeStrings("s", 2), makeStrings("d", 2), StandardMoveOptions.IGNORE_MISSING_FILES); + verify(mockStorageRewrite1, times(1)).execute(); + verify(mockStorageRewrite2, times(1)).execute(); + verify(mockStorageDelete, times(1)).execute(); + } + + @Test + public void testRenamePropagateMissingException() throws IOException { + GcsOptions pipelineOptions = gcsOptionsWithTestCredential(); + GcsUtil gcsUtil = pipelineOptions.getGcsUtil(); + + Storage mockStorage = Mockito.mock(Storage.class); + gcsUtil.setStorageClient(mockStorage); + gcsUtil.setBatchRequestSupplier(() -> new FakeBatcher()); + + Storage.Objects mockStorageObjects = Mockito.mock(Storage.Objects.class); + Storage.Objects.Rewrite mockStorageRewrite = Mockito.mock(Storage.Objects.Rewrite.class); + + when(mockStorage.objects()).thenReturn(mockStorageObjects); + when(mockStorageObjects.rewrite("bucket", "s0", "bucket", "d0", null)) + .thenReturn(mockStorageRewrite); + when(mockStorageRewrite.execute()).thenThrow(new FileNotFoundException()); + + assertThrows(IOException.class, () -> gcsUtil.rename(makeStrings("s", 1), makeStrings("d", 1))); + verify(mockStorageRewrite, times(1)).execute(); } @Test @@ -909,7 +1054,7 @@ public void testMakeRemoveBatches() throws IOException { GcsUtil gcsUtil = gcsOptionsWithTestCredential().getGcsUtil(); // Small number of files fits in 1 batch - List batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); + List batches = gcsUtil.makeRemoveBatches(makeStrings("s", 3)); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); @@ -930,7 +1075,7 @@ public void testMakeGetBatches() throws IOException { // Small number of files fits in 1 batch List results = Lists.newArrayList(); - List batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); + List batches = gcsUtil.makeGetBatches(makeGcsPaths("s", 3), results); assertThat(batches.size(), equalTo(1)); assertThat(sumBatchSizes(batches), equalTo(3)); assertEquals(3, results.size()); diff --git a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java index 3a556dd82041..e5232df899ff 100644 --- a/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/s3/S3FileSystem.java @@ -64,6 +64,7 @@ import org.apache.beam.sdk.io.aws.options.S3Options; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; @@ -545,8 +546,13 @@ CompleteMultipartUploadResult multipartCopy( @Override protected void rename( - List sourceResourceIds, List destinationResourceIds) + List sourceResourceIds, + List destinationResourceIds, + MoveOptions... moveOptions) throws IOException { + if (moveOptions.length > 0) { + throw new UnsupportedOperationException("Support for move options is not yet implemented."); + } copy(sourceResourceIds, destinationResourceIds); delete(sourceResourceIds); } diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java index a4a991a1a156..11a5215a8d9d 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/s3/S3FileSystem.java @@ -46,6 +46,7 @@ import org.apache.beam.sdk.io.aws2.options.S3Options; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.sdk.util.MoreFutures; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; @@ -567,8 +568,13 @@ CompleteMultipartUploadResponse multipartCopy( @Override protected void rename( - List sourceResourceIds, List destinationResourceIds) + List sourceResourceIds, + List destinationResourceIds, + MoveOptions... moveOptions) throws IOException { + if (moveOptions.length > 0) { + throw new UnsupportedOperationException("Support for move options is not yet implemented."); + } copy(sourceResourceIds, destinationResourceIds); delete(sourceResourceIds); } diff --git a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java index 78a66367fcf0..7bc6d2679db7 100644 --- a/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java +++ b/sdks/java/io/azure/src/main/java/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystem.java @@ -52,6 +52,7 @@ import org.apache.beam.sdk.io.azure.options.BlobstoreOptions; import org.apache.beam.sdk.io.fs.CreateOptions; import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.sdk.util.InstanceBuilder; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; @@ -389,8 +390,14 @@ String generateSasToken() throws IOException { } @Override - protected void rename(List srcResourceIds, List destResourceIds) + protected void rename( + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { + if (moveOptions.length > 0) { + throw new UnsupportedOperationException("Support for move options is not yet implemented."); + } copy(srcResourceIds, destResourceIds); delete(srcResourceIds); } diff --git a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java index a5897594e3a6..a813e318d3a2 100644 --- a/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java +++ b/sdks/java/io/hadoop-file-system/src/main/java/org/apache/beam/sdk/io/hdfs/HadoopFileSystem.java @@ -37,6 +37,7 @@ import org.apache.beam.sdk.io.fs.MatchResult; import org.apache.beam.sdk.io.fs.MatchResult.Metadata; import org.apache.beam.sdk.io.fs.MatchResult.Status; +import org.apache.beam.sdk.io.fs.MoveOptions; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.apache.hadoop.conf.Configuration; @@ -244,8 +245,13 @@ protected void copy(List srcResourceIds, List srcResourceIds, List destResourceIds) + List srcResourceIds, + List destResourceIds, + MoveOptions... moveOptions) throws IOException { + if (moveOptions.length > 0) { + throw new UnsupportedOperationException("Support for move options is not yet implemented."); + } for (int i = 0; i < srcResourceIds.size(); ++i) { final Path srcPath = srcResourceIds.get(i).toPath();