Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Stream ReadAtLeast and ReadExactly #69272

Merged
merged 16 commits into from
May 20, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal bool TryGetNextHeader(Stream archiveStream, bool copyData)
Span<byte> buffer = rented.AsSpan(0, TarHelpers.RecordSize); // minimumLength means the array could've been larger
buffer.Clear(); // Rented arrays aren't clean

TarHelpers.ReadOrThrow(archiveStream, buffer);
archiveStream.ReadExactly(buffer);

try
{
Expand Down Expand Up @@ -486,10 +486,7 @@ private void ReadExtendedAttributesBlock(Stream archiveStream)
}

byte[] buffer = new byte[(int)_size];
if (archiveStream.Read(buffer.AsSpan()) != _size)
{
throw new EndOfStreamException();
}
archiveStream.ReadExactly(buffer);

string dataAsString = TarHelpers.GetTrimmedUtf8String(buffer);

Expand Down Expand Up @@ -520,11 +517,7 @@ private void ReadGnuLongPathDataBlock(Stream archiveStream)
}

byte[] buffer = new byte[(int)_size];

if (archiveStream.Read(buffer.AsSpan()) != _size)
{
throw new EndOfStreamException();
}
archiveStream.ReadExactly(buffer);

string longPath = TarHelpers.GetTrimmedUtf8String(buffer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,22 +151,6 @@ private static string GetTrimmedString(ReadOnlySpan<byte> buffer, Encoding encod
// removing the trailing null or space chars.
internal static string GetTrimmedUtf8String(ReadOnlySpan<byte> buffer) => GetTrimmedString(buffer, Encoding.UTF8);

// Reads the specified number of bytes and stores it in the byte buffer passed by reference.
// Throws if end of stream is reached.
internal static void ReadOrThrow(Stream archiveStream, Span<byte> buffer)
{
int totalRead = 0;
while (totalRead < buffer.Length)
{
int bytesRead = archiveStream.Read(buffer.Slice(totalRead));
if (bytesRead == 0)
{
throw new EndOfStreamException();
}
totalRead += bytesRead;
}
}

// Returns true if it successfully converts the specified string to a DateTimeOffset, false otherwise.
internal static bool TryConvertToDateTimeOffset(string value, out DateTimeOffset timestamp)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,13 @@ internal static Encoding GetEncoding(string text)
/// </summary>
internal static void ReadBytes(Stream stream, byte[] buffer, int bytesToRead)
{
int bytesLeftToRead = bytesToRead;
if (bytesToRead <= 0)
return;

int totalBytesRead = 0;

while (bytesLeftToRead > 0)
int bytesRead = stream.ReadAtLeast(buffer.AsSpan(0, bytesToRead), bytesToRead, throwOnEndOfStream: false);
if (bytesRead < bytesToRead)
{
int bytesRead = stream.Read(buffer, totalBytesRead, bytesLeftToRead);
if (bytesRead == 0) throw new IOException(SR.UnexpectedEndOfStream);

totalBytesRead += bytesRead;
bytesLeftToRead -= bytesRead;
throw new IOException(SR.UnexpectedEndOfStream);
eerhardt marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down
274 changes: 274 additions & 0 deletions src/libraries/System.IO/tests/Stream/Stream.ReadAtLeast.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace System.IO.Tests
{
public class Stream_ReadAtLeast
{
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task DelegatesToRead_Success(bool async)
{
bool readInvoked = false;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvoked = true;
Assert.NotNull(array);
Assert.Equal(0, offset);
Assert.Equal(30, count);

for (int i = 0; i < 10; i++) array[offset + i] = (byte)i;
return 10;
});

byte[] buffer = new byte[30];

Assert.Equal(10, async ? await s.ReadAtLeastAsync(buffer, 10) : s.ReadAtLeast(buffer, 10));
Assert.True(readInvoked);
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
for (int i = 10; i < 30; i++) Assert.Equal(0, buffer[i]);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadMoreThanOnePage(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

for (int i = 0; i < 10; i++) array[offset + i] = (byte)i;
return 10;
});

byte[] buffer = new byte[30];

Assert.Equal(20, async ? await s.ReadAtLeastAsync(buffer, 20) : s.ReadAtLeast(buffer, 20));
Assert.Equal(2, readInvokedCount);
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
for (int i = 20; i < 30; i++) Assert.Equal(0, buffer[i]);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadMoreThanMinimumBytes(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return byteCount;
});

// first try with a buffer that doesn't fill 3 full pages
byte[] buffer = new byte[28];

Assert.Equal(28, async ? await s.ReadAtLeastAsync(buffer, 22) : s.ReadAtLeast(buffer, 22));
Assert.Equal(3, readInvokedCount);
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
for (int i = 20; i < 28; i++) Assert.Equal(i - 20, buffer[i]);

// now try with a buffer that is bigger than 3 pages
readInvokedCount = 0;
buffer = new byte[32];

Assert.Equal(30, async ? await s.ReadAtLeastAsync(buffer, 22) : s.ReadAtLeast(buffer, 22));
Assert.Equal(3, readInvokedCount);
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
for (int i = 10; i < 20; i++) Assert.Equal(i - 10, buffer[i]);
for (int i = 20; i < 30; i++) Assert.Equal(i - 20, buffer[i]);
for (int i = 30; i < 32; i++) Assert.Equal(0, buffer[i]);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ReadAtLeastZero(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return byteCount;
});

byte[] buffer = new byte[20];
if (async)
{
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, 0));
}
else
{
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, 0));
}
Assert.Equal(0, readInvokedCount);

// now try with an empty buffer
byte[] emptyBuffer = Array.Empty<byte>();
if (async)
{
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(emptyBuffer, 0));
}
else
{
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(emptyBuffer, 0));
}
Assert.Equal(0, readInvokedCount);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task NegativeMinimumBytes(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return byteCount;
});

byte[] buffer = new byte[10];
if (async)
{
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, -1));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, -10));
}
else
{
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, -1));
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, -10));
}
Assert.Equal(0, readInvokedCount);
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task BufferSmallerThanMinimumBytes(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return byteCount;
});

byte[] buffer = new byte[20];
byte[] emptyBuffer = Array.Empty<byte>();
if (async)
{
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(buffer, 21));
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(async () => await s.ReadAtLeastAsync(emptyBuffer, 1));
}
else
{
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(buffer, 21));
Assert.Throws<ArgumentOutOfRangeException>(() => s.ReadAtLeast(emptyBuffer, 1));
}
}

[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task HandleEndOfStream(bool async)
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readFunc: (array, offset, count) =>
{
readInvokedCount++;

if (readInvokedCount == 1)
{
int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return byteCount;
}
else
{
return 0;
}
});

byte[] buffer = new byte[20];
if (async)
{
await Assert.ThrowsAsync<EndOfStreamException>(async () => await s.ReadAtLeastAsync(buffer, 11));
}
else
{
Assert.Throws<EndOfStreamException>(() => s.ReadAtLeast(buffer, 11));
}
Assert.Equal(2, readInvokedCount);

readInvokedCount = 0;

Assert.Equal(10, async ? await s.ReadAtLeastAsync(buffer, 11, throwOnEndOfStream: false) : s.ReadAtLeast(buffer, 11, throwOnEndOfStream: false));
for (int i = 0; i < 10; i++) Assert.Equal(i, buffer[i]);
for (int i = 10; i < 20; i++) Assert.Equal(0, buffer[i]);
Assert.Equal(2, readInvokedCount);
}

[Fact]
public async Task CancellationTokenIsPassedThrough()
{
int readInvokedCount = 0;
var s = new DelegateStream(
canReadFunc: () => true,
readAsyncFunc: (array, offset, count, cancellationToken) =>
{
readInvokedCount++;
cancellationToken.ThrowIfCancellationRequested();

int byteCount = Math.Min(count, 10);
for (int i = 0; i < byteCount; i++) array[offset + i] = (byte)i;
return Task.FromResult(10);
});

byte[] buffer = new byte[20];

using CancellationTokenSource cts = new CancellationTokenSource();
CancellationToken token = cts.Token;
cts.Cancel();

await Assert.ThrowsAsync<OperationCanceledException>(async () => await s.ReadAtLeastAsync(buffer, 10, cancellationToken: token));
Assert.Equal(1, readInvokedCount);
}
}
}
Loading