
tembo-io/pg_auto_dw

pg_auto_dw

Warning: Under Active Development

Overview

pg_auto_dw is a permissively-licensed open-source Postgres Extension that automates the creation of a Postgres-based data warehouse, given one or more transactional Postgres database inputs.

We aim to do this within a structured environment that incorporates best practices and harnesses the capabilities of Large Language Model (LLM) technologies.

We are starting with automation to facilitate a data vault implementation for our data warehouse. This will be a rudimentary raw vault setup, but we hope it will lead to substantial downstream business models.

Goals

  • Automate the DW Build
  • Automate DW Maintenance
  • Understand DW Health
  • Support Data Governance

These capabilities will be delivered through a small set of Postgres functions.

Walkthrough

Setup

  1. Install extension

    DROP EXTENSION IF EXISTS pg_auto_dw CASCADE;
    CREATE EXTENSION pg_auto_dw;

    Installing this extension also installs a couple of sample source tables in the public schema, as well as the pgcrypto extension.

  2. Restart your Postgres instance.

  3. Create a destination schema

    Choose a name for a schema for your data warehouse to be built into.

    DROP SCHEMA IF EXISTS my_dw CASCADE;
    CREATE SCHEMA my_dw;

  4. Reload configuration

    SELECT pg_reload_conf();

  5. Confirm setup

    SHOW pg_auto_dw.database_name;
    SHOW pg_auto_dw.dw_schema;

    This should return postgres and the schema name you selected.

  6. Set your LLM and reload configuration

    ALTER SYSTEM SET pg_auto_dw.model TO 'gpt-4o';
    ALTER SYSTEM SET pg_auto_dw.transformer_server_type TO 'openai';
    ALTER SYSTEM SET pg_auto_dw.transformer_server_url TO 'https://api.openai.com/v1/chat/completions';
    ALTER SYSTEM SET pg_auto_dw.transformer_server_token TO 'xxx';
    SELECT pg_reload_conf();
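
One way to double-check the values applied in step 6 is to query the standard pg_settings view. This is a minimal sketch rather than part of the extension's documented workflow. It assumes the pg_auto_dw parameters are visible in pg_settings once the extension is loaded, and it deliberately leaves out the token so the secret is not echoed to the screen.

```sql
-- Sketch: list the pg_auto_dw settings currently in effect.
-- Assumption: the extension's parameters are registered and visible in pg_settings.
SELECT name, setting
FROM pg_settings
WHERE name LIKE 'pg_auto_dw.%'
  AND name <> 'pg_auto_dw.transformer_server_token'  -- avoid echoing the secret
ORDER BY name;
```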

Load sample data

DROP TABLE IF EXISTS public.seller;
CREATE TABLE public.seller (
    seller_id UUID PRIMARY KEY,  -- Designating seller_id as the primary key
    city VARCHAR(255),
    state CHAR(2),
    zip_5 VARCHAR(10)
);

INSERT INTO public.seller (seller_id, city, state, zip_5) VALUES
('9449f25aeaf531019b76999ea49a6949','rio de janeiro','RJ','21040'),
('9bc484c87d79cd4874e05ca182658045','sao paulo','SP','02422'),
('3442f8959a84dea7ee197c632cb2df15','campinas','SP','13023'),
('d149de2f383552baea37a7198c2296ce','sao paulo','SP','04193'),
('c747d5b92c7648417faea95d36d763e8','pacatuba','CE','61800'),
('455f46ef09a9e45667e2981df84b5cc2','sorocaba','SP','18052'),
('8ff38bc3969e67c36c48343a07090f66','sao paulo','SP','08260'),
('50bf89f1349bc0409a268c3a49678009','jaci','SP','15155'),
('323ce52b5b81df2cd804b017b7f09aa7','sao paulo','SP','03306'),
('1284de4ae8aa26997e748c851557cf0e','laranjeiras do sul','SP','85301'),
('f80edd2c5aaa505cc4b0a3b219abf4b8','sao paulo','SP','03431');

DROP TABLE IF EXISTS public.orders;
CREATE TABLE public.orders (
    order_id UUID PRIMARY KEY,
    seller_id UUID,
    order_date timestamp,
    order_amount NUMERIC(10,2)
);

INSERT INTO public.orders (order_id, seller_id, order_date, order_amount) VALUES
(gen_random_uuid(), '9449f25aeaf531019b76999ea49a6949', now(), 20.01),
(gen_random_uuid(), '9449f25aeaf531019b76999ea49a6949', now(), 44.01),
(gen_random_uuid(), '9bc484c87d79cd4874e05ca182658045', now(), 99.03);
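
Before building the warehouse, it can help to confirm the sample rows loaded as expected. The query below is a plain Postgres sanity check over the two tables defined above and does not use the extension. With the sample data as written, two sellers should show orders (two and one respectively) and the other nine should show zero.

```sql
-- Sketch: summarize the sample orders per seller.
SELECT s.seller_id,
       s.city,
       count(o.order_id)                AS order_count,
       coalesce(sum(o.order_amount), 0) AS total_amount
FROM public.seller AS s
LEFT JOIN public.orders AS o ON o.seller_id = s.seller_id
GROUP BY s.seller_id, s.city
ORDER BY order_count DESC, s.seller_id;
```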

Build Data Warehouse

  1. Set your sources

    SELECT auto_dw.source_include('public', 'seller');
    SELECT auto_dw.source_include('public', 'orders');

    Postgres regular expressions are used behind the scenes. To do an exact match, anchor the pattern:

    SELECT auto_dw.source_include('public', '^seller$');

  2. Confirm the table columns are queued for processing

    SELECT * FROM auto_dw.source_column();

    You should see a list of columns with status Queued for Processing.

  3. Go

    SELECT auto_dw.go();
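
To follow progress after calling auto_dw.go(), one option is to summarize column statuses and list what has appeared in the destination schema. This is a sketch under two assumptions: that auto_dw.source_column() exposes a status column (as the queries in this README suggest), and that my_dw is the destination schema chosen during setup.

```sql
-- Sketch: count source columns by processing status.
SELECT status, count(*) AS column_count
FROM auto_dw.source_column()
GROUP BY status
ORDER BY status;

-- Sketch: list whatever tables now exist in the destination schema.
-- Assumption: 'my_dw' is the schema created in the setup steps.
SELECT table_name
FROM information_schema.tables
WHERE table_schema = 'my_dw'
ORDER BY table_name;
```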

Accessing your data

Here's an example materialized view that pulls the data together into a flat view. It assumes the my_dw destination schema created during setup; substitute your own schema name if it differs.

CREATE MATERIALIZED VIEW my_mat_view AS
SELECT
    sat_orders.order_date,
    sat_orders.order_amount,
    sat_seller.city,
    sat_seller.state,
    sat_seller.zip_5
FROM my_dw.link_order_seller
LEFT JOIN my_dw.sat_orders ON link_order_seller.link_order_seller_hk = sat_orders.link_order_seller_hk
LEFT JOIN my_dw.hub_seller ON link_order_seller.hub_seller_hk = hub_seller.hub_seller_hk
LEFT JOIN my_dw.sat_seller ON hub_seller.hub_seller_hk = sat_seller.hub_seller_hk;
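
Because a materialized view stores a snapshot of its query result, it does not pick up new warehouse rows on its own. A minimal sketch of refreshing and then reading the view, using only standard Postgres commands and the view name from the example above:

```sql
-- Re-run the view's query and replace the stored snapshot.
REFRESH MATERIALIZED VIEW my_mat_view;

-- Peek at the most recent orders in the flattened result.
SELECT *
FROM my_mat_view
ORDER BY order_date DESC
LIMIT 10;
```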

Tips

If a column isn't being interpreted correctly, try adding a comment to it; the LLM takes column comments into account.

COMMENT ON COLUMN public.orders.seller_id IS 'is business key';
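
To confirm which comments are currently attached to a table's columns, the standard col_description function can be used. This sketch only reads the Postgres catalog; how the extension itself retrieves comments is not shown here.

```sql
-- Sketch: show every column of public.orders with its comment (NULL if none).
SELECT a.attname AS column_name,
       col_description(a.attrelid, a.attnum) AS column_comment
FROM pg_attribute AS a
WHERE a.attrelid = 'public.orders'::regclass
  AND a.attnum > 0          -- skip system columns
  AND NOT a.attisdropped    -- skip dropped columns
ORDER BY a.attnum;
```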

Setting up foreign data wrappers

The example above reads data from the same instance that it's writing to. Normally you'd want to isolate analytical workloads from transactional workloads.

You can use Postgres foreign data wrapper functionality to accomplish this.

-- Enable the postgres_fdw extension
CREATE EXTENSION postgres_fdw;

-- Inspect existing foreign servers
SELECT * FROM pg_foreign_server; -- Run on the previously configured client system to inspect existing foreign servers.

-- Create a new foreign server
CREATE SERVER foreign_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'remote_server_ip', dbname 'foreign_db', port '5432');

-- Inspect existing user mappings (if applicable)
SELECT * FROM pg_user_mappings; -- Run on the previously configured client system to view user mappings for foreign servers.

-- Create a user mapping for the foreign server
CREATE USER MAPPING FOR local_user
SERVER foreign_server
OPTIONS (user 'foreign_user', password 'password');

-- Manually define a foreign table
CREATE FOREIGN TABLE foreign_table_name (
    column1 datatype,   -- Replace with the column name and datatype in the local schema.
    column2 datatype    -- Repeat for additional columns.
)
SERVER foreign_server
OPTIONS (
    schema_name 'public',       -- Schema name of the source table in the foreign server.
    table_name 'source_table'   -- Table name in the foreign server.
);

-- Alternatively, import foreign tables automatically by schema
-- Use this approach to bulk import tables, minimizing manual effort.
IMPORT FOREIGN SCHEMA public          -- Replace 'public' with the schema name in the foreign server.
FROM SERVER foreign_server            -- Specify the name of the foreign server.
INTO local_schema;                    -- Replace 'local_schema' with the schema name in the client system.
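
When only a few tables are needed, IMPORT FOREIGN SCHEMA accepts a LIMIT TO clause. The sketch below reuses the placeholder names from the example above (foreign_server, local_schema) and assumes, purely for illustration, that the remote public schema contains seller and orders tables. It finishes by listing the imported foreign tables from information_schema.

```sql
-- The target schema must exist before tables can be imported into it.
CREATE SCHEMA IF NOT EXISTS local_schema;

-- Import only the named tables instead of the whole foreign schema.
-- Assumption: seller and orders exist in the remote public schema.
IMPORT FOREIGN SCHEMA public
LIMIT TO (seller, orders)
FROM SERVER foreign_server
INTO local_schema;

-- Confirm which foreign tables are now available locally.
SELECT foreign_table_schema, foreign_table_name, foreign_server_name
FROM information_schema.foreign_tables
WHERE foreign_table_schema = 'local_schema'
ORDER BY foreign_table_name;
```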

Advanced Demo: Auto Data Governance

Sometimes it’s best to get a little push-back when creating a data warehouse, because it supports appropriate data governance. In this example, a table was not ready to deploy to the data warehouse because one of its columns may need to be treated as sensitive and handled appropriately. Auto DW’s engine recognizes that the attribute is useful for analysis but may also be sensitive. In this sample script the user will:

  1. Identify a Skipped Table

/* Identify source tables that were skipped and not integrated into the data warehouse. */
SELECT schema, "table", status, status_response 
FROM auto_dw.source_table()
WHERE status_code = 'SKIP';

Note: Running this code shows which table was skipped, along with a high-level reason. You should see the following output in status_response: “Source Table was skipped as column(s) need additional context. Please run the following SQL query for more information: SELECT schema, table, column, status, status_response FROM auto_dw.source_status_detail() WHERE schema = 'public' AND table = 'customers'.”

  2. Identify the Root Cause

/* Identify the source table column that caused the problem, understand the issue, and find a potential solution. */
SELECT schema, "table", "column", status, confidence_level, status_response
FROM auto_dw.source_column()
WHERE schema = 'public' AND "table" = 'customer';

Note: Running this code shows which column caused the table to be skipped, along with a reason in status_response. You should see the following output: “Requires Attention: Column cannot be appropriately categorized as it may contain sensitive data. Specifically, if the zip is an extended zip it may be considered PII.”

  3. Decide to Institute Some Data Governance Best Practices

/* Altering the column length restricts the acceptance of extended ZIP codes. */
ALTER TABLE customer ALTER COLUMN zip TYPE VARCHAR(5);

Note: Here it was up to the user to make a change that helps the LLM understand the data's sensitivity. In this case, limiting the type to VARCHAR(5) lets the LLM understand that this column will not contain sensitive information in the future.
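
Narrowing a column to VARCHAR(5) fails if any existing value is longer than five characters, so it can be worth checking the data first and confirming the definition afterwards. The sketch below uses only standard Postgres features and assumes the customer table lives in the public schema with a text-like zip column.

```sql
-- Before the change: count rows that would block the ALTER TABLE.
-- Assumption: public.customer.zip is a character column.
SELECT count(*) AS too_long
FROM public.customer
WHERE length(zip) > 5;

-- After the change: confirm the new column definition.
SELECT column_name, data_type, character_maximum_length
FROM information_schema.columns
WHERE table_schema = 'public'
  AND table_name = 'customer'
  AND column_name = 'zip';
```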

flowchart LR
    Start(("Start")) --> tbl["Identify a Skipped Table\nauto_dw.source_table()"]
    tbl --> col["Identify the Root Cause\nauto_dw.source_column()"]
    col --> DW[("Institute Data Governance\nBest Practices")]
    DW --> Done(("Done"))

Auto DW Process Flow: The script highlighted in Act 2 demonstrates that there are several approaches to successfully implementing a data warehouse when using this extension. Below is a BPMN diagram that illustrates these various paths.

flowchart LR
 subgraph functions_informative["Informative Functions"]
    direction LR
        health["auto_dw.health()"]
        source_tables["auto_dw.source_table()"]
        source_column["auto_dw.source_column()"]
  end
 subgraph functions_interactive["Interactive Functions"]
    direction LR
        source_clude["auto_dw.source_include(object_pattern)"]
        update_context["auto_dw.update_context(object, context)"]
        go["auto_dw.go(flag, status)"]
  end
 subgraph data_gov["Data Governance"]
    direction BT
        to_gov{"X"} --> gov["Issue\nGovernance"]
  end
    start(("Start")) --> command["Choose Command"]
    command --> split{"X"}
    split --> health & source_tables & source_column & source_clude & update_context & go --> join{"X"}
    join --> review["Review Results"]
    review --> data_gov --> more_auto{"More\nAutomations?"} 
    more_auto --> |no| done(("Done"))
    more_auto --> |yes| start_again(("Restart"))