Skip to content

Commit

Permalink
Merge pull request #79 from bjoe2k4/master
Browse files Browse the repository at this point in the history
upgrade to aioredis v2
  • Loading branch information
Snawoot authored Dec 17, 2021
2 parents 049865e + d469c77 commit 2b98d33
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 26 deletions.
7 changes: 4 additions & 3 deletions config_examples/mta-sts-daemon.yml.redis
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@ shutdown_timeout: 20
cache:
type: redis
options:
address: "redis://127.0.0.1/0?timeout=5"
minsize: 5
maxsize: 25
url: "redis://127.0.0.1/0"
max_connections: 25
socket_timeout: 1.0
socket_connect_timeout: 1.0
default_zone:
strict_testing: false
timeout: 4
Expand Down
1 change: 1 addition & 0 deletions postfix_mta_sts_resolver/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
INTERNAL_CACHE_SIZE = 10000
SQLITE_THREADS = cpu_count()
SQLITE_TIMEOUT = 5
REDIS_CONNECT_TIMEOUT = 5
REDIS_TIMEOUT = 5
CACHE_GRACE = 60
PROACTIVE_FETCH_ENABLED = False
Expand Down
25 changes: 14 additions & 11 deletions postfix_mta_sts_resolver/redis_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from . import defaults
from .base_cache import BaseCache, CacheEntry

# Remove once fixed: https://github.com/aio-libs/aioredis-py/issues/1115
aioredis.Redis.__del__ = lambda *args: None # type: ignore

def pack_entry(entry):
ts, pol_id, pol_body = entry # pylint: disable=invalid-name,unused-variable
Expand All @@ -25,18 +27,20 @@ def unpack_entry(packed):
class RedisCache(BaseCache):
def __init__(self, **opts):
self._opts = dict(opts)
self._opts['timeout'] = self._opts.get('timeout',
defaults.REDIS_TIMEOUT)
self._opts['encoding'] = None
self._opts['socket_timeout'] = self._opts.get('socket_timeout',
defaults.REDIS_TIMEOUT)
self._opts['socket_connect_timeout'] = self._opts.get(
'socket_connect_timeout', defaults.REDIS_CONNECT_TIMEOUT)
self._opts['encoding'] = 'utf-8'
self._pool = None

async def setup(self):
self._pool = await aioredis.create_redis_pool(**self._opts)
self._pool = await aioredis.from_url(**self._opts)

async def get(self, key):
assert self._pool is not None
key = key.encode('utf-8')
res = await self._pool.zrevrange(key, 0, 0, "WITHSCORES")
res = await self._pool.zrevrange(key, 0, 0, withscores=True)
if not res:
return None
packed, ts = res[0] # pylint: disable=invalid-name
Expand All @@ -50,10 +54,10 @@ async def set(self, key, value):
key = key.encode('utf-8')

# Write
pipe = self._pool.pipeline()
pipe.zadd(key, ts, packed)
pipe.zremrangebyrank(key, 0, -2)
await pipe.execute()
async with self._pool.pipeline(transaction=True) as pipe:
pipe.zadd(key, {packed: ts})
pipe.zremrangebyrank(key, 0, -2)
await pipe.execute()

async def scan(self, token, amount_hint):
assert self._pool is not None
Expand Down Expand Up @@ -83,5 +87,4 @@ async def set_proactive_fetch_ts(self, timestamp):

async def teardown(self):
assert self._pool is not None
self._pool.close()
await self._pool.wait_closed()
await self._pool.close()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
],
extras_require={
'sqlite': 'aiosqlite>=0.10.0',
'redis': 'aioredis>=1.2.0,<2.0.0',
'redis': 'aioredis>=2.0.0',
'dev': [
'pytest>=3.0.0',
'pytest-cov',
Expand Down
2 changes: 1 addition & 1 deletion snapcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ parts:
python-version: python3
python-packages:
- "aiosqlite>=0.10.0"
- "aioredis>=1.2.0,<2.0.0"
- "aioredis>=2.0.0"
build-packages:
- gcc
- make
Expand Down
20 changes: 10 additions & 10 deletions tests/test_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,16 @@ async def setup_cache(cache_type, cache_opts):
cache = utils.create_cache(cache_type, cache_opts)
await cache.setup()
if cache_type == 'redis':
cache._pool.flushdb()
await cache._pool.flushdb()
return cache, tmpfile

@pytest.mark.parametrize("cache_type,cache_opts,safe_set", [
("internal", {}, True),
("internal", {}, False),
("sqlite", {}, True),
("sqlite", {}, False),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, True),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, False)
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, True),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, False)
])
@pytest.mark.asyncio
async def test_cache_lifecycle(cache_type, cache_opts, safe_set):
Expand All @@ -46,7 +46,7 @@ async def test_cache_lifecycle(cache_type, cache_opts, safe_set):
@pytest.mark.parametrize("cache_type,cache_opts", [
("internal", {}),
("sqlite", {}),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}),
])
@pytest.mark.asyncio
async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts):
Expand Down Expand Up @@ -78,12 +78,12 @@ async def test_proactive_fetch_ts_lifecycle(cache_type, cache_opts):
("sqlite", {}, 3, 4),
("sqlite", {}, 0, 4),
("sqlite", {}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 1),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 2),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 3),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 3, 4),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, 0, 4),
("redis", {"address": "redis://127.0.0.1/0?timeout=5"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 1),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 2),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 3),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 3, 4),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, 0, 4),
("redis", {"url": "redis://127.0.0.1/0?socket_timeout=5&socket_connect_timeout=5"}, constants.DOMAIN_QUEUE_LIMIT*2, constants.DOMAIN_QUEUE_LIMIT),
])
@pytest.mark.timeout(10)
@pytest.mark.asyncio
Expand Down

0 comments on commit 2b98d33

Please sign in to comment.