diff --git a/Cargo.lock b/Cargo.lock index 4def486..55f50d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1357,16 +1357,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1564,8 +1554,6 @@ dependencies = [ "orb-billing", "pgrx", "pgrx-tests", - "reqwest", - "reqwest-middleware", "serde_json", "supabase-wrappers", "thiserror", @@ -2274,7 +2262,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "native-tls", "once_cell", "percent-encoding", @@ -2295,21 +2282,6 @@ dependencies = [ "winreg", ] -[[package]] -name = "reqwest-middleware" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a735987236a8e238bf0296c7e351b999c188ccc11477f311b82b55c93984216" -dependencies = [ - "anyhow", - "async-trait", - "http", - "reqwest", - "serde", - "task-local-extensions", - "thiserror", -] - [[package]] name = "rustc-demangle" version = "0.1.23" @@ -2735,7 +2707,8 @@ checksum = "81cdd64d312baedb58e21336b31bc043b77e01cc99033ce76ef539f78e965ebc" [[package]] name = "supabase-wrappers" version = "0.1.18" -source = "git+https://github.com/supabase/wrappers.git?rev=99242d70eb7e551b700e2db31cc618850d835bc2#99242d70eb7e551b700e2db31cc618850d835bc2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9bf99aa26691813b950feeb6d40f3570bebb4a24b1a2cf387be250d7dadefdf" dependencies = [ "pgrx", "supabase-wrappers-macros", @@ -2747,7 +2720,8 @@ dependencies = [ [[package]] name = "supabase-wrappers-macros" version = "0.1.16" -source = "git+https://github.com/supabase/wrappers.git?rev=99242d70eb7e551b700e2db31cc618850d835bc2#99242d70eb7e551b700e2db31cc618850d835bc2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fd5531b513912a65d64b6dfd2effcdece61b4fde911658e847f522b4bb56a24" dependencies = [ "proc-macro2", "quote", @@ -2824,15 +2798,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" -[[package]] -name = "task-local-extensions" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8" -dependencies = [ - "pin-utils", -] - [[package]] name = "tempfile" version = "3.10.1" @@ -3181,15 +3146,6 @@ dependencies = [ "unic-common", ] -[[package]] -name = "unicase" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.15" diff --git a/Cargo.toml b/Cargo.toml index e4b258f..8c6c808 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,14 +17,11 @@ pg_test = [] [dependencies] pgrx = "=0.11.3" orb-billing = "0.11.0" -# supabase-wrappers = "=0.1.18" -supabase-wrappers = { git = "https://github.com/supabase/wrappers.git", default-features = false, rev = "99242d70eb7e551b700e2db31cc618850d835bc2" } +supabase-wrappers = { version = "0.1.18", default-features = false } tokio = { version = "1", features = ["full"] } serde_json = "1.0" thiserror = "1.0.48" futures = "0.3.28" -reqwest-middleware = "0.2.4" -reqwest = { version = "0.11.4", features = ["json"] } [dev-dependencies] pgrx-tests = "=0.11.3" diff --git a/src/lib.rs b/src/lib.rs index 6748618..f55af65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,11 @@ -use pgrx::warning; -use pgrx::{pg_sys, prelude::*, JsonB}; +use pgrx::{pg_sys, prelude::*, warning, JsonB}; use serde_json::Value as JsonValue; -use std::collections::HashMap; -use std::env; -use std::str::FromStr; +use std::{collections::HashMap, env, str::FromStr}; use supabase_wrappers::prelude::*; use tokio::runtime::Runtime; pg_module_magic!(); mod orb_fdw; -use crate::orb_fdw::OrbFdwError; +use crate::orb_fdw::{OrbFdwError, OrbFdwResult}; use futures::StreamExt; use orb_billing::{ Client as OrbClient, ClientConfig as OrbClientConfig, Customer as OrbCustomer, @@ -16,24 +13,21 @@ use orb_billing::{ SubscriptionListParams, }; -// TODO: Remove all unwraps. Handle the errors -fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { +fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> OrbFdwResult> { match obj { - "customers" => { - body_to_rows( - resp, - "data", // This might need to be an empty string if resp is directly an array - vec![ - ("id", "user_id", "string"), - ("external_customer_id", "organization_id", "string"), - ("name", "first_name", "string"), - ("email", "email", "string"), - ("payment_provider_id", "stripe_id", "string"), - ("created_at", "created_at", "timestamp_iso"), - ], - tgt_cols, - ) - } + "customers" => body_to_rows( + resp, + "data", + vec![ + ("id", "user_id", "string"), + ("external_customer_id", "organization_id", "string"), + ("name", "first_name", "string"), + ("email", "email", "string"), + ("payment_provider_id", "stripe_id", "string"), + ("created_at", "created_at", "timestamp_iso"), + ], + tgt_cols, + ), "subscriptions" => body_to_rows( resp, "data", @@ -69,8 +63,7 @@ fn resp_to_rows(obj: &str, resp: &JsonValue, tgt_cols: &[Column]) -> Vec { tgt_cols, ), _ => { - warning!("unsupported object: {}", obj); - Vec::new() + error!("{}", OrbFdwError::ObjectNotImplemented(obj.to_string())) } } } @@ -80,21 +73,22 @@ fn body_to_rows( obj_key: &str, normal_cols: Vec<(&str, &str, &str)>, tgt_cols: &[Column], -) -> Vec { +) -> OrbFdwResult> { let mut result = Vec::new(); let objs = if resp.is_array() { - // If `resp` is directly an array - resp.as_array().unwrap() + match resp.as_array() { + Some(value) => value, + None => return Ok(result), + } } else { - // If `resp` is an object containing the array under `obj_key` match resp .as_object() .and_then(|v| v.get(obj_key)) .and_then(|v| v.as_array()) { Some(objs) => objs, - None => return result, + None => return Ok(result), } }; @@ -116,15 +110,21 @@ fn body_to_rows( "bool" => v.as_bool().map(Cell::Bool), "i64" => v.as_i64().map(Cell::I64), "string" => v.as_str().map(|a| Cell::String(a.to_owned())), - "timestamp" => v.as_str().map(|a| { - let secs = a.parse::().unwrap() / 1000; - let ts = to_timestamp(secs as f64); - Cell::Timestamp(ts.to_utc()) - }), - "timestamp_iso" => v.as_str().map(|a| { - let ts = Timestamp::from_str(a).unwrap(); - Cell::Timestamp(ts) - }), + "timestamp" => Some( + v.as_str() + .and_then(|a| a.parse::().ok()) + .map(|ms| to_timestamp(ms as f64 / 1000.0).to_utc()) + .map(Cell::Timestamp) + .ok_or(OrbFdwError::InvalidTimestampFormat(v.to_string())) + .ok()?, + ), + "timestamp_iso" => Some( + v.as_str() + .and_then(|a| Timestamp::from_str(a).ok()) + .map(Cell::Timestamp) + .ok_or(OrbFdwError::InvalidTimestampFormat(v.to_string())) + .ok()?, + ), "json" => Some(Cell::Json(JsonB(v.clone()))), _ => None, }); @@ -134,13 +134,25 @@ fn body_to_rows( // put all properties into 'attrs' JSON column if tgt_cols.iter().any(|c| &c.name == "attrs") { - let attrs = serde_json::from_str(&obj.to_string()).unwrap(); + let attrs = serde_json::from_str(&obj.to_string())?; row.push("attrs", Some(Cell::Json(JsonB(attrs)))); } result.push(row); } - result + Ok(result) +} + +fn process_data(data: Vec>) -> Vec { + data.into_iter() + .filter_map(|item_result| match item_result { + Ok(item) => Some(item), + Err(e) => { + warning!("Error processing item: {}", e); + None + } + }) + .collect() } #[wrappers_fdw( @@ -156,20 +168,27 @@ pub(crate) struct OrbFdw { tgt_cols: Vec, } -type OrbFdwResult = Result; impl ForeignDataWrapper for OrbFdw { fn new(options: &HashMap) -> OrbFdwResult { let token = if let Some(access_token) = options.get("api_key") { access_token.to_owned() } else { - warning!("Cannot find api_key in options"); - env::var("ORB_API_KEY").unwrap() - }; - let client_config = OrbClientConfig { - api_key: token.to_string(), + warning!("Cannot find api_key in options; trying environment variable"); + match env::var("ORB_API_KEY") { + Ok(key) => key, + Err(_) => { + error!("{}", OrbFdwError::ApiKeyNotFound); + } + } }; + let client_config = OrbClientConfig { api_key: token }; let orb_client = OrbClient::new(client_config); - let rt = create_async_runtime().expect("failed to create async runtime"); + let rt = match create_async_runtime() { + Ok(runtime) => runtime, + Err(e) => { + error!("Failed to create async runtime: {}", e); + } + }; Ok(Self { rt, client: orb_client, @@ -186,7 +205,13 @@ impl ForeignDataWrapper for OrbFdw { _limit: &Option, options: &HashMap, ) -> OrbFdwResult<()> { - let obj = require_option("object", options).expect("invalid option"); + let obj = match require_option("object", options) { + Ok(object) => object, + Err(_e) => error!( + "{}", + OrbFdwError::MissingRequiredOption("object".to_string()) + ), + }; self.scan_result = None; self.tgt_cols = columns.to_vec(); let mut result = Vec::new(); @@ -196,67 +221,38 @@ impl ForeignDataWrapper for OrbFdw { "customers" => { let customers_stream = self .client - .list_customers(&ListParams::DEFAULT.page_size(400)); + .list_customers(&ListParams::DEFAULT.page_size(500)); let customers = customers_stream.collect::>().await; + let processed_customers: Vec = process_data(customers); - // Process the Vec> - let processed_customers: Vec = customers - .into_iter() - .filter_map(|customer_result| match customer_result { - Ok(customer) => Some(customer), - Err(e) => { - warning!("Error processing customer: {}", e); - None - } - }) - .collect(); - - info!("Found {} customers in Orb", processed_customers.len()); - serde_json::to_value(processed_customers).expect("failed deserializing users") + match serde_json::to_value(processed_customers) { + Ok(value) => value, + Err(e) => error!("{}", OrbFdwError::JsonSerializationError(e)), + } } "subscriptions" => { let subscriptions_stream = self .client - .list_subscriptions(&SubscriptionListParams::DEFAULT.page_size(400)); + .list_subscriptions(&SubscriptionListParams::DEFAULT.page_size(500)); let subscriptions = subscriptions_stream.collect::>().await; + let processed_subscriptions: Vec = process_data(subscriptions); - let processed_subscriptions: Vec = subscriptions - .into_iter() - .filter_map(|customer_result| match customer_result { - Ok(customer) => Some(customer), - Err(e) => { - warning!("Error processing customer: {}", e); - None - } - }) - .collect(); - - info!( - "Found {} subscriptions in Orb", - processed_subscriptions.len() - ); - serde_json::to_value(processed_subscriptions) - .expect("failed deserializing users") + match serde_json::to_value(processed_subscriptions) { + Ok(value) => value, + Err(e) => error!("{}", OrbFdwError::JsonSerializationError(e)), + } } "invoices" => { let invoices_stream = self .client - .list_invoices(&InvoiceListParams::DEFAULT.page_size(400)); + .list_invoices(&InvoiceListParams::DEFAULT.page_size(500)); let invoices = invoices_stream.collect::>().await; + let processed_invoices: Vec = process_data(invoices); - let processed_invoices: Vec = invoices - .into_iter() - .filter_map(|customer_result| match customer_result { - Ok(customer) => Some(customer), - Err(e) => { - warning!("Error processing customer: {}", e); - None - } - }) - .collect(); - - info!("Found {} subscriptions in Orb", processed_invoices.len()); - serde_json::to_value(processed_invoices).expect("failed deserializing users") + match serde_json::to_value(processed_invoices) { + Ok(value) => value, + Err(e) => error!("{}", OrbFdwError::JsonSerializationError(e)), + } } _ => { warning!("unsupported object: {}", obj); @@ -264,7 +260,10 @@ impl ForeignDataWrapper for OrbFdw { } }; - let mut rows = resp_to_rows(obj, &obj_js, &self.tgt_cols[..]); + let mut rows = match resp_to_rows(obj, &obj_js, &self.tgt_cols[..]) { + Ok(rows) => rows, + Err(e) => error!("{}", e), + }; result.append(&mut rows); Ok(()) }); diff --git a/src/orb_fdw.rs b/src/orb_fdw.rs index 47a4a45..aa5df79 100644 --- a/src/orb_fdw.rs +++ b/src/orb_fdw.rs @@ -1,40 +1,24 @@ #![allow(clippy::module_inception)] use pgrx::pg_sys::panic::ErrorReport; use pgrx::prelude::PgSqlErrorCode; -use std::num::ParseIntError; use thiserror::Error; #[derive(Error, Debug)] pub enum OrbFdwError { - #[error("invalid service account key: {0}")] - InvalidServiceAccount(#[from] std::io::Error), - #[error("Orb object '{0}' not implemented")] ObjectNotImplemented(String), - #[error("column '{0}' data type is not supported")] - UnsupportedColumnType(String), - #[error("invalid timestamp format: {0}")] InvalidTimestampFormat(String), - #[error("invalid Orb response: {0}")] - InvalidResponse(String), - - #[error("invalid api_key header")] - InvalidApiKeyHeader, - - #[error("request failed: {0}")] - RequestError(#[from] reqwest::Error), - - #[error("request middleware failed: {0}")] - RequestMiddlewareError(#[from] reqwest_middleware::Error), + #[error("api_key key not found")] + ApiKeyNotFound, - #[error("`limit` option must be an integer: {0}")] - LimitOptionParseError(#[from] ParseIntError), + #[error("JSON serialization error: {0}")] + JsonSerializationError(#[from] serde_json::Error), - #[error("parse JSON response failed: {0}")] - JsonParseError(#[from] serde_json::Error), + #[error("Missing required option: '{0}'")] + MissingRequiredOption(String), } impl From for ErrorReport { @@ -43,4 +27,4 @@ impl From for ErrorReport { } } -pub type _OrbFdwResult = Result; +pub type OrbFdwResult = Result;