Skip to content

Commit

Permalink
Resolved #615.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Dec 26, 2024
1 parent a9913e5 commit ad519ac
Showing 1 changed file with 22 additions and 44 deletions.
66 changes: 22 additions & 44 deletions python_ccaf/src/kickstarter/avro_flight_consolidator_ccaf_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pyflink.table.confluent import ConfluentSettings, ConfluentTableDescriptor
from pyflink.table.expressions import col, lit
import argparse
from functools import reduce

from src.kickstarter.helper.settings import get_secrets, FLINK_CLOUD, FLINK_REGION, FLINK_COMPUTE_POOL_ID, FLINK_API_KEY, FLINK_API_SECRET, ORGANIZATION_ID, ENVIRONMENT_ID

Expand Down Expand Up @@ -100,57 +101,34 @@ def run():
exit(1)

# The first table is the SkyOne table that is read in.
skyone_airline = (
tbl_env.from_path(f"{catalog_name}.{database_name}.skyone_avro")
.select(
col("email_address"),
col("departure_time"),
col("departure_airport_code"),
col("arrival_time"),
col("arrival_airport_code"),
col("flight_number"),
col("confirmation_code"),
lit("SkyOne")
)
)
airline = tbl_env.from_path(f"{catalog_name}.{database_name}.skyone_avro")

# Get the schema and columns from the airline table.
schema = airline.get_schema()

# The columns that are not needed in the table that represents general airline flight data.
exclude_airline_columns = ["key", "flight_duration", "ticket_price", "aircraft", "booking_agency_email", "$rowtime"]

# Get only the columns that are not in the exclude_airline_columns list.
flight_expressions = [col(field) for field in schema.get_field_names() if field not in exclude_airline_columns]
flight_columns = [field for field in schema.get_field_names() if field not in exclude_airline_columns]

# The first table is the SkyOne table that is read in.
skyone_airline = airline.select(*flight_expressions, lit("SkyOne"))

# The second table is the Sunset table that is read in.
sunset_airline = (
tbl_env.from_path(f"{catalog_name}.{database_name}.sunset_avro")
.select(
col("email_address"),
col("departure_time"),
col("departure_airport_code"),
col("arrival_time"),
col("arrival_airport_code"),
col("flight_number"),
col("confirmation_code"),
lit("Sunset")
)
sunset_airline = airline.select(*flight_expressions, lit("Sunset"))

# Build a compound expression, ensuring each column is not null
filter_condition = reduce(
lambda accumulated_columns, current_column: accumulated_columns & col(current_column).is_not_null, flight_columns[1:], col(flight_columns[0]).is_not_null
)

# Combine the two tables.
combined_airlines = (
skyone_airline.union_all(sunset_airline)
.alias(
"departure_airport_code",
"flight_number",
"email_address",
"departure_time",
"arrival_time",
"arrival_airport_code",
"confirmation_code",
"airline"
)
.filter(
col("email_address").is_not_null &
col("departure_time").is_not_null &
col("departure_airport_code").is_not_null &
col("arrival_time").is_not_null &
col("arrival_airport_code").is_not_null &
col("flight_number").is_not_null &
col("confirmation_code").is_not_null
)
.alias(*flight_columns, "airline")
.filter(filter_condition)
)

# Insert the combined record into the sink table.
Expand Down

0 comments on commit ad519ac

Please sign in to comment.