Add example to main documentation (#8)
* Improve README.md

* Improve README.md

* Update README.md
ismailsimsek authored Jan 30, 2025
1 parent 9a26d4c commit c4a8822
Showing 2 changed files with 79 additions and 59 deletions.
83 changes: 36 additions & 47 deletions README.md
@@ -20,76 +20,65 @@ pip install https://github.com/memiiso/pydbzengine/archive/master.zip --upgrade

## How to Use

A Python example:
First install the packages: `pip install pydbzengine[dev]`

```python
from typing import List
from pydbzengine import ChangeEvent, BasePythonChangeHandler
from pydbzengine import Properties, DebeziumJsonEngine


class TestChangeHandler(BasePythonChangeHandler):
class PrintChangeHandler(BasePythonChangeHandler):
    """
    An example implementation of a handler class, where the data received from Java is processed.
    A custom change event handler class.
    This class processes batches of Debezium change events received from the engine.
    The `handleJsonBatch` method is where you implement your logic for consuming
    and processing these events. Currently, it prints basic information about
    each event to the console.
    """

    def handleJsonBatch(self, records: List[ChangeEvent]):
        """
        Handles a batch of Debezium change events.
        This method is called by the Debezium engine with a list of ChangeEvent objects.
        Change this method to implement your desired processing logic. For example,
        you might parse the event data, transform it, and load it into a database or
        other destination.
        Args:
            records: A list of ChangeEvent objects representing the changes captured by Debezium.
        """
        print(f"Received {len(records)} records")
        print(f"Record 1 table: {records[0].destination()}")
        print(f"Record 1 key: {records[0].key()}")
        print(f"Record 1 value: {records[0].value()}")
        for record in records:
            print(f"destination: {record.destination()}")
            print(f"key: {record.key()}")
            print(f"value: {record.value()}")
        print("--------------------------------------")
        # @NOTE ..... your code goes here .....
        # @NOTE ..... process the data, for example read it into pandas and save to destination etc. .....


if __name__ == '__main__':
    props = Properties()
    props.setProperty("name", "engine")
    props.setProperty("snapshot.mode", "initial_only")
    # ..... add further Debezium config properties .....
    # Add further Debezium connector configuration properties here. For example:
    # props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    # props.setProperty("database.hostname", "your_database_host")
    # props.setProperty("database.port", "3306")

    # Create a DebeziumJsonEngine instance, passing the configuration properties and the custom change event handler.
    engine = DebeziumJsonEngine(properties=props, handler=PrintChangeHandler())

    # pass the config and then handler class we created above to the DebeziumJsonEngine
    engine = DebeziumJsonEngine(properties=props, handler=TestChangeHandler())
    # start consuming the event
    # Start the Debezium engine to begin consuming and processing change events.
    engine.run()
```

The above code outputs logs like the following:

```asciidoc
2025-01-28 17:59:11,375 [INFO] [main] org.apache.kafka.connect.json.JsonConverterConfig (AbstractConfig.java:371) - JsonConverterConfig values:
converter.type = key
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false

2025-01-28 17:59:11,378 [INFO] [main] org.apache.kafka.connect.json.JsonConverterConfig (AbstractConfig.java:371) - JsonConverterConfig values:
converter.type = value
decimal.format = BASE64
replace.null.with.default = true
schemas.cache.size = 1000
schemas.enable = false
......further debezium logs.........

2025-01-28 17:59:11,909 [INFO] [pool-4-thread-1] io.debezium.relational.RelationalSnapshotChangeEventSource (RelationalSnapshotChangeEventSource.java:660) - Finished exporting 9 records for table 'inventory.products' (4 of 5 tables); total duration '00:00:00.003'
2025-01-28 17:59:11,909 [INFO] [pool-4-thread-1] io.debezium.relational.RelationalSnapshotChangeEventSource (RelationalSnapshotChangeEventSource.java:614) - Exporting data from table 'inventory.products_on_hand' (5 of 5 tables)

Received 2 records
Record 1 table: testc.inventory.orders
Record 1 key: {"id":10004}
Record 1 value: {"id":10004,"order_date":16852,"purchaser":1003,"quantity":1,"product_id":107,"__deleted":"false","__op":"r","__table":"orders","__source_ts_ms":1738083551906,"__ts_ms":1738083551905}
--------------------------------------
Received 2 records
Record 1 table: testc.inventory.products
Record 1 key: {"id":102}
Record 1 value: {"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"__deleted":"false","__op":"r","__table":"products","__source_ts_ms":1738083551906,"__ts_ms":1738083551909}
--------------------------------------

......further debezium logs.........
```
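
The `value` payloads in the log output above are JSON strings with flattened metadata fields (`__op`, `__table`, `__source_ts_ms`, and so on). As a minimal sketch, assuming `record.value()` returns such a JSON string, a handler could decode one as shown here. The helper names `decode_value` and `epoch_days_to_date` are hypothetical and not part of pydbzengine. Reading `order_date` as days since the Unix epoch is an assumption about how the connector encodes DATE columns by default.

```python
import json
from datetime import date, datetime, timedelta, timezone

# Debezium operation codes carried in the flattened `__op` field.
OP_NAMES = {"r": "read (snapshot)", "c": "create", "u": "update", "d": "delete"}


def decode_value(value_json: str) -> dict:
    """Parse one flattened event value and add a few human-readable fields."""
    row = json.loads(value_json)
    row["op_name"] = OP_NAMES.get(row.get("__op"), "unknown")
    # `__source_ts_ms` is expressed in milliseconds since the Unix epoch.
    if "__source_ts_ms" in row:
        row["source_time"] = datetime.fromtimestamp(
            row["__source_ts_ms"] / 1000, tz=timezone.utc
        )
    return row


def epoch_days_to_date(days: int) -> date:
    """Convert a day count since 1970-01-01 into a calendar date (assumed DATE encoding)."""
    return date(1970, 1, 1) + timedelta(days=days)


# Sample value copied from the log output above.
sample = (
    '{"id":10004,"order_date":16852,"purchaser":1003,"quantity":1,'
    '"product_id":107,"__deleted":"false","__op":"r","__table":"orders",'
    '"__source_ts_ms":1738083551906,"__ts_ms":1738083551905}'
)
row = decode_value(sample)
print(row["__table"], row["op_name"], epoch_days_to_date(row["order_date"]))
```

Note that `__deleted` arrives as the string `"false"`, not a boolean, so it needs an explicit comparison if you filter on it.
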
#### How to consume events with dlt
For a dlt example, please see [dlt_consuming.py](pydbzengine/examples/dlt_consuming.py)

https://github.com/memiiso/pydbzengine/blob/main/pydbzengine/examples/dlt_consuming.py#L92-L160
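
Before handing a batch to a loader such as dlt, one possible preparation step is to group the events by destination table. The sketch below shows only that step and does not reproduce the logic in `dlt_consuming.py`. `group_rows_by_destination` and the `StubEvent` stand-in are hypothetical names used for illustration. The sketch assumes only that each record exposes `destination()` and `value()` as in the README example above, and skipping records with an empty value is likewise an assumption.

```python
import json
from collections import defaultdict
from typing import Dict, Iterable, List, Optional


class StubEvent:
    """Stand-in for a ChangeEvent so the sketch runs without a Debezium engine."""

    def __init__(self, destination: str, value: Optional[str]):
        self._destination = destination
        self._value = value

    def destination(self) -> str:
        return self._destination

    def value(self) -> Optional[str]:
        return self._value


def group_rows_by_destination(records: Iterable) -> Dict[str, List[dict]]:
    """Parse each record's JSON value and collect the rows per destination."""
    grouped = defaultdict(list)
    for record in records:
        value = record.value()
        if value is None:  # assumption: ignore events that carry no payload
            continue
        grouped[record.destination()].append(json.loads(value))
    return dict(grouped)


batch = [
    StubEvent("testc.inventory.orders", '{"id":10004,"quantity":1,"__op":"r"}'),
    StubEvent("testc.inventory.products", '{"id":102,"name":"car battery","__op":"r"}'),
    StubEvent("testc.inventory.orders", '{"id":10003,"quantity":2,"__op":"r"}'),
    StubEvent("testc.inventory.orders", None),
]
for destination, rows in group_rows_by_destination(batch).items():
    print(destination, len(rows))
```

Each list of dictionaries could then be passed to whatever loader the handler uses, one table at a time.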


### Contributors

55 changes: 43 additions & 12 deletions pydbzengine/examples/dlt_consuming.py
@@ -90,33 +90,64 @@ def debezium_engine_props(sourcedb: DbPostgresql):
    return props

def main():
    # start PG container which is used as replication source
    """
    Demonstrates capturing change data from PostgreSQL using Debezium and loading
    it into DuckDB using dlt.
    This example starts a PostgreSQL container, configures Debezium to capture changes,
    processes the change events with a custom handler using dlt, and finally queries
    the DuckDB database to display the loaded data.
    """

    # Start the PostgreSQL container that will serve as the replication source.
    sourcedb = DbPostgresql()
    sourcedb.start()
    # get debezium engine configuration Properties

    # Get Debezium engine configuration properties, including connection details
    # for the PostgreSQL database. This function debezium_engine_props returns all the properties
    props = debezium_engine_props(sourcedb=sourcedb)
    # create dlt pipeline to consume events to duckdb

    # Create a dlt pipeline to load the change events into DuckDB.
    dlt_pipeline = dlt.pipeline(
        pipeline_name="dbz_cdc_events_example",
        destination="duckdb",
        dataset_name="dbz_data"
    )
    # create handler class, which will process generated debezium events with dlt

    # Instantiate change event handler (DltChangeHandler) that uses the dlt pipeline
    # to process and load the Debezium events. This handler has
    # the logic for transforming and loading the events.
    handler = DltChangeHandler(dlt_pipeline=dlt_pipeline)
    # give the config and the handler class to the DebeziumJsonEngine

    # Create a DebeziumJsonEngine instance, providing the configuration properties
    # and the custom event handler.
    engine = DebeziumJsonEngine(properties=props, handler=handler)
    # run the engine async then interrupt after timeout seconds, to test the result!

    # Run the Debezium engine asynchronously with a timeout. This allows the example
    # to run for a limited time and then terminate automatically.
    Utils.run_engine_async(engine=engine, timeout_sec=60)
    # engine.run()
    # engine.run() # This would be used for synchronous execution (without timeout)

    # ================ PRINT THE CONSUMED DATA ===========================
    # ================ PRINT THE CONSUMED DATA FROM DUCKDB ===========================
    # Connect to the DuckDB database.
    con = duckdb.connect(DUCKDB_FILE.as_posix())

    # Retrieve a list of all tables in the DuckDB database.
    result = con.sql("SHOW ALL TABLES").fetchall()

    # Iterate through the tables and display the data from tables within the 'dbz_data' schema.
    for r in result:
        database, schema, table = r[:3]
        if schema == "dbz_data":
            con.sql(f"select * from {database}.{schema}.{table}").show()
        database, schema, table = r[:3] # Extract database, schema, and table names.
        if schema == "dbz_data": # Only show data from the schema where Debezium loaded the data.
            print(f"Data in table {table}:")
            con.sql(f"select * from {database}.{schema}.{table}").show() # Display table data


if __name__ == "__main__":
    """ First run `pip install pydbzengine[dev]` """
    """
    Main entry point for the script.
    Before running, ensure you have installed the necessary dependencies:
    `pip install pydbzengine[dev]`
    """
    main()
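
The example above relies on `Utils.run_engine_async` to stop after `timeout_sec`; its implementation is not shown in this diff. As a rough illustration of the general idea only, a blocking call can be run in a background thread and abandoned once a deadline passes. `run_with_timeout` and its `blocking_call` parameter are hypothetical names, not part of pydbzengine. This sketch merely stops waiting, whereas a real engine would also need to be told to shut down.

```python
import threading
import time
from typing import Callable


def run_with_timeout(blocking_call: Callable[[], None], timeout_sec: float) -> bool:
    """Run blocking_call in a daemon thread; return True if it finished in time."""
    worker = threading.Thread(target=blocking_call, daemon=True)
    worker.start()
    # join() returns either when the worker ends or when the timeout elapses.
    worker.join(timeout=timeout_sec)
    return not worker.is_alive()


# A short task finishes before the deadline; a long one does not.
finished_fast = run_with_timeout(lambda: time.sleep(0.01), timeout_sec=1.0)
finished_slow = run_with_timeout(lambda: time.sleep(2.0), timeout_sec=0.05)
print(finished_fast, finished_slow)
```

The daemon flag lets the interpreter exit even if the worker is still blocked, which suits a short-lived demo script but would not be a clean shutdown for a production consumer.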
