Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pgmq-python: explicitly cast types #365

Merged
merged 4 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/pgmq_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
- "tembo-pgmq-python/tembo_pgmq_python/**"
- "tembo-pgmq-python/tests/**"
- "tembo-pgmq-python/pyproject.toml"
- "pgmq-extension/**"
push:
branches:
- main
Expand All @@ -22,7 +23,7 @@ on:
- "tembo-pgmq-python/tembo_pgmq_python/**"
- "tembo-pgmq-python/tests/**"
- "tembo-pgmq-python/pyproject.toml"

- "pgmq-extension/**"
jobs:
lints:
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion tembo-pgmq-python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "tembo-pgmq-python"
version = "0.9.1"
version = "0.9.2"
description = "Python client for the PGMQ Postgres extension."
authors = ["Adam Hendel <[email protected]>"]
license = "PostgreSQL"
Expand Down
10 changes: 5 additions & 5 deletions tembo-pgmq-python/tembo_pgmq_python/async_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> in
async def _send_internal(self, queue, message, delay, conn):
self.logger.debug(f"Sending message to queue '{queue}' with delay={delay}")
result = await conn.fetchrow(
"SELECT * FROM pgmq.send($1, $2::jsonb, $3);",
"SELECT * FROM pgmq.send($1::text, $2::jsonb, $3::integer);",
queue,
dumps(message).decode("utf-8"),
delay,
Expand All @@ -186,11 +186,11 @@ async def send_batch(self, queue: str, messages: List[dict], delay: int = 0, con
else:
return await self._send_batch_internal(queue, messages, delay, conn)

async def _send_batch_internal(self, queue, messages, delay, conn):
async def _send_batch_internal(self, queue: str, messages: List[dict], delay: int, conn):
self.logger.debug(f"Sending batch of messages to queue '{queue}' with delay={delay}")
jsonb_array = [dumps(message).decode("utf-8") for message in messages]
result = await conn.fetch(
"SELECT * FROM pgmq.send_batch($1, $2::jsonb[], $3);",
"SELECT * FROM pgmq.send_batch($1::text, $2::jsonb[], $3::integer);",
queue,
jsonb_array,
delay,
Expand All @@ -213,7 +213,7 @@ async def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optiona
async def _read_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading message from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down Expand Up @@ -246,7 +246,7 @@ async def read_batch(
async def _read_batch_internal(self, queue, vt, batch_size, conn):
self.logger.debug(f"Reading batch of messages from queue '{queue}' with vt={vt}")
rows = await conn.fetch(
"SELECT * FROM pgmq.read($1, $2, $3);",
"SELECT * FROM pgmq.read($1::text, $2::integer, $3::integer);",
queue,
vt or self.vt,
batch_size,
Expand Down
10 changes: 5 additions & 5 deletions tembo-pgmq-python/tembo_pgmq_python/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,15 @@ def list_queues(self, conn=None) -> List[str]:
def send(self, queue: str, message: dict, delay: int = 0, conn=None) -> int:
"""Send a message to a queue."""
self.logger.debug(f"send called with conn: {conn}")
query = "select * from pgmq.send(%s, %s, %s);"
query = "select * from pgmq.send(%s::text, %s::jsonb, %s::integer);"
result = self._execute_query_with_result(query, [queue, Jsonb(message), delay], conn=conn)
return result[0][0]

@transaction
def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None) -> List[int]:
"""Send a batch of messages to a queue."""
self.logger.debug(f"send_batch called with conn: {conn}")
query = "select * from pgmq.send_batch(%s, %s, %s);"
query = "select * from pgmq.send_batch(%s::text, %s::jsonb[], %s::integer);"
params = [queue, [Jsonb(message) for message in messages], delay]
result = self._execute_query_with_result(query, params, conn=conn)
return [message[0] for message in result]
Expand All @@ -133,7 +133,7 @@ def send_batch(self, queue: str, messages: List[dict], delay: int = 0, conn=None
def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Message]:
"""Read a message from a queue."""
self.logger.debug(f"read called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, 1], conn=conn)
messages = [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
return messages[0] if messages else None
Expand All @@ -142,7 +142,7 @@ def read(self, queue: str, vt: Optional[int] = None, conn=None) -> Optional[Mess
def read_batch(self, queue: str, vt: Optional[int] = None, batch_size=1, conn=None) -> Optional[List[Message]]:
"""Read a batch of messages from a queue."""
self.logger.debug(f"read_batch called with conn: {conn}")
query = "select * from pgmq.read(%s, %s, %s);"
query = "select * from pgmq.read(%s::text, %s::integer, %s::integer);"
rows = self._execute_query_with_result(query, [queue, vt or self.vt, batch_size], conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]

Expand All @@ -158,7 +158,7 @@ def read_with_poll(
) -> Optional[List[Message]]:
"""Read messages from a queue with polling."""
self.logger.debug(f"read_with_poll called with conn: {conn}")
query = "select * from pgmq.read_with_poll(%s, %s, %s, %s, %s);"
query = "select * from pgmq.read_with_poll(%s::text, %s::integer, %s::integer, %s::integer, %s::integer);"
params = [queue, vt or self.vt, qty, max_poll_seconds, poll_interval_ms]
rows = self._execute_query_with_result(query, params, conn=conn)
return [Message(msg_id=x[0], read_ct=x[1], enqueued_at=x[2], vt=x[3], message=x[4]) for x in rows]
Expand Down
Loading