Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for redis-sentinel #95

Merged
merged 5 commits into from
Feb 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions config_examples/mta-sts-daemon.yml.redis_sentinel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
host: 127.0.0.1
port: 8461
reuse_port: true
shutdown_timeout: 20
cache:
type: redis_sentinel
options:
sentinel_master_name: "mymaster"
sentinels:
- ["sentinel1", 26379]
- ["sentinel2", 26379]
- ["sentinel3", 26379]
password: "StrongPassword"
max_connections: 25
socket_timeout: 1.0
socket_connect_timeout: 1.0
default_zone:
strict_testing: false
timeout: 4
zones:
myzone:
strict_testing: false
timeout: 4
72 changes: 72 additions & 0 deletions postfix_mta_sts_resolver/redis_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,75 @@ async def set_proactive_fetch_ts(self, timestamp):
async def teardown(self):
assert self._pool is not None
await self._pool.close()

class RedisSentinelCache(BaseCache):
def __init__(self, **opts):
self._opts = dict(opts)
self._opts['socket_timeout'] = self._opts.get(
'socket_timeout',defaults.REDIS_TIMEOUT
)
self._opts['socket_connect_timeout'] = self._opts.get(
'socket_connect_timeout', defaults.REDIS_CONNECT_TIMEOUT
)
self._opts['encoding'] = 'utf-8'
self._pool = None

async def setup(self):
sentinel = aioredis.sentinel.Sentinel(self._opts['sentinels'])
sentinel_master_name = self._opts['sentinel_master_name']
for key in ['sentinels', 'sentinel_master_name']:
self._opts.pop(key)
opts = dict((k,v) for k, v in self._opts.items())
self._pool = sentinel.master_for(sentinel_master_name, **opts)

async def get(self, key):
assert self._pool is not None
key = key.encode('utf-8')
res = await self._pool.zrevrange(key, 0, 0, withscores=True)
if not res:
return None
packed, ts = res[0] # pylint: disable=invalid-name
entry = unpack_entry(packed)
return CacheEntry(ts=ts, pol_id=entry.pol_id, pol_body=entry.pol_body)

async def set(self, key, value):
assert self._pool is not None
packed = pack_entry(value)
ts = value.ts # pylint: disable=invalid-name
key = key.encode('utf-8')

# Write
async with self._pool.pipeline(transaction=True) as pipe:
pipe.zadd(key, {packed: ts})
pipe.zremrangebyrank(key, 0, -2)
await pipe.execute()

async def scan(self, token, amount_hint):
assert self._pool is not None
if token is None:
token = b'0'

new_token, keys = await self._pool.scan(cursor=token, count=amount_hint)
if not new_token:
new_token = None

result = []
for key in keys:
key = key.decode('utf-8')
if key != '_metadata':
result.append((key, await self.get(key)))
return new_token, result

async def get_proactive_fetch_ts(self):
assert self._pool is not None
val = await self._pool.hget('_metadata', 'proactive_fetch_ts')
return 0 if not val else float(val.decode('utf-8'))

async def set_proactive_fetch_ts(self, timestamp):
assert self._pool is not None
val = str(timestamp).encode('utf-8')
await self._pool.hset('_metadata', 'proactive_fetch_ts', val)

async def teardown(self):
assert self._pool is not None
await self._pool.close()
4 changes: 4 additions & 0 deletions postfix_mta_sts_resolver/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,10 @@ def create_cache(cache_type, options):
# pylint: disable=import-outside-toplevel
from . import redis_cache
cache = redis_cache.RedisCache(**options)
elif cache_type == "redis_sentinel":
# pylint: disable=import-outside-toplevel
from . import redis_cache
cache = redis_cache.RedisSentinelCache(**options)
else:
raise NotImplementedError("Unsupported cache type!")
return cache
Expand Down