Skip to content

Commit

Permalink
Merge pull request #20 from Snawoot/aiosqlite_upd
Browse files Browse the repository at this point in the history
sqlite pool: switch to proper connection creation from aiosqlite==0.10.0
  • Loading branch information
Snawoot authored Apr 8, 2019
2 parents ed961c3 + aa5d603 commit 34d7c31
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 32 deletions.
63 changes: 32 additions & 31 deletions postfix_mta_sts_resolver/sqlite_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@ def __init__(self, threads, conn_args=(), conn_kwargs={}, init_queries=()):
self._stopped = False

async def _new_conn(self):
cm = aiosqlite.connect(*self._conn_args, **self._conn_kwargs)
c = await cm.__aenter__()
free = partial(cm.__aexit__, None, None, None)
db = await aiosqlite.connect(*self._conn_args, **self._conn_kwargs)
try:
for q in self._init_queries:
await c.execute(q)
async with db.cursor() as cur:
for q in self._init_queries:
await cur.execute(q)
except:
await free()
await db.close()
raise
return free, c
return db

async def prepare(self):
for _ in range(self._threads):
Expand All @@ -41,25 +40,26 @@ async def stop(self):
self._stopped = True
try:
while True:
free, db = self._free_conns.get_nowait()
await free()
db = self._free_conns.get_nowait()
await db.close()
except asyncio.QueueEmpty:
pass

def borrow(self, timeout=None):
#assert self._ready
if not self._ready:
raise RuntimeError("Pool not prepared!")
class PoolBorrow:
async def __aenter__(s):
s._conn = await asyncio.wait_for(self._free_conns.get(),
timeout)
return s._conn[1]
return s._conn

async def __aexit__(s, exc_type, exc, tb):
if self._stopped:
await s._conn[0]()
await s._conn.close()
return
if exc_type is not None:
await s._conn[0]()
await s._conn.close()
s._conn = await self._new_conn()
self._free_conns.put_nowait(s._conn)
return PoolBorrow()
Expand Down Expand Up @@ -92,16 +92,17 @@ async def setup(self):
"create unique index if not exists sts_policy_domain on sts_policy_cache (domain)",
"create index if not exists sts_policy_domain_ts on sts_policy_cache (domain, ts)",
]
async with self._pool.borrow(self._timeout) as db:
for q in queries:
await db.execute(q)
await db.commit()
async with self._pool.borrow(self._timeout) as conn:
async with conn.cursor() as cur:
for q in queries:
await cur.execute(q)
await conn.commit()

async def get(self, key):
async with self._pool.borrow(self._timeout) as db:
async with db.execute('select ts, pol_id, pol_body from '
'sts_policy_cache where domain=?',
(key,)) as cur:
async with self._pool.borrow(self._timeout) as conn:
async with conn.execute('select ts, pol_id, pol_body from '
'sts_policy_cache where domain=?',
(key,)) as cur:
res = await cur.fetchone()
if res is not None:
ts, pol_id, pol_body = res
Expand All @@ -114,18 +115,18 @@ async def get(self, key):
async def set(self, key, value):
ts, pol_id, pol_body = value
pol_body = json.dumps(pol_body)
async with self._pool.borrow(self._timeout) as db:
async with self._pool.borrow(self._timeout) as conn:
try:
await db.execute('insert into sts_policy_cache (domain, ts, '
'pol_id, pol_body) values (?, ?, ?, ?)',
(key, int(ts), pol_id, pol_body))
await db.commit()
await conn.execute('insert into sts_policy_cache (domain, ts, '
'pol_id, pol_body) values (?, ?, ?, ?)',
(key, int(ts), pol_id, pol_body))
await conn.commit()
except sqlite3.IntegrityError:
await db.execute('update sts_policy_cache set ts = ?, '
'pol_id = ?, pol_body = ? where domain = ? '
'and ts < ?',
(int(ts), pol_id, pol_body, key, int(ts)))
await db.commit()
await conn.execute('update sts_policy_cache set ts = ?, '
'pol_id = ?, pol_body = ? where domain = ? '
'and ts < ?',
(int(ts), pol_id, pol_body, key, int(ts)))
await conn.commit()

async def teardown(self):
await self._pool.stop()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
'aiodns>=1.1.1',
'aiohttp>=3.4.4',
'PyYAML>=3.12',
'aiosqlite>=0.9.0',
'aiosqlite>=0.10.0',
'aioredis>=1.2.0',
'sdnotify>=0.3.2',
],
Expand Down

0 comments on commit 34d7c31

Please sign in to comment.