Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
Expand Down Expand Up @@ -85,7 +86,9 @@ protected void copy(

@Override
protected void rename(
List<ClassLoaderResourceId> srcResourceIds, List<ClassLoaderResourceId> destResourceIds)
List<ClassLoaderResourceId> srcResourceIds,
List<ClassLoaderResourceId> destResourceIds,
MoveOptions... moveOptions)
throws IOException {
throw new UnsupportedOperationException("Read-only filesystem.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,10 @@ final void moveToOutputFiles(
dstFiles,
StandardMoveOptions.IGNORE_MISSING_FILES,
StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
removeTemporaryFiles(srcFiles);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still need removeTemporaryFiles() to remove the temporary directory for batch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding back, I think we need this as well if we are filtering if the destination exists because we still need to remove the source in that case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might make sense to update the semantics of "StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS" to delete the source during rename. That will allow us to prevent the double delete for the case where the source existed.

Also probably removeTemporaryFiles() should be updated to just cleanup the temporary directory (where appropriate) instead of trying to delete already renamed files.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I modified FileSystems.rename to delete srcs that existed but were filtered due to dest existing.
I kept the existing methods in FileBasedSink because it appears they are designed to be called by subclasses. I changed to pass an empty set for known files after the rename to avoid the unnecessary delete.


// The rename ensures that the source files are deleted. However we may still need to clean
// up the directory or orphaned files.
removeTemporaryFiles(Collections.emptyList());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.io.fs.CreateOptions;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;

/**
Expand Down Expand Up @@ -113,6 +114,9 @@ protected abstract void copy(List<ResourceIdT> srcResourceIds, List<ResourceIdT>
*
* @param srcResourceIds the references of the source resources
* @param destResourceIds the references of the destination resources
* @param moveOptions move options specifying handling of error conditions
* @throws UnsupportedOperationException if move options are specified and not supported by the
* FileSystem
* @throws FileNotFoundException if the source resources are missing. When rename throws, the
* state of the resources is unknown but safe: for every (source, destination) pair of
* resources, the following are possible: a) source exists, b) destination exists, c) source
Expand All @@ -121,7 +125,10 @@ protected abstract void copy(List<ResourceIdT> srcResourceIds, List<ResourceIdT>
* resource.
*/
protected abstract void rename(
List<ResourceIdT> srcResourceIds, List<ResourceIdT> destResourceIds) throws IOException;
List<ResourceIdT> srcResourceIds,
List<ResourceIdT> destResourceIds,
MoveOptions... moveOptions)
throws IOException;

/**
* Deletes a collection of resources.
Expand Down
126 changes: 62 additions & 64 deletions sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Joiner;
Expand Down Expand Up @@ -273,25 +272,11 @@ public static ReadableByteChannel open(ResourceId resourceId) throws IOException
public static void copy(
List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
throws IOException {
validateSrcDestLists(srcResourceIds, destResourceIds);
if (srcResourceIds.isEmpty()) {
// Short-circuit.
return;
}

List<ResourceId> srcToCopy = srcResourceIds;
List<ResourceId> destToCopy = destResourceIds;
if (Sets.newHashSet(moveOptions)
.contains(MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES)) {
KV<List<ResourceId>, List<ResourceId>> existings =
filterMissingFiles(srcResourceIds, destResourceIds);
srcToCopy = existings.getKey();
destToCopy = existings.getValue();
FilterResult filtered = filterFiles(srcResourceIds, destResourceIds, moveOptions);
if (!filtered.resultSources.isEmpty()) {
getFileSystemInternal(filtered.resultSources.iterator().next().getScheme())
.copy(filtered.resultSources, filtered.resultDestinations);
}
if (srcToCopy.isEmpty()) {
return;
}
getFileSystemInternal(srcToCopy.iterator().next().getScheme()).copy(srcToCopy, destToCopy);
}

/**
Expand All @@ -304,32 +289,31 @@ public static void copy(
*
* <p>It doesn't support renaming globs.
*
* <p>Src files will be removed, even if the copy is skipped due to specified move options.
*
* @param srcResourceIds the references of the source resources
* @param destResourceIds the references of the destination resources
*/
public static void rename(
List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
throws IOException {
Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
validateSrcDestLists(srcResourceIds, destResourceIds);
if (srcResourceIds.isEmpty()) {
// Short-circuit.
return;
}

List<ResourceId> srcToRename = srcResourceIds;
List<ResourceId> destToRename = destResourceIds;
if (moveOptionSet.size() > 0) {
KV<List<ResourceId>, List<ResourceId>> existings =
filterFiles(srcResourceIds, destResourceIds, moveOptions);
srcToRename = existings.getKey();
destToRename = existings.getValue();
FilterResult filtered = filterFiles(srcResourceIds, destResourceIds, moveOptions);
if (!filtered.resultSources.isEmpty()) {
try {
getFileSystemInternal(filtered.resultSources.iterator().next().getScheme())
.rename(filtered.resultSources, filtered.resultDestinations, moveOptions);
} catch (UnsupportedOperationException e) {
// Some file systems do not yet support specifying the move options. We handle the move
// options above with filtering so specifying them is just an optimization for error
// handling and it is safe to rename without specifying them.
getFileSystemInternal(filtered.resultSources.iterator().next().getScheme())
.rename(filtered.resultSources, filtered.resultDestinations);
}
}
if (srcToRename.isEmpty()) {
return;
if (!filtered.filteredExistingSrcs.isEmpty()) {
getFileSystemInternal(filtered.filteredExistingSrcs.iterator().next().getScheme())
.delete(filtered.filteredExistingSrcs);
}
getFileSystemInternal(srcToRename.iterator().next().getScheme())
.rename(srcToRename, destToRename);
}

/**
Expand Down Expand Up @@ -391,50 +375,64 @@ public ResourceId apply(@Nonnull Metadata input) {
.delete(resourceIdsToDelete);
}

private static KV<List<ResourceId>, List<ResourceId>> filterMissingFiles(
List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds) throws IOException {
return filterFiles(srcResourceIds, destResourceIds, StandardMoveOptions.IGNORE_MISSING_FILES);
}
private static class FilterResult {
public List<ResourceId> resultSources = new ArrayList();
public List<ResourceId> resultDestinations = new ArrayList();
public List<ResourceId> filteredExistingSrcs = new ArrayList();
};

private static KV<List<ResourceId>, List<ResourceId>> filterFiles(
private static FilterResult filterFiles(
List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)
throws IOException {
Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
Boolean ignoreMissingSrc = moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES);
Boolean skipExistingDest =
moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
validateSrcDestLists(srcResourceIds, destResourceIds);
if (srcResourceIds.isEmpty()) {
// Short-circuit.
return KV.of(Collections.<ResourceId>emptyList(), Collections.<ResourceId>emptyList());
FilterResult result = new FilterResult();
if (moveOptions.length == 0 || srcResourceIds.isEmpty()) {
// Nothing will be filtered.
result.resultSources = srcResourceIds;
result.resultDestinations = destResourceIds;
return result;
}
Set<MoveOptions> moveOptionSet = Sets.newHashSet(moveOptions);
final boolean ignoreMissingSrc =
moveOptionSet.contains(StandardMoveOptions.IGNORE_MISSING_FILES);
final boolean skipExistingDest =
moveOptionSet.contains(StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
final int size = srcResourceIds.size();

List<ResourceId> srcToHandle = new ArrayList<>();
List<ResourceId> destToHandle = new ArrayList<>();

List<MatchResult> matchSrcResults = matchResources(srcResourceIds);
List<MatchResult> matchDestResults = new ArrayList<>();
// Match necessary srcs and dests with a single match call.
List<ResourceId> matchResources = new ArrayList<>();
if (ignoreMissingSrc) {
matchResources.addAll(srcResourceIds);
}
if (skipExistingDest) {
matchDestResults = matchResources(destResourceIds);
matchResources.addAll(destResourceIds);
}

for (int i = 0; i < matchSrcResults.size(); ++i) {
if (matchSrcResults.get(i).status().equals(Status.NOT_FOUND) && ignoreMissingSrc) {
// If the source is not found, and we are ignoring found source files, then we skip it.
List<MatchResult> matchResults = matchResources(matchResources);
List<MatchResult> matchSrcResults = ignoreMissingSrc ? matchResults.subList(0, size) : null;
List<MatchResult> matchDestResults =
skipExistingDest
? matchResults.subList(matchResults.size() - size, matchResults.size())
: null;

for (int i = 0; i < size; ++i) {
if (matchSrcResults != null && matchSrcResults.get(i).status().equals(Status.NOT_FOUND)) {
// If the source is not found, and we are ignoring missing source files, then we skip it.
continue;
}
if (skipExistingDest
if (matchDestResults != null
&& matchDestResults.get(i).status().equals(Status.OK)
&& checksumMatch(
matchDestResults.get(i).metadata().get(0),
matchSrcResults.get(i).metadata().get(0))) {
// If the destination exists, and we are skipping when destinations exist, then we skip.
// If the destination exists, and we are skipping when destinations exist, then we skip
// the copy but note that the source exists in case it should be deleted.
result.filteredExistingSrcs.add(srcResourceIds.get(i));
continue;
}
srcToHandle.add(srcResourceIds.get(i));
destToHandle.add(destResourceIds.get(i));
result.resultSources.add(srcResourceIds.get(i));
result.resultDestinations.add(destResourceIds.get(i));
}
return KV.of(srcToHandle, destToHandle);
return result;
}

private static boolean checksumMatch(MatchResult.Metadata first, MatchResult.Metadata second) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -162,8 +163,14 @@ protected void copy(List<LocalResourceId> srcResourceIds, List<LocalResourceId>
}

@Override
protected void rename(List<LocalResourceId> srcResourceIds, List<LocalResourceId> destResourceIds)
protected void rename(
List<LocalResourceId> srcResourceIds,
List<LocalResourceId> destResourceIds,
MoveOptions... moveOptions)
throws IOException {
if (moveOptions.length > 0) {
throw new UnsupportedOperationException("Support for move options is not yet implemented.");
}
checkArgument(
srcResourceIds.size() == destResourceIds.size(),
"Number of source files %s must equal number of destination files %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.fs.MatchResult.Status;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -69,14 +70,22 @@ class GcsFileSystem extends FileSystem<GcsResourceId> {
/** Number of copy operations performed. */
private Counter numCopies;

/** Number of renames operations performed. */
private Counter numRenames;

/** Time spent performing copies. */
private Counter copyTimeMsec;

/** Time spent performing renames. */
private Counter renameTimeMsec;

GcsFileSystem(GcsOptions options) {
this.options = checkNotNull(options, "options");
if (options.getGcsPerformanceMetrics()) {
numCopies = Metrics.counter(GcsFileSystem.class, "num_copies");
copyTimeMsec = Metrics.counter(GcsFileSystem.class, "copy_time_msec");
numRenames = Metrics.counter(GcsFileSystem.class, "num_renames");
renameTimeMsec = Metrics.counter(GcsFileSystem.class, "rename_time_msec");
}
}

Expand Down Expand Up @@ -142,10 +151,20 @@ protected ReadableByteChannel open(GcsResourceId resourceId) throws IOException
}

@Override
protected void rename(List<GcsResourceId> srcResourceIds, List<GcsResourceId> destResourceIds)
protected void rename(
List<GcsResourceId> srcResourceIds,
List<GcsResourceId> destResourceIds,
MoveOptions... moveOptions)
throws IOException {
copy(srcResourceIds, destResourceIds);
delete(srcResourceIds);
Stopwatch stopwatch = Stopwatch.createStarted();
options
.getGcsUtil()
.rename(toFilenames(srcResourceIds), toFilenames(destResourceIds), moveOptions);
stopwatch.stop();
if (options.getGcsPerformanceMetrics()) {
numRenames.inc(srcResourceIds.size());
renameTimeMsec.inc(stopwatch.elapsed(TimeUnit.MILLISECONDS));
}
}

@Override
Expand Down
Loading