diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..554b297 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,24 @@ +language: python + +python: + - 3.6 + +matrix: + include: + - python: 3.6 + +install: + - pip install -U setuptools codecov + - pip install -r requirements-dev.txt + - pip freeze + +script: + - python setup.py check -rms + - flake8 --ignore=E501 asyncpool setup.py + - python3 -m coverage run --omit=tests/* -m unittest discover -s tests + - python3 -m coverage html + +after_success: + codecov + +cache: pip diff --git a/asyncpool/__init__.py b/asyncpool/__init__.py index 8898b4a..fede50e 100644 --- a/asyncpool/__init__.py +++ b/asyncpool/__init__.py @@ -1,13 +1,16 @@ import asyncio -from datetime import datetime, timezone +from datetime import datetime, timezone, timedelta + def utc_now(): # utcnow returns a naive datetime, so we have to set the timezone manually return datetime.utcnow().replace(tzinfo=timezone.utc) + class Terminator: pass + class AsyncPool: def __init__(self, loop, num_workers: int, name: str, logger, worker_co, load_factor: int = 1, job_accept_duration: int = None, max_task_time: int = None, return_futures: bool = False, @@ -41,7 +44,7 @@ def __init__(self, loop, num_workers: int, name: str, logger, worker_co, load_fa self._queue = asyncio.Queue(num_workers * load_factor) self._workers = None self._exceptions = False - self._job_accept_duration = job_accept_duration + self._job_accept_duration = timedelta(seconds=job_accept_duration) if job_accept_duration is not None else None self._first_push_dt = None self._max_task_time = max_task_time self._return_futures = return_futures diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..4cbdd01 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,5 @@ +-e . +asynctest==0.12.0 +coverage==4.5.1 +flake8==3.5.0 +docutils==0.14 \ No newline at end of file diff --git a/setup.py b/setup.py index be3467b..56dbe5d 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,4 @@ from setuptools import setup -from setuptools.command.test import test as TestCommand -import os import sys py_version = sys.version_info[:2] @@ -13,6 +11,14 @@ efficiently and with explicit timeouts. """ + +def my_test_suite(): + import asynctest + test_loader = asynctest.TestLoader() + test_suite = test_loader.discover('tests') + return test_suite + + setup( name='asyncpool', version='1.0', @@ -23,8 +29,8 @@ long_description=long_description, packages=['asyncpool'], include_package_data=True, - license = "MIT", - classifiers = [ + license="MIT", + classifiers=[ "License :: OSI Approved :: MIT License", "Topic :: Internet :: WWW/HTTP", "Topic :: Software Development :: Testing", @@ -35,4 +41,5 @@ 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', ], -) \ No newline at end of file + test_suite='setup.my_test_suite', +) diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..24eec24 --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +import asynctest +import logging +import asyncio + +from asyncpool import AsyncPool + + +class WorkPoolTestCases(asynctest.TestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + logging.basicConfig(level=logging.INFO) + self._logger = logging.Logger('') + self._evt_wait = None + + async def setUp(self): + self._evt_wait = asyncio.Event() + + async def tearDown(self): + self._evt_wait.set() # unblock any blocked workers + + async def test_worker_limit(self): + num_called = 0 + + evt_hit = asyncio.Event() + + async def worker(param): + nonlocal num_called + num_called += 1 + assert param == 5 + evt_hit.set() + await self._evt_wait.wait() + + async with AsyncPool(None, 5, '', self._logger, worker) as wq: + # tests that worker limit/load factor of 1 works correctly + for _ in range(10): # five workers plus 5 in queue + await asyncio.wait_for(wq.push(5), 1) + + self.assertEqual(num_called, 5) + + with self.assertRaises(asyncio.TimeoutError): + # with load_factor==1, and all workers stuck we should timeout + await asyncio.wait_for(wq.push(5), 1) + + self.assertEqual(wq.total_queued, 10) + + # unblock workers + self._evt_wait.set() + await asyncio.sleep(1) # clear the workers + + evt_hit.clear() + + await asyncio.wait_for(wq.push(5), 1) + await asyncio.wait_for(evt_hit.wait(), 1) + self.assertEqual(num_called, 11) + self.assertFalse(wq.exceptions) + + async def test_load_factor(self): + async def worker(param): + await self._evt_wait.wait() + + async with AsyncPool(None, 5, '', self._logger, worker, 2) as wq: + for _ in range(15): # 5 in-flight, + 10 in queue per load factor + await asyncio.wait_for(wq.push(5), 1) + + with self.assertRaises(asyncio.TimeoutError): + # with load_factor==1, and all workers stuck we should timeout + await asyncio.wait_for(wq.push(5), 1) + + # unblock workers + self._evt_wait.set() + await asyncio.sleep(1) # let them clear the queue + + await asyncio.wait_for(wq.push(5), 1) + self.assertFalse(wq.exceptions) + + async def test_task_timeout(self): + async def worker(param): + await self._evt_wait.wait() + + async with AsyncPool(None, 5, '', self._logger, worker, max_task_time=1, return_futures=True) as wq: + fut = await asyncio.wait_for(wq.push(5), 1) + + for i in range(5): + await asyncio.sleep(1) + + if fut.done(): + e = fut.exception() + self.assertIsInstance(e, asyncio.TimeoutError) + self.assertTrue(wq.exceptions) + return + + self.fail('future did not time out') + + async def test_join(self): + key = 'blah' + + async def worker(param): + await asyncio.sleep(1) # wait a sec before returning result + return param + + async with AsyncPool(None, 5, '', self._logger, worker, return_futures=True) as wq: + fut = await asyncio.wait_for(wq.push('blah'), 0.1) + self.assertFalse(fut.done()) + + self.assertTrue(fut.done()) + result = fut.result() + self.assertEqual(result, key) + + +if __name__ == '__main__': + asynctest.main()