From 72ef85650365a6858041ade3494f28cb449301b4 Mon Sep 17 00:00:00 2001
From: "Jeffrey Jonathan Jennings (J3)"
Date: Mon, 28 Oct 2024 05:58:18 +0100
Subject: [PATCH] Resolved #414.

---
 linux-Dockerfile                              |  3 ++-
 mac-Dockerfile                                |  3 ++-
 python/src/kickstarter/flight_importer_app.py |  2 +-
 .../flink_kickstarter_visualization.py        | 17 +++++++++++------
 4 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/linux-Dockerfile b/linux-Dockerfile
index d1e227f..59586f9 100644
--- a/linux-Dockerfile
+++ b/linux-Dockerfile
@@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
 RUN uv sync --frozen
 
 # Zip Python files
-RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
+WORKDIR /opt/flink/python_apps/src/kickstarter
+RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
 
 # Set the entrypoint to Flink's entrypoint script
 CMD ["./bin/start-cluster.sh"]
\ No newline at end of file
diff --git a/mac-Dockerfile b/mac-Dockerfile
index c32de70..1c7bc96 100644
--- a/mac-Dockerfile
+++ b/mac-Dockerfile
@@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
 RUN uv sync --frozen
 
 # Zip Python files
-RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
+WORKDIR /opt/flink/python_apps/src/kickstarter
+RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
 
 # Set the entrypoint to Flink's entrypoint script
 ENTRYPOINT ["/docker-entrypoint.sh"]
\ No newline at end of file
diff --git a/python/src/kickstarter/flight_importer_app.py b/python/src/kickstarter/flight_importer_app.py
index f8db168..c07a6d8 100644
--- a/python/src/kickstarter/flight_importer_app.py
+++ b/python/src/kickstarter/flight_importer_app.py
@@ -3,7 +3,7 @@
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink, KafkaRecordSerializationSchema, KafkaOffsetsInitializer, DeliveryGuarantee
 from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
 from pyflink.table import StreamTableEnvironment
-from pyflink.table.catalog import ObjectPath, HiveCatalog, Catalog
+from pyflink.table.catalog import ObjectPath
 from datetime import datetime, timezone
 import argparse
 
diff --git a/python/src/kickstarter/flink_kickstarter_visualization.py b/python/src/kickstarter/flink_kickstarter_visualization.py
index 89b0623..4789a30 100644
--- a/python/src/kickstarter/flink_kickstarter_visualization.py
+++ b/python/src/kickstarter/flink_kickstarter_visualization.py
@@ -1,6 +1,5 @@
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import TableEnvironment, EnvironmentSettings, StreamTableEnvironment
-from pyflink.table.catalog import ObjectPath
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
 import argparse
 from typing import Tuple
 import pandas as pd
@@ -33,7 +32,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
         Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: is a tuple of Pandas DataFrames.
     """
     # Get the number of flights per month by airline, year, and month
-    airline_monthly_flights_table = _tbl_env.sql_query(f"""
+    airline_monthly_flights_table = _tbl_env.sql_query("""
         select
             airline,
             extract(year from to_timestamp(departure_time)) as departure_year,
@@ -54,7 +53,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
     df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()
 
     # Get the top airports with the most departures by airport, airline, year, and rank
-    ranked_airports_table = _tbl_env.sql_query(f"""
+    ranked_airports_table = _tbl_env.sql_query("""
         with cte_ranked as (
             select
                 airline,
@@ -88,7 +87,13 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
     df_ranked_airports_table = ranked_airports_table.to_pandas()
 
     # Get the flight data by airline and year
-    flight_table = _tbl_env.sql_query(f"SELECT *, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
+    flight_table = _tbl_env.sql_query(f"""
+        select
+            *,
+            extract(year from to_timestamp(departure_time)) as departure_year
+        from
+            {database_name}.flight
+    """)
     df_flight_table = flight_table.to_pandas()
 
     return df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table
@@ -122,7 +127,7 @@ def main(args):
     env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
 
     # --- Add the Python dependency script files to the environment
-    env.add_python_archive("/opt/flink/python_apps/kickstarter/python_files.zip")
+    env.add_python_archive("/opt/flink/python_apps/src/kickstarter/python_files.zip")
 
     # --- Create a Table Environment
     tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=EnvironmentSettings.new_instance().in_batch_mode().build())
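Illustration (not part of the commit): the Dockerfile hunks now zip from inside src/kickstarter, so python_files.zip holds relative entries (e.g. helper/..., model/...) rather than absolute /opt/flink/... paths, and the visualization app registers the archive at its new location. A minimal PyFlink sketch of how that registration is typically consumed; the archive path mirrors the patch, while the "kickstarter" target directory is an assumed name chosen for the example:

    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()

    # Register the archive built by the Dockerfiles; Flink ships it to the
    # Python workers and extracts it under the given target directory.
    env.add_python_archive(
        "/opt/flink/python_apps/src/kickstarter/python_files.zip",
        "kickstarter"  # illustrative target directory, not taken from the patch
    )

    # Because the zip entries are now relative, code running in a Python UDF can
    # reach them via paths such as "kickstarter/helper/..." or "kickstarter/model/...".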