Skip to content

Commit

Permalink
Merge pull request #51 from clue-labs/timeout
Browse files Browse the repository at this point in the history
Add TimeoutConnector decorator
  • Loading branch information
clue committed Nov 19, 2016
2 parents 3bcd121 + 3242509 commit bd03a1c
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 1 deletion.
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,31 @@ $secureConnector = new React\SocketClient\SecureConnector($dnsConnector, $loop,
));
```

### Connection timeouts

The `TimeoutConnector` class decorates any given `Connector` instance.
It provides the same `create()` method, but will automatically reject the
underlying connection attempt if it takes too long.

```php
$timeoutConnector = new React\SocketClient\TimeoutConnector($connector, 3.0, $loop);

$timeoutConnector->create('google.com', 80)->then(function (React\Stream\Stream $stream) {
// connection succeeded within 3.0 seconds
});
```

Pending connection attempts can be cancelled by cancelling its pending promise like so:

```php
$promise = $timeoutConnector->create($host, $port);

$promise->cancel();
```

Calling `cancel()` on a pending promise will cancel the underlying connection
attempt, abort the timer and reject the resulting promise.

### Unix domain sockets

Similarly, the `UnixConnector` class can be used to connect to Unix domain socket (UDS)
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
"react/dns": "0.4.*|0.3.*",
"react/event-loop": "0.4.*|0.3.*",
"react/stream": "0.4.*|0.3.*",
"react/promise": "^2.1 || ^1.2"
"react/promise": "^2.1 || ^1.2",
"react/promise-timer": "~1.0"
},
"autoload": {
"psr-4": {
Expand Down
50 changes: 50 additions & 0 deletions src/TimeoutConnector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
<?php

namespace React\SocketClient;

use React\SocketClient\ConnectorInterface;
use React\EventLoop\LoopInterface;
use React\Promise\Timer;
use React\Stream\Stream;
use React\Promise\Promise;
use React\Promise\CancellablePromiseInterface;

class TimeoutConnector implements ConnectorInterface
{
private $connector;
private $timeout;
private $loop;

public function __construct(ConnectorInterface $connector, $timeout, LoopInterface $loop)
{
$this->connector = $connector;
$this->timeout = $timeout;
$this->loop = $loop;
}

public function create($host, $port)
{
$promise = $this->connector->create($host, $port);

return Timer\timeout(new Promise(
function ($resolve, $reject) use ($promise) {
// resolve/reject with result of TCP/IP connection
$promise->then($resolve, $reject);
},
function ($_, $reject) use ($promise) {
// cancellation should reject connection attempt
$reject(new \RuntimeException('Connection attempt cancelled during connection'));

// forefully close TCP/IP connection if it completes despite cancellation
$promise->then(function (Stream $stream) {
$stream->close();
});

// (try to) cancel pending TCP/IP connection
if ($promise instanceof CancellablePromiseInterface) {
$promise->cancel();
}
}
), $this->timeout, $this->loop);
}
}
125 changes: 125 additions & 0 deletions tests/TimeoutConnectorTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
<?php

namespace React\Tests\SocketClient;

use React\SocketClient\TimeoutConnector;
use React\Promise;
use React\EventLoop\Factory;

class TimeoutConnectorTest extends TestCase
{
public function testRejectsOnTimeout()
{
$promise = new Promise\Promise(function () { });

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 0.01, $loop);

$timeout->create('google.com', 80)->then(
$this->expectCallableNever(),
$this->expectCallableOnce()
);

$loop->run();
}

public function testRejectsWhenConnectorRejects()
{
$promise = Promise\reject(new \RuntimeException());

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 5.0, $loop);

$timeout->create('google.com', 80)->then(
$this->expectCallableNever(),
$this->expectCallableOnce()
);

$loop->run();
}

public function testResolvesWhenConnectorResolves()
{
$promise = Promise\resolve();

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 5.0, $loop);

$timeout->create('google.com', 80)->then(
$this->expectCallableOnce(),
$this->expectCallableNever()
);

$loop->run();
}

public function testRejectsAndCancelsPendingPromiseOnTimeout()
{
$promise = new Promise\Promise(function () { }, $this->expectCallableOnce());

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 0.01, $loop);

$timeout->create('google.com', 80)->then(
$this->expectCallableNever(),
$this->expectCallableOnce()
);

$loop->run();
}

public function testCancelsPendingPromiseOnCancel()
{
$promise = new Promise\Promise(function () { }, $this->expectCallableOnce());

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 0.01, $loop);

$out = $timeout->create('google.com', 80);
$out->cancel();

$out->then($this->expectCallableNever(), $this->expectCallableOnce());
}

public function testCancelClosesStreamIfTcpResolvesDespiteCancellation()
{
$stream = $this->getMockBuilder('React\Stream\Stream')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
$stream->expects($this->once())->method('close');

$promise = new Promise\Promise(function () { }, function ($resolve) use ($stream) {
$resolve($stream);
});

$connector = $this->getMock('React\SocketClient\ConnectorInterface');
$connector->expects($this->once())->method('create')->with('google.com', 80)->will($this->returnValue($promise));

$loop = Factory::create();

$timeout = new TimeoutConnector($connector, 0.01, $loop);

$out = $timeout->create('google.com', 80);
$out->cancel();

$out->then($this->expectCallableNever(), $this->expectCallableOnce());
}
}

0 comments on commit bd03a1c

Please sign in to comment.