Skip to content

Commit

Permalink
Resolved #316.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Oct 18, 2024
1 parent 1bca2ef commit e8a5699
Show file tree
Hide file tree
Showing 13 changed files with 59 additions and 33 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ All notable changes to this project will be documented in this file.

The format is base on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.50.00.000] - TBD
## [0.50.00.000] - 2024-10-18
### Added
- Issue [#31](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/31).
- Issue [#362](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/362).
- Issue [#364](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/364).

### Changed
- Issue [#368](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/368).
- Issue [#369](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/369).

### Changed
- Issue [#368](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/368).
### Fixed
- Issue [#316](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/316).

## [0.42.00.000] - 2024-10-15
### Added
Expand Down
2 changes: 0 additions & 2 deletions KNOWNISSUES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,2 @@
# Known Issues
All notable known issues to this project will be documented in this file.

- Issue [#316](https://github.com/j3-signalroom/apache_flink-kickstarter/issues/316): The `FlyerStatsApp` Java-based Flink App prematurely shutdowns.
6 changes: 0 additions & 6 deletions aws-resources.tf
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,6 @@ resource "aws_s3_object" "warehouse" {
key = "warehouse/"
}

resource "aws_s3_object" "warehouse_airlines" {
bucket = aws_s3_bucket.iceberg_bucket.bucket
key = "warehouse/airlines/"
depends_on = [ aws_s3_object.warehouse ]
}

resource "aws_iam_role" "glue_role" {
name = "glue_service_role"

Expand Down
7 changes: 5 additions & 2 deletions java/app/src/main/java/kickstarter/DataGeneratorApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.hadoop.conf.Configuration;
import java.util.*;
import org.slf4j.*;

import kickstarter.model.*;

Expand All @@ -38,6 +39,9 @@
* respectively.
*/
public class DataGeneratorApp {
private static final Logger logger = LoggerFactory.getLogger(DataGeneratorApp.class);


/**
* The main method in a Flink application serves as the entry point of the program, where
* the Flink DAG is defined. That is, the execution environment, the creation of the data
Expand Down Expand Up @@ -277,8 +281,7 @@ public static void main(String[] args) throws Exception {
// --- Execute the Flink job graph (DAG)
env.execute("DataGeneratorApp");
} catch (Exception e) {
System.out.println("The Flink App stopped early due to the following: " + e.getMessage());
e.printStackTrace();
logger.error("The App stopped early due to the following: {}", e.getMessage());
}
}

Expand Down
4 changes: 2 additions & 2 deletions java/app/src/main/java/kickstarter/FlightImporterApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,12 +202,12 @@ public static DataStream<FlightData> defineWorkflow(DataStream<AirlineData> skyO
DataStream<FlightData> skyOneFlightStream =
skyOneSource
.filter(flight -> LocalDateTime.parse(flight.getArrivalTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).isAfter(LocalDateTime.now()))
.map(AirlineData::toFlightData);
.map(flight -> flight.toFlightData("SkyOne"));

DataStream<FlightData> sunsetFlightStream =
sunsetSource
.filter(flight -> LocalDateTime.parse(flight.getArrivalTime(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")).isAfter(LocalDateTime.now()))
.map(AirlineData::toFlightData);
.map(flight -> flight.toFlightData("Sunset"));

return skyOneFlightStream.union(sunsetFlightStream);
}
Expand Down
17 changes: 13 additions & 4 deletions java/app/src/main/java/kickstarter/FlyerStatsApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.json.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.kafka.clients.producer.ProducerConfig;
import java.time.*;
import java.util.*;
import org.slf4j.*;
Expand Down Expand Up @@ -113,6 +115,13 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

producerProperties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");

// ---Configure ObjectMapper to ignore unknown properties
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

/*
* Sets up a Flink Kafka source to consume data from the Kafka topic `airline.flight` with the
* specified deserializer
Expand Down Expand Up @@ -152,17 +161,17 @@ public static void main(String[] args) throws Exception {
KafkaSink.<FlyerStatsData>builder()
.setKafkaProducerConfig(producerProperties)
.setRecordSerializer(flyerStatsSerializer)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.build();

/*
* Defines the workflow for the Flink job graph (DAG) by connecting the data streams and
* applying transformations to the data streams
*/
defineWorkflow(flightDataStream)
.sinkTo(flyerStatsSink)
.name("flyer_stats_sink")
.uid("flyer_stats_sink");
.sinkTo(flyerStatsSink)
.name("flyer_stats_sink")
.uid("flyer_stats_sink");

try {
// --- Execute the Flink job graph (DAG)
Expand Down
3 changes: 2 additions & 1 deletion java/app/src/main/java/kickstarter/model/AirlineData.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public String toString() {
", booking_agency_email='" + booking_agency_email + '\'' +
'}';
}
public FlightData toFlightData() {
public FlightData toFlightData(final String airline) {
FlightData flightData = new FlightData();

flightData.setEmailAddress(getEmailAddress());
Expand All @@ -189,6 +189,7 @@ public FlightData toFlightData() {
flightData.setArrivalAirportCode(getArrivalAirportCode());
flightData.setFlightNumber(getFlightNumber());
flightData.setConfirmationCode(getConfirmationCode());
flightData.setAirline(airline);

return flightData;
}
Expand Down
24 changes: 19 additions & 5 deletions java/app/src/main/java/kickstarter/model/FlightData.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ public class FlightData implements Serializable {
@JsonProperty("flight_number")
private String flight_number;

@JsonProperty("confirmation_code")
@JsonProperty("confirmation_code")
private String confirmation_code;

@JsonProperty("airline")
private String airline;


public FlightData() {
// --- Do nothing
Expand Down Expand Up @@ -92,8 +95,16 @@ public String getConfirmationCode() {
return this.confirmation_code;
}

public void setConfirmationCode(String referenceNumber) {
this.confirmation_code = referenceNumber;
public void setConfirmationCode(String confirmationCode) {
this.confirmation_code = confirmationCode;
}

public String getAirline() {
return this.airline;
}

public void setAirline(String airline) {
this.airline = airline;
}

@Override
Expand All @@ -107,7 +118,8 @@ public boolean equals(Object o) {
Objects.equals(this.arrival_time, that.arrival_time) &&
Objects.equals(this.arrival_airport_code, that.arrival_airport_code) &&
Objects.equals(this.flight_number, that.flight_number) &&
Objects.equals(this.confirmation_code, that.confirmation_code);
Objects.equals(this.confirmation_code, that.confirmation_code) &&
Objects.equals(this.airline, that.airline);
}

@Override
Expand All @@ -118,7 +130,8 @@ public int hashCode() {
this.arrival_time,
this.arrival_airport_code,
this.flight_number,
this.confirmation_code);
this.confirmation_code,
this.airline);
}

@Override
Expand All @@ -131,6 +144,7 @@ public String toString() {
", arrival_airport_code='" + this.arrival_airport_code + '\'' +
", flight_number='" + this.flight_number + '\'' +
", confirmation_code='" + this.confirmation_code + '\'' +
", airline='" + this.airline + '\'' +
'}';
}
}
4 changes: 2 additions & 2 deletions java/app/src/test/java/kickstarter/FlightImporterAppTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ void defineWorkflow_shouldConvertDataFromTwoStreams() throws Exception {

FlightImporterApp.defineWorkflow(skyOneStream, sunsetStream).collectAsync(collector);
env.executeAsync();
assertContains(collector, Arrays.asList(skyOneFlight.toFlightData(), sunsetFlight.toFlightData()));
assertContains(collector, Arrays.asList(skyOneFlight.toFlightData("SkyOne"), sunsetFlight.toFlightData("Sunset")));
}

@Test
Expand All @@ -79,6 +79,6 @@ void defineWorkflow_shouldFilterOutFlightsInThePast() throws Exception {

env.executeAsync();

assertContains(collector, Arrays.asList(newSkyOneFlight.toFlightData(), newSunsetFlight.toFlightData()));
assertContains(collector, Arrays.asList(newSkyOneFlight.toFlightData("SkyOne"), newSunsetFlight.toFlightData("Sunset")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,10 @@ void toFlightData_shouldConvertToAFlightDataObject() {
expected.setArrivalAirportCode(skyOne.getArrivalAirportCode());
expected.setFlightNumber(skyOne.getFlightNumber());
expected.setConfirmationCode(skyOne.getConfirmationCode());
expected.setAirline("SkyOne");

FlightData actual = skyOne.toFlightData();

FlightData actual = skyOne.toFlightData("SkyOne");

assertEquals(expected, actual);
}
Expand Down
4 changes: 4 additions & 0 deletions java/app/src/test/java/kickstarter/model/FlightDataTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ void setters_shouldPopulateExpectedFields() {
actual.setArrivalAirportCode(expected.getArrivalAirportCode());
actual.setFlightNumber(expected.getFlightNumber());
actual.setConfirmationCode(expected.getConfirmationCode());
actual.setAirline(expected.getAirline());

assertEquals(expected.getEmailAddress(), actual.getEmailAddress());
assertEquals(expected.getDepartureTime(), actual.getDepartureTime());
Expand All @@ -40,6 +41,7 @@ void setters_shouldPopulateExpectedFields() {
assertEquals(expected.getArrivalAirportCode(), actual.getArrivalAirportCode());
assertEquals(expected.getFlightNumber(), actual.getFlightNumber());
assertEquals(expected.getConfirmationCode(), actual.getConfirmationCode());
assertEquals(expected.getAirline(), actual.getAirline());
}

@Test
Expand All @@ -53,6 +55,7 @@ void equals_shouldReturnTrue_forTwoEquivalentFlights() {
flight2.setArrivalAirportCode(flight1.getArrivalAirportCode());
flight2.setFlightNumber(flight1.getFlightNumber());
flight2.setConfirmationCode(flight1.getConfirmationCode());
flight2.setAirline(flight1.getAirline());

assertNotSame(flight1, flight2);
assertEquals(flight1, flight2);
Expand Down Expand Up @@ -81,6 +84,7 @@ void toString_shouldReturnTheExpectedResults() {
", arrival_airport_code='" + flightData.getArrivalAirportCode() + '\'' +
", flight_number='" + flightData.getFlightNumber() + '\'' +
", confirmation_code='" + flightData.getConfirmationCode() + '\'' +
", airline='" + flightData.getAirline() + '\'' +
'}';

System.out.println(flightData.toString());
Expand Down
2 changes: 1 addition & 1 deletion python/kickstarter/flight_importer_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ def main(args):
arrival_airport_code STRING,
flight_number STRING,
confirmation STRING,
source STRING
airline STRING
) WITH (
'write.format.default' = 'parquet',
'write.target-file-size-bytes' = '134217728',
Expand Down
8 changes: 4 additions & 4 deletions python/kickstarter/model/flight_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FlightData():
arrival_airport_code: str | None
flight_number: str | None
confirmation_code: str | None
source: str | None
airline: str | None


def get_duration(self):
Expand All @@ -35,7 +35,7 @@ def to_row(self):
arrival_airport_code=self.arrival_airport_code,
flight_number=self.flight_number,
confirmation_code=self.confirmation_code,
source=self.source)
airline=self.airline)

@classmethod
def from_row(cls, row: Row):
Expand All @@ -47,7 +47,7 @@ def from_row(cls, row: Row):
arrival_airport_code=row.arrival_airport_code,
flight_number=row.flight_number,
confirmation_code=row.confirmation_code,
source=row.source,
airline=row.airline,
)

@staticmethod
Expand All @@ -61,7 +61,7 @@ def get_value_type_info():
"arrival_airport_code",
"flight_number",
"confirmation_code",
"source",
"airline",
],
field_types=[
Types.STRING(),
Expand Down

0 comments on commit e8a5699

Please sign in to comment.