Skip to content

Commit

Permalink
Added receive_timeout timeout for websocket to receive complete messa…
Browse files Browse the repository at this point in the history
…ge. #1024 #1325
  • Loading branch information
Nikolay Kim committed Jan 31, 2017
1 parent b3c80ee commit a69ea90
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 41 deletions.
4 changes: 3 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ CHANGES
1.3.0 (XXXX-XX-XX)
------------------

- separate read + connect + request timeouts # 1523
- Separate read + connect + request timeouts # 1523

- Fix polls demo run application #1487

Expand All @@ -15,6 +15,8 @@ CHANGES
- Do not use readline when reading the content of a part
in the multipart reader #1535

- Added `receive_timeout` timeout for websocket to receive complete message. #1024 #1325

- Remove `web.Application` dependency from `web.UrlDispatcher` #1510

- Accepting back-pressure from slow websocket clients #1367
Expand Down
19 changes: 16 additions & 3 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from .client_ws import ClientWebSocketResponse
from .cookiejar import CookieJar
from .errors import WSServerHandshakeError
from .helpers import Timeout
from .helpers import Timeout, TimeService

__all__ = ('ClientSession', 'request', 'get', 'options', 'head',
'delete', 'post', 'put', 'patch', 'ws_connect')
Expand Down Expand Up @@ -55,7 +55,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
response_class=ClientResponse,
ws_response_class=ClientWebSocketResponse,
version=aiohttp.HttpVersion11,
cookie_jar=None, read_timeout=None):
cookie_jar=None, read_timeout=None, time_service=None):

if connector is None:
connector = aiohttp.TCPConnector(loop=loop)
Expand Down Expand Up @@ -107,6 +107,10 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._request_class = request_class
self._response_class = response_class
self._ws_response_class = ws_response_class
self._time_service = (
time_service
if time_service is not None
else TimeService(self._loop))

def __del__(self, _warnings=warnings):
if not self.closed:
Expand All @@ -120,6 +124,10 @@ def __del__(self, _warnings=warnings):
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)

@property
def time_service(self):
return self._time_service

def request(self, method, url, **kwargs):
"""Perform HTTP request."""
return _RequestContextManager(self._request(method, url, **kwargs))
Expand Down Expand Up @@ -278,6 +286,7 @@ def _request(self, method, url, *,
def ws_connect(self, url, *,
protocols=(),
timeout=10.0,
receive_timeout=None,
autoclose=True,
autoping=True,
auth=None,
Expand All @@ -290,6 +299,7 @@ def ws_connect(self, url, *,
self._ws_connect(url,
protocols=protocols,
timeout=timeout,
receive_timeout=receive_timeout,
autoclose=autoclose,
autoping=autoping,
auth=auth,
Expand All @@ -302,6 +312,7 @@ def ws_connect(self, url, *,
def _ws_connect(self, url, *,
protocols=(),
timeout=10.0,
receive_timeout=None,
autoclose=True,
autoping=True,
auth=None,
Expand Down Expand Up @@ -394,7 +405,9 @@ def _ws_connect(self, url, *,
timeout,
autoclose,
autoping,
self._loop)
self._loop,
time_service=self.time_service,
receive_timeout=receive_timeout)

def _prepare_headers(self, headers):
""" Add default headers and transform it to CIMultiDict
Expand Down
23 changes: 14 additions & 9 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,20 @@
class ClientWebSocketResponse:

def __init__(self, reader, writer, protocol,
response, timeout, autoclose, autoping, loop):
response, timeout, autoclose, autoping, loop, *,
time_service=None, receive_timeout=None):
self._response = response
self._conn = response.connection

self._writer = writer
self._reader = reader
self._protocol = protocol
self._time_service = time_service
self._closed = False
self._closing = False
self._close_code = None
self._timeout = timeout
self._receive_timeout = receive_timeout
self._autoclose = autoclose
self._autoping = autoping
self._loop = loop
Expand Down Expand Up @@ -115,7 +118,7 @@ def close(self, *, code=1000, message=b''):
return False

@asyncio.coroutine
def receive(self):
def receive(self, timeout=None):
if self._waiting:
raise RuntimeError('Concurrent call to receive() is not allowed')

Expand All @@ -126,7 +129,9 @@ def receive(self):
return CLOSED_MESSAGE

try:
msg = yield from self._reader.read()
with self._time_service.timeout(
timeout or self._receive_timeout):
msg = yield from self._reader.read()
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except WebSocketError as exc:
Expand Down Expand Up @@ -156,26 +161,26 @@ def receive(self):
self._waiting = False

@asyncio.coroutine
def receive_str(self):
msg = yield from self.receive()
def receive_str(self, *, timeout=None):
msg = yield from self.receive(timeout)
if msg.type != WSMsgType.TEXT:
raise TypeError(
"Received message {}:{!r} is not str".format(msg.type,
msg.data))
return msg.data

@asyncio.coroutine
def receive_bytes(self):
msg = yield from self.receive()
def receive_bytes(self, *, timeout=None):
msg = yield from self.receive(timeout)
if msg.type != WSMsgType.BINARY:
raise TypeError(
"Received message {}:{!r} is not bytes".format(msg.type,
msg.data))
return msg.data

@asyncio.coroutine
def receive_json(self, *, loads=json.loads):
data = yield from self.receive_str()
def receive_json(self, *, loads=json.loads, timeout=None):
data = yield from self.receive_str(timeout=timeout)
return loads(data)

if PY_35:
Expand Down
6 changes: 2 additions & 4 deletions aiohttp/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,6 @@ class LowresTimeout:
""" Low resolution timeout context manager """

def __init__(self, timeout, time_service, loop):
assert timeout is not None

self._loop = loop
self._timeout = timeout
self._time_service = time_service
Expand All @@ -712,14 +710,14 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self._task = None

if exc_type is asyncio.CancelledError and self._cancelled:
self._cancel_handler = None
self._task = None
raise asyncio.TimeoutError from None
if self._timeout is not None:
self._cancel_handler.cancel()
self._cancel_handler = None
self._task = None

def _cancel_task(self):
self._cancelled = self._task.cancel()
Expand Down
8 changes: 8 additions & 0 deletions aiohttp/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import unittest
from abc import ABC, abstractmethod
from contextlib import contextmanager
from unittest import mock

from multidict import CIMultiDict
Expand Down Expand Up @@ -513,6 +514,13 @@ def make_mocked_request(method, path, headers=None, *,
time_service.time.return_value = 12345
time_service.strtime.return_value = "Tue, 15 Nov 1994 08:12:31 GMT"

@contextmanager
def timeout(*args, **kw):
yield

time_service.timeout = mock.Mock()
time_service.timeout.side_effect = timeout

task = mock.Mock()

req = Request(message, payload,
Expand Down
25 changes: 16 additions & 9 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ def __bool__(self):
class WebSocketResponse(StreamResponse):

def __init__(self, *,
timeout=10.0, autoclose=True, autoping=True, protocols=()):
timeout=10.0, receive_timeout=None,
autoclose=True, autoping=True, protocols=()):
super().__init__(status=101)
self._protocols = protocols
self._protocol = None
Expand All @@ -46,8 +47,10 @@ def __init__(self, *,
self._waiting = False
self._exception = None
self._timeout = timeout
self._receive_timeout = receive_timeout
self._autoclose = autoclose
self._autoping = autoping
self._time_service = None

@asyncio.coroutine
def prepare(self, request):
Expand Down Expand Up @@ -75,6 +78,8 @@ def _pre_start(self, request):
else: # pragma: no cover
raise HTTPInternalServerError() from err

self._time_service = request.time_service

if self.status != status:
self.set_status(status)
for k, v in headers:
Expand Down Expand Up @@ -224,7 +229,7 @@ def close(self, *, code=1000, message=b''):
return False

@asyncio.coroutine
def receive(self):
def receive(self, timeout=None):
if self._reader is None:
raise RuntimeError('Call .prepare() first')
if self._waiting:
Expand All @@ -240,7 +245,9 @@ def receive(self):
return CLOSED_MESSAGE

try:
msg = yield from self._reader.read()
with self._time_service.timeout(
timeout or self._receive_timeout):
msg = yield from self._reader.read()
except (asyncio.CancelledError, asyncio.TimeoutError):
raise
except WebSocketError as exc:
Expand Down Expand Up @@ -281,26 +288,26 @@ def receive_msg(self):
return (yield from self.receive())

@asyncio.coroutine
def receive_str(self):
msg = yield from self.receive()
def receive_str(self, *, timeout=None):
msg = yield from self.receive(timeout)
if msg.type != WSMsgType.TEXT:
raise TypeError(
"Received message {}:{!r} is not str".format(msg.type,
msg.data))
return msg.data

@asyncio.coroutine
def receive_bytes(self):
msg = yield from self.receive()
def receive_bytes(self, *, timeout=None):
msg = yield from self.receive(timeout)
if msg.type != WSMsgType.BINARY:
raise TypeError(
"Received message {}:{!r} is not bytes".format(msg.type,
msg.data))
return msg.data

@asyncio.coroutine
def receive_json(self, *, loads=json.loads):
data = yield from self.receive_str()
def receive_json(self, *, loads=json.loads, timeout=None):
data = yield from self.receive_str(timeout=timeout)
return loads(data)

def write(self, data):
Expand Down
6 changes: 5 additions & 1 deletion docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ The client session supports the context manager protocol for self closing.
URLs may be either :class:`str` or :class:`~yarl.URL`

.. comethod:: ws_connect(url, *, protocols=(), timeout=10.0,\
receive_timeout=None,\
auth=None,\
autoclose=True,\
autoping=True,\
Expand All @@ -401,8 +402,11 @@ The client session supports the context manager protocol for self closing.

:param tuple protocols: Websocket protocols

:param float timeout: Timeout for websocket read. 10 seconds by default
:param float timeout: Timeout for websocket to close. 10 seconds by default

:param float receive_timeout: Timeout for websocket to receive complete message.
None(unlimited) seconds by default

:param aiohttp.BasicAuth auth: an object that represents HTTP
Basic Authorization (optional)

Expand Down
Loading

0 comments on commit a69ea90

Please sign in to comment.