Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ private static Version GetICUVersion()

public static bool IsNet5CompatFileStreamEnabled => _net5CompatFileStream.Value;

public static bool IsNet5CompatFileStreamDisabled => !_net5CompatFileStream.Value;

private static readonly Lazy<bool> s_fileLockingDisabled = new Lazy<bool>(() => GetStaticNonPublicBooleanPropertyValue("Microsoft.Win32.SafeHandles.SafeFileHandle", "DisableFileLocking"));

public static bool IsFileLockingEnabled => IsWindows || !s_fileLockingDisabled.Value;
Expand Down
30 changes: 30 additions & 0 deletions src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,36 @@ public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileSha
Assert.Equal(fileSize, fs.Position);
}
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported), nameof(PlatformDetection.IsNet5CompatFileStreamDisabled))]
[InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
[InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
[InlineData(FileShare.None, FileOptions.None)]
[InlineData(FileShare.ReadWrite, FileOptions.None)]
public async Task ConcurrentReadsKeepCorrectOrder(FileShare fileShare, FileOptions options)
{
const int fileSize = 10_000;
string filePath = GetTestFilePath();
byte[] content = RandomNumberGenerator.GetBytes(fileSize);
File.WriteAllBytes(filePath, content);

byte[] buffer = new byte[fileSize];

using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options))
{
Task<int>[] reads = Enumerable.Range(0, 10).Select(index => ReadAsync(fs, buffer, index * 1000, 1000)).ToArray();

// the reads were not awaited, it's an anti-pattern and Position can be (0, fileSize) now:
Assert.InRange(fs.Position, 0, fileSize);

await Task.WhenAll(reads);

Assert.All(reads, read => Assert.Equal(1000, read.Result));
AssertExtensions.SequenceEqual(content, buffer);
Assert.Equal(fileSize, fs.Position);
Assert.Equal(fileSize, fs.Length);
}
}
}

[ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)]
Expand Down
30 changes: 30 additions & 0 deletions src/libraries/System.IO.FileSystem/tests/FileStream/WriteAsync.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Linq;
using System.Security.Cryptography;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -161,6 +163,34 @@ public async Task WriteAsyncInternalBufferOverflow()
}
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported), nameof(PlatformDetection.IsNet5CompatFileStreamDisabled))]
[InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access
[InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached
[InlineData(FileShare.None, FileOptions.None)]
[InlineData(FileShare.ReadWrite, FileOptions.None)]
public async Task ConcurrentWritesKeepCorrectOrder(FileShare fileShare, FileOptions options)
{
const int fileSize = 10_000;
string filePath = GetTestFilePath();
byte[] content = RandomNumberGenerator.GetBytes(fileSize);

using (FileStream fs = new FileStream(filePath, FileMode.CreateNew, FileAccess.Write, fileShare, bufferSize: 0, options))
{
Task[] writes = Enumerable.Range(0, 10).Select(index => WriteAsync(fs, content, index * 1000, 1000)).ToArray();

// the writes were not awaited, it's an anti-pattern and Position can be (0, fileSize) now:
Assert.InRange(fs.Position, 0, fileSize);

await Task.WhenAll(writes);

Assert.Equal(fileSize, fs.Position);
Assert.Equal(fileSize, fs.Length);
Assert.All(writes, write => Assert.True(write.IsCompletedSuccessfully));
}

AssertExtensions.SequenceEqual(content, await File.ReadAllBytesAsync(filePath));
}

public static IEnumerable<object[]> MemberData_FileStreamAsyncWriting()
{
foreach (bool useAsync in new[] { true, false })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,7 @@ internal void Complete(uint errorCode, uint numBytes)
OSFileStreamStrategy? strategy = _strategy;
ReleaseResources();

if (strategy is not null && _bufferSize != numBytes) // true only for incomplete operations
{
strategy.OnIncompleteOperation(_bufferSize, (int)numBytes);
}
strategy?.OnFinishedAsyncOperation(_bufferSize, (int)numBytes);

switch (errorCode)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private void ExecuteInternal()
break;
case Operation.Write:
RandomAccess.WriteAtOffset(_fileHandle, _singleSegment.Span, _fileOffset);
result = _singleSegment.Length;
break;
case Operation.ReadScatter:
Debug.Assert(_readScatterBuffers != null);
Expand All @@ -118,18 +119,7 @@ private void ExecuteInternal()
}
finally
{
if (_strategy is not null)
{
// WriteAtOffset returns void, so we need to fix position only in case of an exception
if (exception is not null)
{
_strategy.OnIncompleteOperation(_singleSegment.Length, 0);
}
else if (_operation == Operation.Read && result != _singleSegment.Length)
{
_strategy.OnIncompleteOperation(_singleSegment.Length, (int)result);
}
}
_strategy?.OnFinishedAsyncOperation(_singleSegment.Length, (int)result);

_operation = Operation.None;
_context = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorC
{
if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
{
strategy?.OnIncompleteOperation(buffer.Length, 0);
strategy?.OnFinishedAsyncOperation(buffer.Length, 0);
}
}

Expand Down Expand Up @@ -368,7 +368,7 @@ private static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorC
{
if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS)
{
strategy?.OnIncompleteOperation(buffer.Length, 0);
strategy?.OnFinishedAsyncOperation(buffer.Length, 0);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;
using Microsoft.Win32.SafeHandles;

namespace System.IO.Strategies
Expand All @@ -15,7 +14,8 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy
protected readonly SafeFileHandle _fileHandle; // only ever null if ctor throws
private readonly FileAccess _access; // What file was opened for.

protected long _filePosition;
protected long _filePosition; // updated after every operation finishes
private long _nextAsyncOperationOffset; // updated before async operation starts and after it finishes
private long _length = -1; // negative means that hasn't been fetched.
private long _appendStart; // When appending, prevent overwriting file.
private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing.
Expand Down Expand Up @@ -101,8 +101,15 @@ public unsafe sealed override long Length

// in case of concurrent incomplete reads, there can be multiple threads trying to update the position
// at the same time. That is why we are using Interlocked here.
internal void OnIncompleteOperation(int expectedBytesTransferred, int actualBytesTransferred)
=> Interlocked.Add(ref _filePosition, actualBytesTransferred - expectedBytesTransferred);
internal void OnFinishedAsyncOperation(int expectedBytesTransferred, int actualBytesTransferred)
{
if (actualBytesTransferred > 0)
{
_filePosition += actualBytesTransferred;
}

Interlocked.Add(ref _nextAsyncOperationOffset, -expectedBytesTransferred);
}

private bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached;

Expand Down Expand Up @@ -294,7 +301,7 @@ public sealed override Task WriteAsync(byte[] buffer, int offset, int count, Can

public sealed override ValueTask WriteAsync(ReadOnlyMemory<byte> source, CancellationToken cancellationToken)
{
long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1;
long writeOffset = CanSeek ? _filePosition + Interlocked.Add(ref _nextAsyncOperationOffset, source.Length) - source.Length : -1;
return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken, this);
}

Expand Down Expand Up @@ -324,7 +331,7 @@ public sealed override ValueTask<int> ReadAsync(Memory<byte> destination, Cancel
// This implementation updates the file position before the operation starts and updates it after incomplete read.
// This is done to keep backward compatibility for concurrent reads.
// It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time.
long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length;
long readOffset = _filePosition + Interlocked.Add(ref _nextAsyncOperationOffset, destination.Length) - destination.Length;
return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, readOffset, cancellationToken, this);
}
}
Expand Down