Commit 72ef856

Resolved #414.
j3-signalroom committed Oct 28, 2024
1 parent 93bbd28 commit 72ef856
Showing 4 changed files with 16 additions and 9 deletions.
linux-Dockerfile (3 changes: 2 additions & 1 deletion)
@@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
 RUN uv sync --frozen
 
 # Zip Python files
-RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
+WORKDIR /opt/flink/python_apps/src/kickstarter
+RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
 
 # Set the entrypoint to Flink's entrypoint script
 CMD ["./bin/start-cluster.sh"]
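
A quick way to sanity-check the new zip step (a minimal sketch, assuming the image layout above): because the archive is now built from inside the kickstarter directory, its entries should be stored relative to that directory instead of carrying the /opt/flink/python_apps/src/kickstarter prefix.

# Minimal sketch, assuming the paths used in this Dockerfile. Listing the
# archive entries should show names relative to the kickstarter directory,
# e.g. "flight_importer_app.py" or "helper/...", rather than the old
# "opt/flink/python_apps/src/kickstarter/..." prefix.
import zipfile

with zipfile.ZipFile("/opt/flink/python_apps/src/kickstarter/python_files.zip") as zf:
    for name in zf.namelist():
        print(name)
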
mac-Dockerfile (3 changes: 2 additions & 1 deletion)
@@ -136,7 +136,8 @@ WORKDIR /opt/flink/python_apps
 RUN uv sync --frozen
 
 # Zip Python files
-RUN zip -r python_files.zip /opt/flink/python_apps/src/kickstarter/* -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
+WORKDIR /opt/flink/python_apps/src/kickstarter
+RUN zip -r python_files.zip * -x '__pycache__/*' -x 'helper/__pycache__/*' -x 'model/__pycache__/*'
 
 # Set the entrypoint to Flink's entrypoint script
 ENTRYPOINT ["/docker-entrypoint.sh"]
python/src/kickstarter/flight_importer_app.py (2 changes: 1 addition & 1 deletion)
@@ -3,7 +3,7 @@
 from pyflink.datastream.connectors.kafka import KafkaSource, KafkaSink, KafkaRecordSerializationSchema, KafkaOffsetsInitializer, DeliveryGuarantee
 from pyflink.datastream.formats.json import JsonRowDeserializationSchema, JsonRowSerializationSchema
 from pyflink.table import StreamTableEnvironment
-from pyflink.table.catalog import ObjectPath, HiveCatalog, Catalog
+from pyflink.table.catalog import ObjectPath
 from datetime import datetime, timezone
 import argparse
 
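For context on the trimmed import (a hedged sketch, not code from this app): the catalog object is obtained from the table environment at runtime, so only ObjectPath needs a direct import; the HiveCatalog and Catalog names dropped above are presumably unused in the module. The catalog, database, and table names below are placeholders.

# Hypothetical usage sketch: get the catalog from the table environment and
# probe for a table with ObjectPath; no HiveCatalog/Catalog import is needed.
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table.catalog import ObjectPath

tbl_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
catalog = tbl_env.get_catalog("default_catalog")        # placeholder catalog name
table_path = ObjectPath("default_database", "flight")   # placeholder database/table
print(catalog.table_exists(table_path))
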
python/src/kickstarter/flink_kickstarter_visualization.py (17 changes: 11 additions & 6 deletions)
@@ -1,6 +1,5 @@
 from pyflink.datastream import StreamExecutionEnvironment
-from pyflink.table import TableEnvironment, EnvironmentSettings, StreamTableEnvironment
-from pyflink.table.catalog import ObjectPath
+from pyflink.table import EnvironmentSettings, StreamTableEnvironment
 import argparse
 from typing import Tuple
 import pandas as pd
@@ -33,7 +32,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
 Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: is a tuple of Pandas DataFrames.
 """
 # Get the number of flights per month by airline, year, and month
-airline_monthly_flights_table = _tbl_env.sql_query(f"""
+airline_monthly_flights_table = _tbl_env.sql_query("""
 select
 airline,
 extract(year from to_timestamp(departure_time)) as departure_year,
@@ -54,7 +53,7 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
 df_airline_monthly_flights_table = airline_monthly_flights_table.to_pandas()
 
 # Get the top airports with the most departures by airport, airline, year, and rank
-ranked_airports_table = _tbl_env.sql_query(f"""
+ranked_airports_table = _tbl_env.sql_query("""
 with cte_ranked as (
 select
 airline,
@@ -88,7 +87,13 @@ def load_data(_tbl_env: StreamTableEnvironment, database_name: str) -> Tuple[pd.
 df_ranked_airports_table = ranked_airports_table.to_pandas()
 
 # Get the flight data by airline and year
-flight_table = _tbl_env.sql_query(f"SELECT *, extract(year from to_timestamp(departure_time)) as departure_year FROM {database_name}.flight")
+flight_table = _tbl_env.sql_query(f"""
+select
+*,
+extract(year from to_timestamp(departure_time)) as departure_year
+from
+{database_name}.flight
+""")
 df_flight_table = flight_table.to_pandas()
 
 return df_airline_monthly_flights_table, df_ranked_airports_table, df_flight_table
@@ -122,7 +127,7 @@ def main(args):
 env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
 
 # --- Add the Python dependency script files to the environment
-env.add_python_archive("/opt/flink/python_apps/kickstarter/python_files.zip")
+env.add_python_archive("/opt/flink/python_apps/src/kickstarter/python_files.zip")
 
 # --- Create a Table Environment
 tbl_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=EnvironmentSettings.new_instance().in_batch_mode().build())
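Two points tie this file to the Dockerfile changes above: the archive path now matches where the Dockerfiles create python_files.zip, and the f prefix is dropped from the two queries that no longer interpolate any Python values while being kept where {database_name} is substituted. A hedged sketch of how the corrected path is consumed (the "python_files" target directory name is illustrative, not taken from the app):

# Hedged sketch of shipping the archive built by the Dockerfiles above; the
# "python_files" target directory name is illustrative only.
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
# The archive is uploaded to the cluster and extracted on the workers, where
# its contents are reachable via a relative path.
env.add_python_archive("/opt/flink/python_apps/src/kickstarter/python_files.zip", "python_files")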
