Skip to content

Commit

Permalink
Feature/padw 113 source column build status integration (#30)
Browse files Browse the repository at this point in the history
* feat (PADW-113): Unify build id and dv_schema id.

chore (PADW-113): Remove old sample function and associated query.

* feat (PADW-113): Return 'built' status to SQL function source_column in column "status" by searching DV schemas for columns.
  • Loading branch information
analyzer1 authored Dec 16, 2024
1 parent 50c0a03 commit 58f6d93
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 69 deletions.
11 changes: 4 additions & 7 deletions extension/src/controller/dv_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::model::dv_schema::{

use super::dv_loader::*;

pub fn build_dv(build_id: &String, dv_objects_query: &str, load_data: bool) {
pub fn build_dv(build_id: Uuid, dv_objects_query: &str, load_data: bool) {

let mut dv_objects_hm: HashMap<u32, Vec<TransformerObject>> = HashMap::new();

Expand Down Expand Up @@ -283,14 +283,13 @@ pub fn build_dv(build_id: &String, dv_objects_query: &str, load_data: bool) {
}
);


// Build DVTransformerSchema

// Get the current time in GMT
let now_gmt = Utc::now().naive_utc();

let mut dv_schema = DVSchema {
id: Uuid::new_v4(),
id: build_id,
dw_schema,
create_timestamp_gmt: now_gmt,
modified_timestamp_gmt: now_gmt,
Expand All @@ -304,10 +303,10 @@ pub fn build_dv(build_id: &String, dv_objects_query: &str, load_data: bool) {

log!("DV Schema JSON: {:#?}", dv_schema);

dv_schema_push_to_repo(&build_id, &mut dv_schema);
dv_schema_push_to_repo(&build_id.to_string(), &mut dv_schema);

// ToDo: Remove as this is redundant and for testing purposes. However, this function will be integral for future data refreshes.
match dv_load_schema_from_build_id(&build_id) {
match dv_load_schema_from_build_id(&build_id.to_string()) {
Some(schema) => {
dv_schema = schema;
}
Expand All @@ -320,8 +319,6 @@ pub fn build_dv(build_id: &String, dv_objects_query: &str, load_data: bool) {

}



fn dv_schema_push_to_repo(build_id: &String, dv_schema: &mut DVSchema) {

let now_gmt = Utc::now().naive_utc();
Expand Down
40 changes: 39 additions & 1 deletion extension/src/controller/dv_loader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,44 @@
use pg_sys::Hash;
use pgrx::prelude::*;
use std::collections::HashMap;
use crate::model::dv_schema::*;
use crate::model::dv_schema::{self, *};

/// Returns `true` when the column identified by `table_oid` and
/// `column_ordinal_position` is referenced by any DV schema persisted in the
/// `auto_dw.dv_repo` repository table.
///
/// # Panics
/// Panics (which pgrx surfaces as a Postgres error) when the repository query
/// fails or a stored schema document cannot be deserialized into `DVSchema`.
pub fn column_in_dv_schemas(table_oid: u32, column_ordinal_position: i16) -> bool {

    // Pull every persisted DV schema document (stored as JSON).
    let get_schemas_query: &str = r#"
        SELECT schema
        FROM auto_dw.dv_repo;
    "#;

    // Deserialize each JSON row into a DVSchema.
    let mut dv_schemas: Vec<DVSchema> = Vec::new();
    Spi::connect(|client| {
        match client.select(get_schemas_query, None, None) {
            Ok(schema_results) => {
                for schema_result in schema_results {
                    // NOTE(review): these unwraps assume the `schema` column is
                    // always a non-NULL JSON datum — confirm against dv_repo DDL.
                    let schema_json = schema_result
                        .get_datum_by_ordinal(1)
                        .unwrap()
                        .value::<pgrx::Json>()
                        .unwrap()
                        .unwrap();
                    let pgrx::Json(schema_json_value) = schema_json;
                    match serde_json::from_value::<DVSchema>(schema_json_value) {
                        Ok(dv_schema) => dv_schemas.push(dv_schema),
                        Err(e) => panic!("Failure to unwrap dv_schema, error: {e}"),
                    }
                }
            }
            Err(e) => panic!("Get Schemas Query Failure, error: {e}"),
        }
    });

    // Idiomatic short-circuit search instead of a manual loop with early return.
    dv_schemas
        .iter()
        .any(|dv_schema| dv_schema.contains_column(table_oid, column_ordinal_position))
}

pub fn dv_load_schema_from_build_id(build_id: &String) -> Option<DVSchema> {
let get_schema_query: &str = r#"
Expand Down
92 changes: 40 additions & 52 deletions extension/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod controller; // Coordinates application logic and model-service interactions.
mod model; // Defines data structures and data-related methods.
mod utility; // Initialization, Configuration Management, and External Services

use controller::dv_loader;
pub use pgrx::prelude::*;
use utility::guc;
use uuid::Uuid;
Expand All @@ -23,17 +24,15 @@ fn go_default() -> String {
let build_id = Uuid::new_v4();
let message = format!("Build ID: {} | Data warehouse tables are currently being built.", build_id);
info!("{}", message);
let build_id = build_id.to_string();
let build_flag = "Build";
let build_status = "RTD";
let status = "Ready to Deploy";
let query_insert = &queries::insert_into_build_call(
&build_id, &build_flag, &build_status, &status, &accepted_transformer_confidence_level);
&build_id.to_string(), &build_flag, &build_status, &status, &accepted_transformer_confidence_level);
_ = Spi::run(query_insert);
let query_build_pull = &queries::build_object_pull(&build_id);
let query_build_pull = &queries::build_object_pull(&build_id.to_string());
let load_data = true;
controller::dv_builder::build_dv(&build_id, query_build_pull, load_data);

controller::dv_builder::build_dv(build_id, query_build_pull, load_data);
message
}

Expand All @@ -47,20 +46,18 @@ fn build_default() -> String {
let build_id = Uuid::new_v4();
let message = format!("Build ID: {} | Data warehouse tables are currently being built.", build_id);
info!("{}", message);
let build_id = build_id.to_string();
let build_flag = "Build";
let build_status = "RTD";
let status = "Ready to Deploy";
let query_insert = &queries::insert_into_build_call(
&build_id, &build_flag, &build_status, &status, &accepted_transformer_confidence_level);
&build_id.to_string(), &build_flag, &build_status, &status, &accepted_transformer_confidence_level);
_ = Spi::run(query_insert);
let query_build_pull = &queries::build_object_pull(&build_id);
let query_build_pull = &queries::build_object_pull(&build_id.to_string());
let load_data = false;
controller::dv_builder::build_dv(&build_id, query_build_pull, load_data);
controller::dv_builder::build_dv(build_id, query_build_pull, load_data);
message
}


#[pg_extern]
fn source_include( schema_pattern_include: &str,
table_pattern_include: default!(Option<&str>, "NULL"),
Expand Down Expand Up @@ -103,38 +100,6 @@ fn source_exclude( schema_pattern_exclude: &str,
"Pattern Excluded"
}

#[pg_extern]
fn source_table() -> Result<
TableIterator<
'static,
(
name!(schema, Result<Option<String>, pgrx::spi::Error>),
name!(table, Result<Option<String>, pgrx::spi::Error>),
name!(status, Result<Option<String>, pgrx::spi::Error>),
name!(status_code, Result<Option<String>, pgrx::spi::Error>),
name!(status_response, Result<Option<String>, pgrx::spi::Error>)
)
>,
spi::Error,
> {
let query: &str = queries::SOURCE_TABLE_SAMPLE;

info!("Evaluation of TABLE customer");
Spi::connect(|client| {
Ok(client
.select(query, None, None)?
.map(|row| (
row["schema"].value(),
row["table"].value(),
row["status"].value(),
row["status_code"].value(),
row["status_response"].value())
)
.collect::<Vec<_>>())
})
.map(TableIterator::new)
}

#[pg_extern]
fn source_column() -> Result<
TableIterator<
Expand Down Expand Up @@ -164,17 +129,40 @@ fn source_column() -> Result<
Spi::connect(|client| {
Ok(client
.select(query, None, None)?
.map(|row| (
row["schema"].value(),
row["table"].value(),
row["column"].value(),
row["status"].value(),
row["category"].value(),
row["is_sensitive"].value(),
row["confidence_level"].value(),
row["status_response"].value(),
.map(|row| {

let table_oid: Result<Option<u32>, pgrx::spi::Error> = row["table_oid"].value();
let column_ordinal_position: Result<Option<i16>, pgrx::spi::Error> = row["column_ordinal_position"].value();

let mut is_built = false;

if let (
Ok(Some(table_oid)),
Ok(Some(column_ordinal_position)),
) = (table_oid, column_ordinal_position) {
log!("Table OID: {table_oid}, Column Ordinal Position: {column_ordinal_position}");
is_built = dv_loader::column_in_dv_schemas(table_oid, column_ordinal_position);
} else {
log!("One or both values are missing or there was an error.");
}

(
row["schema"].value(),
row["table"].value(),
row["column"].value(),
{
if is_built {
Ok(Some("built".to_string()))
} else {
row["status"].value()
}
},
row["category"].value(),
row["is_sensitive"].value(),
row["confidence_level"].value(),
row["status_response"].value(),
)
)
})
.collect::<Vec<_>>())
})
.map(TableIterator::new)
Expand Down
50 changes: 49 additions & 1 deletion extension/src/model/dv_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ pub struct DVSchema {
pub link_keys: Vec<LinkKey>,
}

impl DVSchema {
    /// True when any column reference anywhere in this schema — link-key
    /// business keys, link-key descriptors, or top-level business keys —
    /// matches the given table OID and column ordinal position.
    pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {
        // Search link keys first: each carries its own business keys and
        // descriptors. `any` short-circuits exactly like the early returns did.
        self.link_keys.iter().any(|link_key| {
            link_key
                .business_keys
                .iter()
                .any(|bk| bk.contains_column(table_oid, column_ordinal_position))
                || link_key.descriptors.iter().any(|descriptor| {
                    // A descriptor's source column is optional.
                    descriptor
                        .descriptor_link
                        .source_column
                        .as_ref()
                        .map_or(false, |col| {
                            col.contains_column(table_oid, column_ordinal_position)
                        })
                })
        })
            // Then fall through to the schema-level business keys.
            || self
                .business_keys
                .iter()
                .any(|bk| bk.contains_column(table_oid, column_ordinal_position))
    }
}


#[derive(Serialize, Deserialize, Debug)]
pub struct LinkKey {
#[serde(rename = "ID")]
Expand Down Expand Up @@ -54,6 +77,24 @@ pub struct BusinessKeyPartLink {
pub hub_target_column: Option<ColumnData>,
}

impl BusinessKey {
    /// True when the given table OID / column ordinal pair is referenced by
    /// this business key, either through one of its business-key-part source
    /// columns or through one of its descriptors' (optional) source columns.
    pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {
        // BK part search — short-circuits on first match, like the original
        // early returns.
        self.business_key_part_links.iter().any(|bkp_link| {
            bkp_link
                .source_columns
                .iter()
                .any(|col| col.contains_column(table_oid, column_ordinal_position))
        })
            // Descriptor search — a descriptor may have no source column.
            || self.descriptors.iter().any(|descriptor| {
                descriptor
                    .descriptor_link
                    .source_column
                    .as_ref()
                    .map_or(false, |col| {
                        col.contains_column(table_oid, column_ordinal_position)
                    })
            })
    }
}

#[derive(Serialize, Deserialize, Debug)]
pub struct Descriptor {
#[serde(rename = "ID")]
Expand Down Expand Up @@ -96,4 +137,11 @@ pub struct ColumnData {
pub column_ordinal_position: i16,
#[serde(rename = "Column Type")]
pub column_type_name: String,
}
}

impl ColumnData {
    /// True when this column reference identifies exactly the given table OID
    /// and column ordinal position.
    pub fn contains_column(&self, table_oid: u32, column_ordinal_position: i16) -> bool {
        // Return the comparison directly instead of
        // `if … { true } else { false }` (clippy `needless_bool`).
        self.table_oid == table_oid && self.column_ordinal_position == column_ordinal_position
    }
}
13 changes: 5 additions & 8 deletions extension/src/model/queries.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,5 @@
use crate::utility::guc;

pub const SOURCE_TABLE_SAMPLE: &str = r#"
WITH Temp_Data (schema, "table", status, status_code, status_response) AS (
VALUES
('PUBLIC', 'CUSTOMER', 'Skipped', 'SKIP', 'Source Table was skipped as column(s) need additional context. Please run the following SQL query for more information: SELECT schema, table, column, status, status_response FROM auto_dw.source_status_detail() WHERE schema = ''public'' AND table = ''customers''.')
)
SELECT * FROM Temp_Data;
"#;

pub const SOURCE_OBJECTS_JSON: &str = r#"
WITH
table_tranformation_time_cal AS (
Expand Down Expand Up @@ -461,7 +453,9 @@ pub fn source_column(accepted_transformer_confidence_level: &str) -> String {
SELECT
t.pk_transformer_responses,
s.schema_name,
s.schema_oid,
s.table_name,
s.table_oid,
s.column_name,
s.column_ordinal_position,
t.confidence_score,
Expand Down Expand Up @@ -525,8 +519,11 @@ pub fn source_column(accepted_transformer_confidence_level: &str) -> String {
)
SELECT
schema_name::TEXT AS schema,
schema_oid AS schema_oid,
table_name::TEXT AS table,
table_oid AS table_oid,
column_name::TEXT AS column,
column_ordinal_position AS column_ordinal_position,
status,
CASE
WHEN category IS NULL THEN '-'
Expand Down

0 comments on commit 58f6d93

Please sign in to comment.