Moves data from a PostgreSQL database table to an Apache Kafka topic.
This may come in handy when you need RDBMS transactions that both modify the database and send messages to Apache Kafka.
Given a PostgreSQL table with the following structure:
- an `id` field as the primary key
- an arbitrary `VARCHAR` field containing what you expect to send to Kafka
- a trigger that notifies inserts through a PostgreSQL channel
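A minimal table matching this layout might look like the following sketch. The names `events` and `payload` are the tool's defaults; your actual schema may differ:

```sql
-- Sketch of a queue table with the expected layout:
-- a primary-key id plus a VARCHAR payload column.
CREATE TABLE events (
    id      SERIAL PRIMARY KEY,  -- primary-key field
    payload VARCHAR NOT NULL     -- the data to be sent to Kafka
);
```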
While Push the Elephant is running, all the data in the above table is moved to a Kafka topic.
This lets you write projects that both change your PostgreSQL data and send Kafka messages in a transactional context.
All you have to do is insert a row into the above table within your transaction.
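For example, assuming the default `events` table with a `payload` column, a single transaction can change business data and enqueue a Kafka message atomically (the `accounts` table and the payload shown here are hypothetical):

```sql
BEGIN;
-- An ordinary change to your business data (hypothetical table).
UPDATE accounts SET balance = balance - 100 WHERE id = 42;
-- Enqueue the message: the trigger on `events` notifies the channel,
-- and Push the Elephant forwards the row to Kafka.
INSERT INTO events (payload) VALUES ('{"account": 42, "debit": 100}');
COMMIT;
```

If the transaction rolls back, neither the data change nor the Kafka message happens.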
The tool comes with a command line interface:
Push the Elephant 0.0.2
Mirko Bonasorte <[email protected]>
Moves data from a PostgreSQL table to Kafka topic using LISTEN/NOTIFY mechanisms
USAGE:
pte [OPTIONS]
FLAGS:
-h, --help Prints help information
-V, --version Prints version information
OPTIONS:
-b, --buffer-size <BUFFER_SIZE>
Kafka buffer size after which messages are written (default: 100)
-z, --channel-name <CHANNEL_NAME> PostgreSQL channel name (default: events.activity)
-c, --column-name <COLUMN_NAME> PostgreSQL table column name (default: payload)
-k, --kafka-urls <KAFKA_URLS> Kafka URLs (default: localhost:9092)
-l, --log4rs-configuration <LOG4RS_CONFIGURATION> Log4rs YAML configuration file
-x, --notify-timeout <NOTIFY_TIMEOUT> PostgreSQL listen timeout (ms, default: 3000)
-X, --notify-timeout-total <NOTIFY_TIMEOUT_TOTAL>
Timeout after which rows are processed using a standard query (ms, default: 60000)
-p, --pgurl <PG_URL> PostgreSQL URL (default: postgres://postgres@localhost:5433)
-t, --table-name <TABLE_NAME> PostgreSQL table name (default: events)
-w, --topic-name <TOPIC_NAME> Kafka topic name (default: events)
-y, --yaml-file <YAML_FILE>
YAML file with the following structure:
configurations:
-
pgurl: a_postgresql_url
buffer_size: 12345
notify_timeout: 67890
kafka_brokers:
- kafka_broker1
- kafka_broker2
- ...
The `notify-timeout` option defines how long the tool waits before it flushes data to Kafka.
The `notify-timeout-total` option defines how long the tool waits before falling back to a standard SQL query to fetch the data to be moved to Kafka.
The `buffer-size` option defines how many messages are collected before data is flushed to Kafka.
The `--yaml-file` option cannot be specified together with the other parameters, except for the `--log4rs-configuration` parameter.
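For instance, a configuration file following the structure above might look like this (the URLs and values are placeholders; the numeric fields reuse the CLI defaults):

```yaml
configurations:
  -
    pgurl: postgres://postgres@localhost:5433
    buffer_size: 100
    notify_timeout: 3000
    kafka_brokers:
      - localhost:9092
      - kafka2.example.com:9092
```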
You can also safely use the tool as a library within your Rust project, as follows:
use push_the_elephant;
push_the_elephant::WorkerBuilder::default()
.pgurl("postgres://push_the_elephant:push_the_elephant@localhost:5432/push_the_elephant")
.kafka_brokers(vec!["kafka.foo.com:9092".to_string()])
.table_name("events")
.column_name("payload")
.channel("events.activity")
.build().unwrap().run().unwrap();
The following script contains an example of a trigger that intercepts all inserts into the `events` table and publishes each inserted row to the `events.activity` PostgreSQL channel.
begin;

create or replace function tg_notify_events ()
  returns trigger
  language plpgsql
as $$
declare
  channel text := TG_ARGV[0];
begin
  PERFORM (
    with new_row(id, payload) as (select NEW.id, NEW.payload)
    select pg_notify(channel, row_to_json(new_row)::text)
    from new_row
  );
  RETURN NULL;
end;
$$;

CREATE TRIGGER notify_events
  AFTER INSERT
  ON events
  FOR EACH ROW
  EXECUTE PROCEDURE tg_notify_events('events.activity');

commit;