Skip to content

Commit

Permalink
See #31. Removed deprecated registerCatalog method.
Browse files Browse the repository at this point in the history
  • Loading branch information
j3-signalroom committed Oct 17, 2024
1 parent fde3b1b commit 4653acd
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 142 deletions.
12 changes: 9 additions & 3 deletions java/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ repositories {
}

// --- Dependency version numbers
val flinkVersion: String = "1.20.0"
val flinkVersion: String = "1.19.1"
val kafkaVersion: String = "3.7.0"
val junitVersion: String = "5.10.0"
val awssdkVersion: String = "2.26.29"
val awssdkVersion: String = "2.27.14"
var icebergVersion: String = "1.6.1"

dependencies {
implementation("org.apache.hadoop:hadoop-common:3.3.6")
implementation("org.apache.kafka:kafka-clients:${kafkaVersion}")
implementation("org.apache.flink:flink-java:${flinkVersion}")
compileOnly("org.apache.flink:flink-streaming-java:${flinkVersion}")
Expand All @@ -33,14 +34,19 @@ dependencies {
implementation("org.apache.flink:flink-connector-kafka:3.2.0-1.19")
implementation("org.apache.flink:flink-connector-datagen:${flinkVersion}")
implementation("org.apache.flink:flink-json:${flinkVersion}")
compileOnly("org.apache.flink:flink-s3-fs-hadoop:${flinkVersion}")
implementation("org.slf4j:slf4j-log4j12:2.0.7")
implementation("software.amazon.awssdk:secretsmanager:${awssdkVersion}")
implementation("software.amazon.awssdk:ssm:${awssdkVersion}")
implementation("software.amazon.awssdk:glue:${awssdkVersion}")
implementation("software.amazon.awssdk:s3:${awssdkVersion}")
implementation("org.json:json:20240303")
runtimeOnly("org.apache.iceberg:iceberg-core:${icebergVersion}")
runtimeOnly("org.apache.iceberg:iceberg-aws:${icebergVersion}")
implementation("org.apache.iceberg:iceberg-snowflake:${icebergVersion}")
implementation("net.snowflake:snowflake-jdbc:3.19.0")
implementation("org.apache.iceberg:iceberg-flink-runtime-1.19:${icebergVersion}")
implementation("net.snowflake:snowflake-jdbc:3.19.0")

testImplementation("org.apache.flink:flink-test-utils:${flinkVersion}")
testImplementation("org.junit.jupiter:junit-jupiter:${junitVersion}")
testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}")
Expand Down
26 changes: 1 addition & 25 deletions java/app/src/main/java/kickstarter/Common.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,16 @@
*/
package kickstarter;

import java.util.*;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;


public class Common {
private static final String OPT_SERVICE_ACCOUNT_USER = "--service-account-user";
public static final String ARG_SERVICE_ACCOUNT_USER = "--service-account-user";


/**
* This method Loops through the `args` parameter and checks for the `OPT_SERVICE_ACCOUNT_USER`
* option.
*
* @param args list of strings passed to the main method.
* @return true if the flag is found, false otherwise.
*/
public static String getAppOptions(final String[] args) {
String serviceAccountUser = "";

// --- Loop through the args parameter and check for the `OPT_SERVICE_ACCOUNT_USER` option
Iterator <String> iterator = List.of(args).iterator();
while (iterator.hasNext()) {
String arg = iterator.next();
if(arg.equalsIgnoreCase(OPT_SERVICE_ACCOUNT_USER)) {
if(iterator.hasNext()) {
serviceAccountUser = iterator.next();
}
}
}
return serviceAccountUser;
}

/**
* @return returns a new instance of the Jackson ObjectMapper with the JavaTimeModule
* registered.
Expand Down
262 changes: 156 additions & 106 deletions java/app/src/main/java/kickstarter/DataGeneratorApp.java

Large diffs are not rendered by default.

13 changes: 10 additions & 3 deletions java/app/src/main/java/kickstarter/FlightImporterApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package kickstarter;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.kafka.sink.*;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
Expand All @@ -38,7 +39,7 @@

public class FlightImporterApp {
private static final Logger logger = LoggerFactory.getLogger(FlightImporterApp.class);


/**
* The main method in a Flink application serves as the entry point of the program, where
Expand All @@ -53,6 +54,12 @@ public class FlightImporterApp {
*/
@SuppressWarnings("rawtypes")
public static void main(String[] args) throws Exception {
/*
* Retrieve the arguments from the command line arguments
*/
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String serviceAccountUser = params.get(Common.ARG_SERVICE_ACCOUNT_USER);

// --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Expand All @@ -63,7 +70,7 @@ public static void main(String[] args) throws Exception {
*/
DataStream<Properties> dataStreamConsumerProperties =
env.fromData(new Properties())
.map(new KafkaClientPropertiesLookup(true, Common.getAppOptions(args)))
.map(new KafkaClientPropertiesLookup(true, serviceAccountUser))
.name("kafka_consumer_properties");
Properties consumerProperties = new Properties();

Expand Down Expand Up @@ -92,7 +99,7 @@ public static void main(String[] args) throws Exception {
*/
DataStream<Properties> dataStreamProducerProperties =
env.fromData(new Properties())
.map(new KafkaClientPropertiesLookup(false, Common.getAppOptions(args)))
.map(new KafkaClientPropertiesLookup(false, serviceAccountUser))
.name("kafka_producer_properties");
Properties producerProperties = new Properties();

Expand Down
11 changes: 9 additions & 2 deletions java/app/src/main/java/kickstarter/FlyerStatsApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
package kickstarter;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.*;
import org.apache.flink.connector.kafka.source.KafkaSource;
Expand Down Expand Up @@ -43,6 +44,12 @@ public class FlyerStatsApp {
* decide whether to retry the task execution.
*/
public static void main(String[] args) throws Exception {
/*
* Retrieve the arguments from the command line arguments
*/
MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
String serviceAccountUser = params.get(Common.ARG_SERVICE_ACCOUNT_USER);

// --- Create a blank Flink execution environment (a.k.a. the Flink job graph -- the DAG)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Expand All @@ -53,7 +60,7 @@ public static void main(String[] args) throws Exception {
*/
DataStream<Properties> dataStreamConsumerProperties =
env.fromData(new Properties())
.map(new KafkaClientPropertiesLookup(true, Common.getAppOptions(args)))
.map(new KafkaClientPropertiesLookup(true, serviceAccountUser))
.name("kafka_consumer_properties");
Properties consumerProperties = new Properties();

Expand Down Expand Up @@ -86,7 +93,7 @@ public static void main(String[] args) throws Exception {
*/
DataStream<Properties> dataStreamProducerProperties =
env.fromData(new Properties())
.map(new KafkaClientPropertiesLookup(false, Common.getAppOptions(args)))
.map(new KafkaClientPropertiesLookup(false, serviceAccountUser))
.name("kafka_producer_properties");
Properties producerProperties = new Properties();

Expand Down
4 changes: 3 additions & 1 deletion java/app/src/main/java/kickstarter/model/AirlineData.java
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package kickstarter.model;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*;

import java.io.*;
import java.math.*;
import java.util.*;


public class AirlineData {
public class AirlineData implements Serializable {
@JsonProperty("email_address")
private String email_address;

Expand Down
4 changes: 3 additions & 1 deletion java/app/src/main/java/kickstarter/model/FlightData.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
package kickstarter.model;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*;

import java.io.*;
import java.util.*;


public class FlightData {
public class FlightData implements Serializable {
@JsonProperty("email_address")
private String email_address;

Expand Down
4 changes: 3 additions & 1 deletion java/app/src/main/java/kickstarter/model/FlyerStatsData.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
package kickstarter.model;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.*;

import java.io.*;
import java.time.*;
import java.time.format.*;
import java.util.*;


public class FlyerStatsData {
public class FlyerStatsData implements Serializable {
@JsonProperty("email_address")
private String email_address;

Expand Down

0 comments on commit 4653acd

Please sign in to comment.