diff --git a/java/app/build.gradle.kts b/java/app/build.gradle.kts
index 891608e..231cfc4 100644
--- a/java/app/build.gradle.kts
+++ b/java/app/build.gradle.kts
@@ -15,13 +15,14 @@ repositories {
 }
 
 // --- Dependency version numbers
-val flinkVersion: String = "1.20.0"
+val flinkVersion: String = "1.19.1"
 val kafkaVersion: String = "3.7.0"
 val junitVersion: String = "5.10.0"
-val awssdkVersion: String = "2.26.29"
+val awssdkVersion: String = "2.27.14"
 var icebergVersion: String = "1.6.1"
 
 dependencies {
+    implementation("org.apache.hadoop:hadoop-common:3.3.6")
     implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
     implementation("org.apache.flink:flink-java:${flinkVersion}")
     compileOnly("org.apache.flink:flink-streaming-java:${flinkVersion}")
@@ -33,14 +34,19 @@ dependencies {
     implementation("org.apache.flink:flink-connector-kafka:3.2.0-1.19")
     implementation("org.apache.flink:flink-connector-datagen:${flinkVersion}")
     implementation("org.apache.flink:flink-json:${flinkVersion}")
+    compileOnly("org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}")
     implementation("org.slf4j:slf4j-log4j12:2.0.7")
     implementation("software.amazon.awssdk:secretsmanager:${awssdkVersion}")
     implementation("software.amazon.awssdk:ssm:${awssdkVersion}")
+    implementation("software.amazon.awssdk:glue:${awssdkVersion}")
+    implementation("software.amazon.awssdk:s3:${awssdkVersion}")
     implementation("org.json:json:20240303")
     runtimeOnly("org.apache.iceberg:iceberg-core:${icebergVersion}")
+    runtimeOnly("org.apache.iceberg:iceberg-aws:${icebergVersion}")
     implementation("org.apache.iceberg:iceberg-snowflake:${icebergVersion}")
-    implementation("net.snowflake:snowflake-jdbc:3.19.0")
     implementation("org.apache.iceberg:iceberg-flink-runtime-1.19:${icebergVersion}")
+    implementation("net.snowflake:snowflake-jdbc:3.19.0")
+
     testImplementation("org.apache.flink:flink-test-utils:${flinkVersion}")
     testImplementation("org.junit.jupiter:junit-jupiter:${junitVersion}")
     testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
diff --git a/java/app/src/main/java/kickstarter/Common.java b/java/app/src/main/java/kickstarter/Common.java
index f1fcbbb..c58915b 100644
--- a/java/app/src/main/java/kickstarter/Common.java
+++ b/java/app/src/main/java/kickstarter/Common.java
@@ -8,7 +8,6 @@
  */
 package kickstarter;
 
-import java.util.*;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 import org.apache.flink.table.api.TableEnvironment;
@@ -16,32 +15,9 @@
 
 public class Common {
-    private static final String OPT_SERVICE_ACCOUNT_USER = "--service-account-user";
+    public static final String ARG_SERVICE_ACCOUNT_USER = "service-account-user";  // --- Flag key without the leading "--"; MultipleParameterTool strips the prefix when parsing
 
-    /**
-     * This method loops through the `args` parameter and checks for the `OPT_SERVICE_ACCOUNT_USER`
-     * option.
-     *
-     * @param args list of strings passed to the main method.
-     * @return true if the flag is found, false otherwise.
-     */
-    public static String getAppOptions(final String[] args) {
-        String serviceAccountUser = "";
-
-        // --- Loop through the args parameter and check for the `OPT_SERVICE_ACCOUNT_USER` option
-        Iterator iterator = List.of(args).iterator();
-        while (iterator.hasNext()) {
-            String arg = iterator.next();
-            if(arg.equalsIgnoreCase(OPT_SERVICE_ACCOUNT_USER)) {
-                if(iterator.hasNext()) {
-                    serviceAccountUser = iterator.next();
-                }
-            }
-        }
-        return serviceAccountUser;
-    }
-
     /**
      * @return returns a new instance of the Jackson ObjectMapper with the JavaTimeModule
      * registered.
diff --git a/java/app/src/main/java/kickstarter/DataGeneratorApp.java b/java/app/src/main/java/kickstarter/DataGeneratorApp.java
index 54e1d26..3c246db 100644
--- a/java/app/src/main/java/kickstarter/DataGeneratorApp.java
+++ b/java/app/src/main/java/kickstarter/DataGeneratorApp.java
@@ -7,7 +7,9 @@
  */
 package kickstarter;
 
+import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.typeinfo.Types;
 import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
 import org.apache.flink.connector.base.DeliveryGuarantee;
@@ -18,10 +20,19 @@
 import org.apache.flink.streaming.api.environment.*;
 import org.apache.flink.table.api.*;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.table.catalog.CatalogDatabaseImpl;
-import org.apache.iceberg.flink.FlinkCatalog;
+import org.apache.flink.table.data.*;
+import org.apache.flink.table.types.logical.*;
+import org.apache.flink.types.*;
+import org.apache.flink.table.catalog.*;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.iceberg.catalog.*;
+import org.apache.iceberg.flink.*;
+import org.apache.iceberg.flink.sink.FlinkSink;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.types.Types.*;
+import org.apache.hadoop.conf.Configuration;
 import java.util.*;
-import java.util.stream.StreamSupport;
 
 import kickstarter.model.*;
 
@@ -44,6 +55,17 @@ public class DataGeneratorApp {
      * decide whether to retry the task execution.
      */
     public static void main(String[] args) throws Exception {
+        /*
+         * Retrieve the service account user from the command-line arguments
+         */
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        String serviceAccountUser = params.get(Common.ARG_SERVICE_ACCOUNT_USER);
+
+        // --- Fall back to a default service account user when the flag is not supplied
+        if (serviceAccountUser == null || serviceAccountUser.isBlank()) {
+            serviceAccountUser = "flink_kickstarter";
+        }
+
         // --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG)
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -76,7 +98,7 @@ public static void main(String[] args) throws Exception {
          */
         DataStream dataStreamProducerProperties = 
             env.fromData(new Properties())
-               .map(new KafkaClientPropertiesLookup(false, Common.getAppOptions(args)))
+               .map(new KafkaClientPropertiesLookup(false, serviceAccountUser))
               .name("kafka_producer_properties");
 
         Properties producerProperties = new Properties();
@@ -109,32 +131,12 @@ public static void main(String[] args) throws Exception {
             Types.POJO(AirlineData.class)
         );
 
-        // --- Define the schema for the in-memory table
-        Schema schema =
-            Schema.newBuilder()
-                .column("email_address", DataTypes.STRING())
-                .column("departure_time", DataTypes.TIMESTAMP(0))
-                .column("departure_airport_code", DataTypes.STRING())
-                .column("arrival_time", DataTypes.TIMESTAMP(0))
-                .column("arrival_airport_code", DataTypes.STRING())
-                .column("flight_duration", DataTypes.BIGINT())
-                .column("flight_number", DataTypes.STRING())
-                .column("confirmation_code", DataTypes.STRING())
-                .column("ticket_price", DataTypes.DECIMAL(10, 2))
-                .column("aircraft", DataTypes.STRING())
-                .column("booking_agency_email", DataTypes.STRING())
-                .build();
-
         /*
          * Sets up a Flink POJO source to consume data
         */
         DataStream skyOneStream = 
             env.fromSource(skyOneSource, WatermarkStrategy.noWatermarks(), "skyone_source");
 
-        // --- Convert DataStream to Table
-        Table skyOneTable = tblEnv.fromDataStream(skyOneStream, schema);
-        tblEnv.createTemporaryView("SkyOneTable", skyOneTable);
-
         /*
          * Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.skyone` with the
          * specified serializer
@@ -176,11 +178,6 @@ public static void main(String[] args) throws Exception {
         DataStream sunsetStream = 
             env.fromSource(sunsetSource, WatermarkStrategy.noWatermarks(), "sunset_source");
 
-
-        // --- Convert DataStream to Table
-        Table sunsetTable = tblEnv.fromDataStream(sunsetStream, schema);
-        tblEnv.createTemporaryView("SunsetTable", sunsetTable);
-
         /*
          * Sets up a Flink Kafka sink to produce data to the Kafka topic `airline.sunset` with the
          * specified serializer
@@ -208,54 +205,38 @@ public static void main(String[] args) throws Exception {
          */
         sunsetStream.sinkTo(sunsetSink).name("sunset_sink");
 
+        String catalogName = "apache_kickstarter";
+        String bucketName = serviceAccountUser.replace("_", "-");  // --- To follow the S3 bucket naming convention, replace any underscores with hyphens
+        String warehousePath = "s3a://" + bucketName + "/warehouse";
+        String catalogImpl = "org.apache.iceberg.aws.glue.GlueCatalog";
+        String ioImpl = "org.apache.iceberg.aws.s3.S3FileIO";
+        String databaseName = "airlines";
+
+        // --- Configure the AWS Glue Catalog properties
+        Map<String, String> catalogProperties = new HashMap<>();
+        catalogProperties.put("type", "iceberg");  // --- Selects Iceberg's Flink catalog factory; "catalog-type" must be omitted when "catalog-impl" is set
+        catalogProperties.put("warehouse", warehousePath);
+        catalogProperties.put("catalog-impl", catalogImpl);
+        catalogProperties.put("io-impl", ioImpl);
+
         /*
-         * Define the CREATE CATALOG Flink SQL statement to register the Iceberg catalog
-         * using the HadoopCatalog to store metadata in AWS S3 (i.e., s3a://), a Hadoop-
-         * compatible filesystem.  Then execute the Flink SQL statement to register the
-         * Iceberg catalog
+         * Create a CatalogLoader for the AWS Glue-backed Apache Iceberg catalog so the
+         * Iceberg Flink sink can load and write the target tables directly
          */
-        String catalogName = "apache_kickstarter";
-        String bucketName = Common.getAppOptions(args).replace("_", "-");  // --- To follow S3 bucket naming convention, replace underscores with hyphens if exist
-        try {
-            if(!Common.isCatalogExist(tblEnv, catalogName)) {
-                /*
-                 * Execute the CREATE CATALOG Flink SQL statement to register the Iceberg catalog.
-                 */
-                tblEnv.executeSql(
-                    "CREATE CATALOG " + catalogName + " WITH ("
-                        + "'type' = 'iceberg',"
-                        + "'catalog-type' = 'hadoop',"
-                        + "'warehouse' = 's3a://" + bucketName + "/warehouse',"
-                        + "'property-version' = '1',"
-                        + "'io-impl' = 'org.apache.iceberg.hadoop.HadoopFileIO'"
-                        + ");"
-                );
-            } else {
-                System.out.println("The " + catalogName + " catalog already exists.");
-            }
-        } catch(final Exception e) {
-            System.out.println("A critical error occurred to during the processing of the catalog because " + e.getMessage());
-            e.printStackTrace();
-            System.exit(1);
-        }
+        CatalogLoader catalogLoader = CatalogLoader.custom(catalogName, catalogProperties, new Configuration(false), catalogImpl);
+
-        // --- Use the Iceberg catalog
+        // --- Register the Iceberg catalog with the Flink table environment and make it the current catalog
+        CatalogDescriptor catalogDescriptor = CatalogDescriptor.of(catalogName, org.apache.flink.configuration.Configuration.fromMap(catalogProperties));
+        tblEnv.createCatalog(catalogName, catalogDescriptor);
         tblEnv.useCatalog(catalogName);
+        org.apache.flink.table.catalog.Catalog catalog = tblEnv.getCatalog(catalogName).orElseThrow(() -> new RuntimeException("Catalog not found"));
 
         // --- Print the current catalog name
         System.out.println("Current catalog: " + tblEnv.getCurrentCatalog());
 
         // --- Check if the database exists.  If not, create it
-        String databaseName = "airlines";
-
-        // Check if the namespace exists, if not, create it
         try {
-            org.apache.flink.table.catalog.Catalog catalog = tblEnv.getCatalog("apache_kickstarter").orElseThrow(() -> new RuntimeException("Catalog not found"));
-            if (catalog instanceof FlinkCatalog) {
-                FlinkCatalog flinkCatalog = (FlinkCatalog) catalog;
-                if (!flinkCatalog.databaseExists(databaseName)) {
-                    flinkCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), "The Airlines flight data database."), false);
-                }
+            if (!catalog.databaseExists(databaseName)) {
+                catalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap<>(), "The Airlines flight data database."), false);
             }
             tblEnv.useDatabase(databaseName);
         } catch (Exception e) {
@@ -267,34 +248,80 @@ public static void main(String[] args) throws Exception {
         // --- Print the current database name
         System.out.println("Current database: " + tblEnv.getCurrentDatabase());
 
-        // --- Set up the arrays for the table names and tables
-        String tableNames[] = {"skyone_airline", "sunset_airline"};
-        Table tables[] = {skyOneTable, sunsetTable};
-        int tableIndex = -1;
-
-        /*
-         * Check if the table exists.  If not, create it.  Then insert the data into
-         * the table(s).
-         */
-        for (String tableName : tableNames) {
-            tableIndex++;
-
-            // --- Create the table path
-            String tablePath = databaseName + "." + tableName;
-
-            try {
-                TableResult result = tblEnv.executeSql("SHOW TABLES IN " + databaseName);
-                @SuppressWarnings("null")
-                boolean tableExists = StreamSupport.stream(Spliterators
-                    .spliteratorUnknownSize(result.collect(), Spliterator.ORDERED), false)
-                    .anyMatch(row -> row.getField(0).equals(tableName));
-                if(!tableExists) {
-                    tblEnv.executeSql(
-                        "CREATE TABLE " + tablePath + " ("
+        // --- Define the Iceberg schema for the AirlineData records
+        Schema schema =
+            new Schema(NestedField.required(1, "email_address", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(2, "departure_time", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(3, "departure_airport_code", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(4, "arrival_time", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(5, "arrival_airport_code", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(6, "flight_duration", org.apache.iceberg.types.Types.LongType.get()),
+                       NestedField.required(7, "flight_number", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(8, "confirmation_code", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(9, "ticket_price", org.apache.iceberg.types.Types.DecimalType.of(10, 2)),
+                       NestedField.required(10, "aircraft", org.apache.iceberg.types.Types.StringType.get()),
+                       NestedField.required(11, "booking_agency_email", org.apache.iceberg.types.Types.StringType.get()));
+
+        // --- Define the RowType for the RowData
+        RowType rowType = RowType.of(
+            new LogicalType[] {
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.BIGINT().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.DECIMAL(10, 2).getLogicalType(),
+                DataTypes.STRING().getLogicalType(),
+                DataTypes.STRING().getLogicalType()
+            },
+            new String[] {
+                "email_address",
+                "departure_time",
+                "departure_airport_code",
+                "arrival_time",
+                "arrival_airport_code",
+                "flight_duration",
+                "flight_number",
+                "confirmation_code",
+                "ticket_price",
+                "aircraft",
+                "booking_agency_email"
+            }
+        );
+
+        // --- Convert DataStream<AirlineData> to DataStream<RowData>
+        DataStream<RowData> skyOneRowData = skyOneStream.map(new MapFunction<AirlineData, RowData>() {
+            @Override
+            public RowData map(AirlineData airlineData) throws Exception {
+                GenericRowData rowData = new GenericRowData(RowKind.INSERT, rowType.getFieldCount());
+                // --- RowData holds Flink's internal formats: StringData for STRING, DecimalData for DECIMAL(10, 2); String.valueOf() covers non-String time getters
+                rowData.setField(0, StringData.fromString(airlineData.getEmailAddress()));
+                rowData.setField(1, StringData.fromString(String.valueOf(airlineData.getDepartureTime())));
+                rowData.setField(2, StringData.fromString(airlineData.getDepartureAirportCode()));
+                rowData.setField(3, StringData.fromString(String.valueOf(airlineData.getArrivalTime())));
+                rowData.setField(4, StringData.fromString(airlineData.getArrivalAirportCode()));
+                rowData.setField(5, airlineData.getFlightDuration());
+                rowData.setField(6, StringData.fromString(airlineData.getFlightNumber()));
+                rowData.setField(7, StringData.fromString(airlineData.getConfirmationCode()));
+                rowData.setField(8, DecimalData.fromBigDecimal(airlineData.getTicketPrice(), 10, 2));
+                rowData.setField(9, StringData.fromString(airlineData.getAircraft()));
+                rowData.setField(10, StringData.fromString(airlineData.getBookingAgencyEmail()));
+                return rowData;
+            }
+        });
+
+        TableIdentifier tableIdentifier = TableIdentifier.of(databaseName, "skyone_airline");
+
+        // --- Create the table if it does not exist
+        if (!catalog.tableExists(ObjectPath.fromString(databaseName + "." + "skyone_airline"))) {
+            tblEnv.executeSql(
+                "CREATE TABLE " + databaseName + "." + "skyone_airline" + " ("
+ "skyone_airline" + " (" + "email_address STRING, " - + "departure_time TIMESTAMP(0), " + + "departure_time STRING, " + "departure_airport_code STRING, " - + "arrival_time TIMESTAMP(0), " + + "arrival_time STRING, " + "arrival_airport_code STRING, " + "flight_duration BIGINT," + "flight_number STRING, " @@ -303,26 +330,49 @@ public static void main(String[] args) throws Exception { + "aircraft STRING, " + "booking_agency_email STRING) " + "WITH (" + + "'connector' = 'iceberg'," + "'write.format.default' = 'parquet'," + "'write.target-file-size-bytes' = '134217728'," + "'partitioning' = 'arrival_airport_code'," + "'format-version' = '2');" ); - } else { - System.out.println("The " + tablePath + " table already exists."); - } - } catch(final Exception e) { - System.out.println("A critical error occurred to during the processing of the table because " + e.getMessage()); - e.printStackTrace(); - System.exit(1); - } - - /* - * Convert datastream into table and then insert data into physical table - */ - tables[tableIndex].executeInsert(tablePath); } + // --- + TableLoader tableLoaderSkyOne = TableLoader.fromCatalog(catalogLoader, tableIdentifier); + + FlinkSink.forRowData(skyOneRowData).tableLoader(tableLoaderSkyOne).upsert(true).append(); + + // --- Convert DataStream to DataStream + DataStream sunsetRowData = sunsetStream.map(new MapFunction() { + @Override + public RowData map(AirlineData airlineData) throws Exception { + GenericRowData rowData = new GenericRowData(RowKind.INSERT, rowType.getFieldCount()); + rowData.setField(0, airlineData.getEmailAddress()); + rowData.setField(1, airlineData.getDepartureTime()); + rowData.setField(2, airlineData.getDepartureAirportCode()); + rowData.setField(3, airlineData.getArrivalTime()); + rowData.setField(4, airlineData.getArrivalAirportCode()); + rowData.setField(5, airlineData.getFlightDuration()); + rowData.setField(6, airlineData.getFlightNumber()); + rowData.setField(7, airlineData.getConfirmationCode()); + rowData.setField(8, airlineData.getTicketPrice()); + rowData.setField(9, airlineData.getAircraft()); + rowData.setField(10, airlineData.getBookingAgencyEmail()); + return rowData; + } + }); + + // --- + TableLoader tableLoaderSunset = TableLoader.fromCatalog(catalogLoader, TableIdentifier.of(databaseName, "sunset_airline")); + + // --- Initialize a FlinkSink.Builder to export the data from input data stream with RowDatas into iceberg table + FlinkSink + .forRowData(sunsetRowData) + .tableLoader(tableLoaderSunset) + .upsert(true) + .append(); + try { // --- Execute the Flink job graph (DAG) env.execute("DataGeneratorApp"); diff --git a/java/app/src/main/java/kickstarter/FlightImporterApp.java b/java/app/src/main/java/kickstarter/FlightImporterApp.java index 4b677c1..d70bfac 100644 --- a/java/app/src/main/java/kickstarter/FlightImporterApp.java +++ b/java/app/src/main/java/kickstarter/FlightImporterApp.java @@ -22,6 +22,7 @@ package kickstarter; import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.connector.kafka.sink.*; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -38,7 +39,7 @@ public class FlightImporterApp { private static final Logger logger = LoggerFactory.getLogger(FlightImporterApp.class); - + /** * The main method in a Flink application serves as the entry point of the program, where @@ -53,6 +54,12 @@ public class FlightImporterApp { 
      */
     @SuppressWarnings("rawtypes")
     public static void main(String[] args) throws Exception {
+        /*
+         * Retrieve the service account user from the command-line arguments
+         */
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        String serviceAccountUser = params.get(Common.ARG_SERVICE_ACCOUNT_USER);
+
         // --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG)
         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
@@ -63,7 +70,7 @@ public static void main(String[] args) throws Exception {
          */
         DataStream dataStreamConsumerProperties = 
             env.fromData(new Properties())
-               .map(new KafkaClientPropertiesLookup(true, Common.getAppOptions(args)))
+               .map(new KafkaClientPropertiesLookup(true, serviceAccountUser))
                .name("kafka_consumer_properties");
 
         Properties consumerProperties = new Properties();
@@ -92,7 +99,7 @@ public static void main(String[] args) throws Exception {
          */
         DataStream dataStreamProducerProperties = 
             env.fromData(new Properties())
-               .map(new KafkaClientPropertiesLookup(false, Common.getAppOptions(args)))
+               .map(new KafkaClientPropertiesLookup(false, serviceAccountUser))
                .name("kafka_producer_properties");
 
         Properties producerProperties = new Properties();
diff --git a/java/app/src/main/java/kickstarter/FlyerStatsApp.java b/java/app/src/main/java/kickstarter/FlyerStatsApp.java
index 6b8a6dc..735cd31 100644
--- a/java/app/src/main/java/kickstarter/FlyerStatsApp.java
+++ b/java/app/src/main/java/kickstarter/FlyerStatsApp.java
@@ -10,6 +10,7 @@
 package kickstarter;
 
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.*;
 import org.apache.flink.connector.kafka.source.KafkaSource;
@@ -43,6 +44,12 @@ public class FlyerStatsApp {
      * decide whether to retry the task execution.
      */
     public static void main(String[] args) throws Exception {
+        /*
+         * Retrieve the service account user from the command-line arguments
+         */
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+        String serviceAccountUser = params.get(Common.ARG_SERVICE_ACCOUNT_USER);
+
         // --- Create a blank Flink execution environment (a.k.a. 
the Flink job graph -- the DAG) StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -53,7 +60,7 @@ public static void main(String[] args) throws Exception { */ DataStream dataStreamConsumerProperties = env.fromData(new Properties()) - .map(new KafkaClientPropertiesLookup(true, Common.getAppOptions(args))) + .map(new KafkaClientPropertiesLookup(true, serviceAccountUser)) .name("kafka_consumer_properties"); Properties consumerProperties = new Properties(); @@ -86,7 +93,7 @@ public static void main(String[] args) throws Exception { */ DataStream dataStreamProducerProperties = env.fromData(new Properties()) - .map(new KafkaClientPropertiesLookup(false, Common.getAppOptions(args))) + .map(new KafkaClientPropertiesLookup(false, serviceAccountUser)) .name("kafka_producer_properties"); Properties producerProperties = new Properties(); diff --git a/java/app/src/main/java/kickstarter/model/AirlineData.java b/java/app/src/main/java/kickstarter/model/AirlineData.java index 5818f92..fa3afc7 100644 --- a/java/app/src/main/java/kickstarter/model/AirlineData.java +++ b/java/app/src/main/java/kickstarter/model/AirlineData.java @@ -1,11 +1,13 @@ package kickstarter.model; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*; + +import java.io.*; import java.math.*; import java.util.*; -public class AirlineData { +public class AirlineData implements Serializable { @JsonProperty("email_address") private String email_address; diff --git a/java/app/src/main/java/kickstarter/model/FlightData.java b/java/app/src/main/java/kickstarter/model/FlightData.java index 01b7087..df75289 100644 --- a/java/app/src/main/java/kickstarter/model/FlightData.java +++ b/java/app/src/main/java/kickstarter/model/FlightData.java @@ -8,10 +8,12 @@ package kickstarter.model; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*; + +import java.io.*; import java.util.*; -public class FlightData { +public class FlightData implements Serializable { @JsonProperty("email_address") private String email_address; diff --git a/java/app/src/main/java/kickstarter/model/FlyerStatsData.java b/java/app/src/main/java/kickstarter/model/FlyerStatsData.java index 0643b5c..bfe4056 100644 --- a/java/app/src/main/java/kickstarter/model/FlyerStatsData.java +++ b/java/app/src/main/java/kickstarter/model/FlyerStatsData.java @@ -8,12 +8,14 @@ package kickstarter.model; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*; + +import java.io.*; import java.time.*; import java.time.format.*; import java.util.*; -public class FlyerStatsData { +public class FlyerStatsData implements Serializable { @JsonProperty("email_address") private String email_address;
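A note on the flag parsing shared by all three entry points: MultipleParameterTool.fromArgs() treats tokens prefixed with "-" or "--" as keys and strips the prefix, so a value passed as "--service-account-user flink_kickstarter" is retrieved under the bare key "service-account-user" (which is why Common.ARG_SERVICE_ACCOUNT_USER holds the key without dashes). A minimal, self-contained sketch (the class name is illustrative):

    import org.apache.flink.api.java.utils.MultipleParameterTool;

    public class ArgsParsingSketch {
        public static void main(String[] args) {
            // Simulate: flink run ... --service-account-user flink_kickstarter
            MultipleParameterTool params = MultipleParameterTool.fromArgs(
                new String[] {"--service-account-user", "flink_kickstarter"});

            // The key is looked up WITHOUT the leading dashes
            System.out.println(params.get("service-account-user"));  // prints flink_kickstarter
        }
    }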
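The CatalogDescriptor registration in DataGeneratorApp also has a Flink SQL equivalent, which can be handy for smoke-testing the Glue wiring from a SQL client. A sketch under the same property set ('type' = 'iceberg' selects Iceberg's catalog factory, 'catalog-type' is omitted because 'catalog-impl' is given, and the bucket name flink-kickstarter is illustrative):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class GlueCatalogSqlSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tblEnv = StreamTableEnvironment.create(env);

            // Register the AWS Glue-backed Apache Iceberg catalog via SQL DDL
            tblEnv.executeSql(
                "CREATE CATALOG apache_kickstarter WITH ("
                    + "'type' = 'iceberg',"
                    + "'catalog-impl' = 'org.apache.iceberg.aws.glue.GlueCatalog',"
                    + "'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',"
                    + "'warehouse' = 's3a://flink-kickstarter/warehouse');"
            );
            tblEnv.useCatalog("apache_kickstarter");
        }
    }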
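On the DataStream-to-Iceberg hand-off: FlinkSink.forRowData(...) feeds writers that read Flink's internal data formats, so STRING and DECIMAL positions of a GenericRowData must hold StringData and DecimalData rather than java.lang.String and java.math.BigDecimal, while BIGINT positions take a plain Long. A compact sketch of the conversion pattern over a three-column subset of the airline schema (values are illustrative):

    import java.math.BigDecimal;

    import org.apache.flink.table.data.DecimalData;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.data.StringData;
    import org.apache.flink.types.RowKind;

    public class RowDataSketch {
        // Columns: email_address STRING, flight_duration BIGINT, ticket_price DECIMAL(10, 2)
        public static RowData toRowData(String emailAddress, long flightDuration, BigDecimal ticketPrice) {
            GenericRowData row = new GenericRowData(RowKind.INSERT, 3);
            row.setField(0, StringData.fromString(emailAddress));             // STRING  -> StringData
            row.setField(1, flightDuration);                                  // BIGINT  -> Long
            row.setField(2, DecimalData.fromBigDecimal(ticketPrice, 10, 2));  // DECIMAL -> DecimalData
            return row;
        }

        public static void main(String[] args) {
            RowData row = toRowData("jane.doe@example.com", 420L, new BigDecimal("599.99"));
            System.out.println(row.getArity());  // prints 3
        }
    }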