Skip to content

Commit

Permalink
Add support for network streams (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
brecht-vermeersch authored Feb 7, 2025
1 parent 5a57560 commit d076bea
Show file tree
Hide file tree
Showing 7 changed files with 186 additions and 27 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4

- name: Setup node for the mock server
uses: actions/setup-node@v4

- name: Install PHP with extensions
uses: shivammathur/setup-php@v2
with:
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@
"guzzlehttp/guzzle": "^7.8"
},
"require-dev": {
"guzzlehttp/test-server": "^0.1.0",
"laravel/pint": "1.20.0",
"phpbench/phpbench": "^1.3",
"phpstan/phpstan": "^2.0",
"phpstan/phpstan-strict-rules": "^2.0",
"phpstan/phpstan-deprecation-rules": "^2.0",
"phpstan/phpstan-strict-rules": "^2.0",
"phpunit/phpunit": "^10.5"
},
"config": {
Expand Down
19 changes: 18 additions & 1 deletion src/Blob/BlobClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public function upload($content, ?UploadBlobOptions $options = null): void
$options = new UploadBlobOptions();
}

$content = StreamUtils::streamFor($content);
$content = $this->createUploadStream($content, $options);

if ($content->getSize() === null || ! $content->isSeekable()) {
$this->uploadInSequentialBlocks($content, $options);
Expand All @@ -147,6 +147,23 @@ public function upload($content, ?UploadBlobOptions $options = null): void
}
}

/**
* @param string|resource|StreamInterface $content
*/
private function createUploadStream($content, UploadBlobOptions $options): StreamInterface
{
if ($content instanceof StreamInterface) {
$content = $content->detach();
}

// fix network streams only reading 8KB chunks
if (is_resource($content)) {
stream_set_chunk_size($content, $options->maximumTransferSize);
}

return StreamUtils::streamFor($content);
}

private function uploadSingle(StreamInterface $content, UploadBlobOptions $options): void
{
try {
Expand Down
21 changes: 0 additions & 21 deletions tests/Blob/BlobFeatureTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use AzureOss\Storage\Blob\BlobServiceClient;
use AzureOss\Storage\Blob\Helpers\BlobUriParserHelper;
use GuzzleHttp\Psr7\Utils;
use PHPUnit\Framework\TestCase;

abstract class BlobFeatureTestCase extends TestCase
Expand Down Expand Up @@ -40,26 +39,6 @@ protected function cleanContainer(string $containerName): void
}
}

protected function withFile(int $size, callable $callable): void
{
$path = sys_get_temp_dir() . '/azure-oss-test-file';

unlink($path);
$resource = Utils::streamFor(Utils::tryFopen($path, 'w'));

$chunk = 1000;
while ($size > 0) {
$chunkContent = str_pad('', min($chunk, $size));
$resource->write($chunkContent);
$size -= $chunk;
}
$resource->close();

$callable(Utils::streamFor(Utils::tryFopen($path, 'r')));

unlink($path);
}

protected function markTestSkippedWhenUsingSimulator(): void
{
if (BlobUriParserHelper::isDevelopmentUri($this->serviceClient->uri)) {
Expand Down
9 changes: 5 additions & 4 deletions tests/Blob/Feature/BlobClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use AzureOss\Storage\Blob\Sas\BlobSasBuilder;
use AzureOss\Storage\Blob\Sas\BlobSasPermissions;
use AzureOss\Storage\Tests\Blob\BlobFeatureTestCase;
use AzureOss\Storage\Tests\Utils\FileFactory;
use GuzzleHttp\Psr7\NoSeekStream;
use GuzzleHttp\Psr7\StreamDecoratorTrait;
use PHPUnit\Framework\Attributes\Test;
Expand Down Expand Up @@ -165,7 +166,7 @@ public function exists_works_throws_if_container_doesnt_exist(): void
#[Test]
public function upload_works_with_single_upload(): void
{
$this->withFile(1000, function (StreamInterface $file) {
FileFactory::withStream(1000, function (StreamInterface $file) {
$beforeUploadContent = $file->getContents();
$file->rewind();

Expand All @@ -185,7 +186,7 @@ public function upload_works_with_single_upload(): void
#[Test]
public function upload_works_with_parallel_upload(): void
{
$this->withFile(1000, function (StreamInterface $file) {
FileFactory::withStream(1000, function (StreamInterface $file) {
$beforeUploadContent = $file->getContents();
$file->rewind();

Expand All @@ -206,7 +207,7 @@ public function upload_works_with_parallel_upload(): void
#[Test]
public function upload_works_with_unknown_sized_stream(): void
{
$this->withFile(1000, function (StreamInterface $file) {
FileFactory::withStream(1000, function (StreamInterface $file) {
$stream = new class ($file) implements StreamInterface {
use StreamDecoratorTrait;

Expand Down Expand Up @@ -236,7 +237,7 @@ public function getSize(): ?int
#[Test]
public function upload_works_with_non_seekable_stream(): void
{
$this->withFile(1000, function (StreamInterface $file) {
FileFactory::withStream(1000, function (StreamInterface $file) {
$stream = new NoSeekStream($file);

$beforeUploadContent = $file->getContents();
Expand Down
128 changes: 128 additions & 0 deletions tests/Blob/Feature/MockBlobClientTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
<?php

declare(strict_types=1);

namespace AzureOss\Storage\Tests\Blob\Feature;

use AzureOss\Storage\Blob\BlobClient;
use AzureOss\Storage\Blob\BlobServiceClient;
use AzureOss\Storage\Blob\Models\UploadBlobOptions;
use AzureOss\Storage\Tests\Utils\FileFactory;
use GuzzleHttp\Psr7\Response;
use GuzzleHttp\Psr7\StreamDecoratorTrait;
use GuzzleHttp\Psr7\Uri;
use GuzzleHttp\Server\Server;
use PHPUnit\Framework\Attributes\Test;
use PHPUnit\Framework\TestCase;
use Psr\Http\Message\StreamInterface;

class MockBlobClientTest extends TestCase
{
private BlobClient $mockBlobClient;

protected function setUp(): void
{
Server::start();

/** @phpstan-ignore-next-line */
$uri = new Uri(Server::$url . '/devstoreaccount1');
$mockServiceClient = new BlobServiceClient($uri);
$mockContainerClient = $mockServiceClient->getContainerClient('test');
$this->mockBlobClient = $mockContainerClient->getBlobClient('test');
}

protected function tearDown(): void
{
Server::stop();
}

#[Test]
public function upload_single_sends_correct_amount_of_requests(): void
{
$this->expectNotToPerformAssertions();

Server::enqueue([
new Response(200), // only one request
new Response(501), // fail if more requests
]);

FileFactory::withStream(1000, function (StreamInterface $file) {
$this->mockBlobClient->upload($file, new UploadBlobOptions("text/plain", initialTransferSize: 2000));
});
}

#[Test]
public function upload_parallel_blocks_sends_correct_amount_of_requests(): void
{
$this->expectNotToPerformAssertions(); // should not throw because of the 501

Server::enqueue([
...array_fill(0, 11, new Response(200)), // 10 chunks + 1 commit request
new Response(501), // fail if more requests
]);

FileFactory::withStream(50_000_000, function (StreamInterface $file) {
$this->mockBlobClient->upload($file, new UploadBlobOptions("text/plain", initialTransferSize: 0, maximumTransferSize: 5_000_000));
});
}

#[Test]
public function upload_parallel_blocks_sends_correct_amount_of_requests_for_small_files(): void
{
$this->expectNotToPerformAssertions(); // should not throw because of the 501

Server::enqueue([
...array_fill(0, 2, new Response(200)), // 1 chunks + 1 commit request
new Response(501), // fail if more requests
]);

FileFactory::withStream(50_000, function (StreamInterface $file) {
$this->mockBlobClient->upload($file, new UploadBlobOptions("text/plain", initialTransferSize: 0, maximumTransferSize: 8_000_000));
});
}

#[Test]
public function upload_sequential_blocks_sends_correct_amount_of_requests(): void
{
$this->expectNotToPerformAssertions(); // should not throw because of the 501

Server::enqueue([
...array_fill(0, 11, new Response(200)), // 10 chunks + 1 commit request
new Response(501), // fail if more requests
]);

FileFactory::withStream(50_000_000, function (StreamInterface $file) {
$stream = new class ($file) implements StreamInterface {
use StreamDecoratorTrait;

public function getSize(): ?int
{
return null;
}
};

$this->mockBlobClient->upload($stream, new UploadBlobOptions("text/plain", initialTransferSize: 0, maximumTransferSize: 5_000_000));
});
}

#[Test]
public function upload_parallel_blocks_sends_correct_amount_of_requests_with_a_network_request(): void
{
$this->expectNotToPerformAssertions(); // should not throw because of the 501

Server::enqueue([
new Response(200, body: str_repeat('X', 50_000_000)), // stream for fopen
...array_fill(0, 20, new Response(200)), // with network streams some chunks in the beginning are smaller. It should be less than 20 requests still.
new Response(501), // fail if more requests
]);

/** @phpstan-ignore-next-line */
$stream = fopen(Server::$url, 'r');

if ($stream === false) {
self::fail();
}

$this->mockBlobClient->upload($stream, new UploadBlobOptions("text/plain", initialTransferSize: 0, maximumTransferSize: 5_000_000));
}
}
30 changes: 30 additions & 0 deletions tests/Utils/FileFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace AzureOss\Storage\Tests\Utils;

use GuzzleHttp\Psr7\Utils;

final class FileFactory
{
public static function withStream(int $size, callable $callable): void
{
$path = sys_get_temp_dir() . '/azure-oss-test-file';

unlink($path);
$resource = Utils::streamFor(Utils::tryFopen($path, 'w'));

$chunk = 1000;
while ($size > 0) {
$chunkContent = str_pad('', min($chunk, $size));
$resource->write($chunkContent);
$size -= $chunk;
}
$resource->close();

$callable(Utils::streamFor(Utils::tryFopen($path, 'r')));

unlink($path);
}
}

0 comments on commit d076bea

Please sign in to comment.