diff --git a/config_examples/mta-sts-daemon.yml.redis b/config_examples/mta-sts-daemon.yml.redis index d00a4be..f9f2615 100644 --- a/config_examples/mta-sts-daemon.yml.redis +++ b/config_examples/mta-sts-daemon.yml.redis @@ -5,9 +5,10 @@ shutdown_timeout: 20 cache: type: redis options: - address: "redis://127.0.0.1/0?timeout=5" - minsize: 5 - maxsize: 25 + url: "redis://127.0.0.1/0" + max_connections: 25 + socket_timeout: 1.0 + socket_connect_timeout: 1.0 default_zone: strict_testing: false timeout: 4 diff --git a/postfix_mta_sts_resolver/defaults.py b/postfix_mta_sts_resolver/defaults.py index ac3568b..0e35e97 100644 --- a/postfix_mta_sts_resolver/defaults.py +++ b/postfix_mta_sts_resolver/defaults.py @@ -11,6 +11,7 @@ INTERNAL_CACHE_SIZE = 10000 SQLITE_THREADS = cpu_count() SQLITE_TIMEOUT = 5 +REDIS_CONNECT_TIMEOUT = 5 REDIS_TIMEOUT = 5 CACHE_GRACE = 60 PROACTIVE_FETCH_ENABLED = False diff --git a/postfix_mta_sts_resolver/redis_cache.py b/postfix_mta_sts_resolver/redis_cache.py index ef25e48..ce6ed55 100644 --- a/postfix_mta_sts_resolver/redis_cache.py +++ b/postfix_mta_sts_resolver/redis_cache.py @@ -5,6 +5,8 @@ from . import defaults from .base_cache import BaseCache, CacheEntry +# Remove once fixed: https://github.com/aio-libs/aioredis-py/issues/1115 +aioredis.Redis.__del__ = lambda *args: None # type: ignore def pack_entry(entry): ts, pol_id, pol_body = entry # pylint: disable=invalid-name,unused-variable @@ -25,18 +27,20 @@ def unpack_entry(packed): class RedisCache(BaseCache): def __init__(self, **opts): self._opts = dict(opts) - self._opts['timeout'] = self._opts.get('timeout', - defaults.REDIS_TIMEOUT) - self._opts['encoding'] = None + self._opts['socket_timeout'] = self._opts.get('socket_timeout', + defaults.REDIS_TIMEOUT) + self._opts['socket_connect_timeout'] = self._opts.get( + 'socket_connect_timeout', defaults.REDIS_CONNECT_TIMEOUT) + self._opts['encoding'] = 'utf-8' self._pool = None async def setup(self): - self._pool = await aioredis.create_redis_pool(**self._opts) + self._pool = await aioredis.from_url(**self._opts) async def get(self, key): assert self._pool is not None key = key.encode('utf-8') - res = await self._pool.zrevrange(key, 0, 0, "WITHSCORES") + res = await self._pool.zrevrange(key, 0, 0, withscores=True) if not res: return None packed, ts = res[0] # pylint: disable=invalid-name @@ -50,10 +54,10 @@ async def set(self, key, value): key = key.encode('utf-8') # Write - pipe = self._pool.pipeline() - pipe.zadd(key, ts, packed) - pipe.zremrangebyrank(key, 0, -2) - await pipe.execute() + async with self._pool.pipeline(transaction=True) as pipe: + pipe.zadd(key, {packed: ts}) + pipe.zremrangebyrank(key, 0, -2) + await pipe.execute() async def scan(self, token, amount_hint): assert self._pool is not None @@ -83,5 +87,4 @@ async def set_proactive_fetch_ts(self, timestamp): async def teardown(self): assert self._pool is not None - self._pool.close() - await self._pool.wait_closed() + await self._pool.close() diff --git a/setup.py b/setup.py index 258d89a..b040dfd 100644 --- a/setup.py +++ b/setup.py @@ -26,7 +26,7 @@ ], extras_require={ 'sqlite': 'aiosqlite>=0.10.0', - 'redis': 'aioredis>=1.2.0,<2.0.0', + 'redis': 'aioredis>=2.0.0', 'dev': [ 'pytest>=3.0.0', 'pytest-cov', diff --git a/snapcraft.yaml b/snapcraft.yaml index c8b5651..35c7316 100644 --- a/snapcraft.yaml +++ b/snapcraft.yaml @@ -13,7 +13,7 @@ parts: python-version: python3 python-packages: - "aiosqlite>=0.10.0" - - "aioredis>=1.2.0,<2.0.0" + - "aioredis>=2.0.0" build-packages: - gcc - make diff --git a/tests/test_cache.py b/tests/test_cache.py index 360cbd2..2e165d4 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -13,7 +13,7 @@ async def setup_cache(cache_type, cache_opts): cache = utils.create_cache(cache_type, cache_opts) await cache.setup() if cache_type == 'redis': - cache._pool.flushdb() + await cache._pool.flushdb() return cache, tmpfile @pytest.mark.parametrize("cache_type,cache_opts,safe_set", [ @@ -21,8 +21,8 @@ async def setup_cache(cache_type, cache_opts): ("internal", {}, False), ("sqlite", {}, True), ("sqlite", {}, False), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, True), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, False) + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, True), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, False) ]) @pytest.mark.asyncio async def test_cache_lifecycle(cache_type, cache_opts, safe_set): @@ -46,7 +46,7 @@ async def test_cache_lifecycle(cache_type, cache_opts, safe_set): @pytest.mark.parametrize("cache_type,cache_opts", [ ("internal", {}), ("sqlite", {}), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}), ]) @pytest.mark.asyncio async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts): @@ -78,12 +78,12 @@ async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts): ("sqlite", {}, 3, 4), ("sqlite", {}, 0, 4), ("sqlite", {}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 1), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 2), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 3), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 4), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 0, 4), - ("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 1), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 2), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 3), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 4), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 0, 4), + ("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT), ]) @pytest.mark.timeout(10) @pytest.mark.asyncio