class RedisSentinelCache(BaseCache):
    """Cache backend backed by a Redis master discovered via Sentinel.

    Behaves like RedisCache, but instead of connecting to a fixed
    host/port it asks a set of sentinels for the current master of
    ``sentinel_master_name`` and talks to that.

    Expected options (from the ``cache.options`` config section):
        sentinels:            list of [host, port] pairs for the sentinels
        sentinel_master_name: service name registered with the sentinels
        plus any redis connection kwargs (password, max_connections, ...).
    """

    def __init__(self, **opts):
        # Copy so the caller's options dict is never mutated.
        self._opts = dict(opts)
        # Fill in default socket timeouts only when not configured.
        self._opts.setdefault('socket_timeout', defaults.REDIS_TIMEOUT)
        self._opts.setdefault('socket_connect_timeout',
                              defaults.REDIS_CONNECT_TIMEOUT)
        self._opts['encoding'] = 'utf-8'
        self._pool = None

    async def setup(self):
        """Resolve the master through the sentinels and open the pool.

        Works on a copy of the stored options so that calling setup()
        again (e.g. on reconnect) does not fail: the original version
        pop()ed 'sentinels'/'sentinel_master_name' out of self._opts,
        which made a second call raise KeyError.
        """
        opts = dict(self._opts)
        sentinels = opts.pop('sentinels')
        master_name = opts.pop('sentinel_master_name')
        # Forward the socket timeouts to the sentinel connections too:
        # redis-py/aioredis inherits socket_* connection kwargs into the
        # sentinel links, so an unreachable sentinel cannot stall the
        # resolver with library-default (unbounded) timeouts.
        sentinel = aioredis.sentinel.Sentinel(
            sentinels,
            socket_timeout=opts['socket_timeout'],
            socket_connect_timeout=opts['socket_connect_timeout'],
        )
        # Remaining opts (password, max_connections, encoding, timeouts,
        # ...) configure the connections to the elected master.
        self._pool = sentinel.master_for(master_name, **opts)

    async def get(self, key):
        """Return the newest CacheEntry stored under *key*, or None.

        Entries live in a sorted set scored by timestamp; the highest
        score is the authoritative policy.
        """
        assert self._pool is not None
        key = key.encode('utf-8')
        res = await self._pool.zrevrange(key, 0, 0, withscores=True)
        if not res:
            return None
        packed, ts = res[0]  # pylint: disable=invalid-name
        entry = unpack_entry(packed)
        return CacheEntry(ts=ts, pol_id=entry.pol_id, pol_body=entry.pol_body)

    async def set(self, key, value):
        """Store *value* (a CacheEntry) under *key*, keeping only the
        newest entry per key."""
        assert self._pool is not None
        packed = pack_entry(value)
        ts = value.ts  # pylint: disable=invalid-name
        key = key.encode('utf-8')

        # Add the new entry and drop everything but the highest-scored
        # one, atomically via a transactional pipeline.
        async with self._pool.pipeline(transaction=True) as pipe:
            pipe.zadd(key, {packed: ts})
            pipe.zremrangebyrank(key, 0, -2)
            await pipe.execute()

    async def scan(self, token, amount_hint):
        """Iterate cache keys with SCAN.

        *token* is the opaque cursor from a previous call (None starts a
        new scan); returns (new_token, [(key, CacheEntry), ...]) where
        new_token is None once the iteration is complete. The internal
        '_metadata' hash is skipped.
        """
        assert self._pool is not None
        if token is None:
            token = b'0'

        new_token, keys = await self._pool.scan(cursor=token,
                                                count=amount_hint)
        # SCAN returns cursor 0 when the iteration is finished.
        if not new_token:
            new_token = None

        result = []
        for key in keys:
            key = key.decode('utf-8')
            if key != '_metadata':
                result.append((key, await self.get(key)))
        return new_token, result

    async def get_proactive_fetch_ts(self):
        """Return the stored proactive-fetch timestamp, or 0 if unset."""
        assert self._pool is not None
        val = await self._pool.hget('_metadata', 'proactive_fetch_ts')
        return 0 if not val else float(val.decode('utf-8'))

    async def set_proactive_fetch_ts(self, timestamp):
        """Persist the proactive-fetch timestamp in the metadata hash."""
        assert self._pool is not None
        val = str(timestamp).encode('utf-8')
        await self._pool.hset('_metadata', 'proactive_fetch_ts', val)

    async def teardown(self):
        """Close the connection pool to the master."""
        assert self._pool is not None
        await self._pool.close()