From 2e389b58dbdf85ca0a3473905af5d6c1d5209ad0 Mon Sep 17 00:00:00 2001 From: Don Tregonning Date: Thu, 18 Apr 2019 08:05:54 -0700 Subject: [PATCH] add support for eventhubs --- build.gradle | 5 +- gradle/wrapper/gradle-wrapper.jar | Bin 54712 -> 54708 bytes gradle/wrapper/gradle-wrapper.properties | 4 +- out/production/main/META-INF/MANIFEST.MF | 3 + out/production/resources/log4j2.xml | 21 +++ .../java/generator/CommandLineParams.java | 33 ++++- src/main/java/generator/DataGenerator.java | 53 ++++++-- src/main/java/generator/EPSThread.java | 124 ++++++++++++++---- .../java/generator/MetricsCalculator.java | 4 +- src/main/main.iml | 11 ++ 10 files changed, 211 insertions(+), 47 deletions(-) create mode 100644 out/production/main/META-INF/MANIFEST.MF create mode 100644 out/production/resources/log4j2.xml create mode 100644 src/main/main.iml diff --git a/build.gradle b/build.gradle index 3cec651..b7b078c 100644 --- a/build.gradle +++ b/build.gradle @@ -9,10 +9,13 @@ repositories { dependencies { compile 'args4j:args4j-site:2.0.25' compile 'args4j:args4j:2.33' - compile 'org.apache.kafka:kafka-clients:2.0.0' + compile 'org.apache.kafka:kafka-clients:2.2.0' compile 'org.slf4j:slf4j-simple:1.7.5' compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.9.1' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.9.1' + // https://mvnrepository.com/artifact/com.microsoft.azure/azure-eventhubs + compile group: 'com.microsoft.azure', name: 'azure-eventhubs', version: '2.3.0' + compile group: 'com.google.code.gson', name: 'gson', version: '2.3.1' } jar { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 125efdb97014369da22641a1fa7e5fcc83908465..7a3265ee94c0ab25cf079ac8ccdf87f41d455d42 100644 GIT binary patch delta 673 zcmYL{T}YEr9LC@CS2iPM?-E7dDatC-SEkOe=9HWi7A|cvi!7}mR_kk$kr%=)BqA>g zwTIC+k+YNsInN&s|1%lPmm+E8Eh%)}BF_bAu%@Q_3^@dEz<l3g#bM?1)#;Dy%C z*08Rfp_Kwr&n_$#^@k<$_J3tb8p5lYQ&ern_>2i&vk|sgm5ME*;j$Po#HGGU$o4ut zVzaems!Pipl+CHkkB+&cBs+!K)|La8vj8h|Dk*JgD;Jh|m0CK5^vxfpvMzy<`9q9g zTsTe!4=kSDxVNA(A0`(X>6r&_J$qqD%BYVhRPN=n5NEx+gf+&k=v55*itr`UOy_+Z zD@AfLs!*sG-+Ig8NXU4}<)fRpKDZOVqE@=yFHq}OaQd@KrU4Wmd>#aSeGwT?5UXcak+u^JQr9RKSG`{j%M7A*Dx9H<8Jl?#zYtUjVpqWOon)o1DZFX^1&JE}ZvX%Q delta 808 zcmYk4Ur3Wt7{fZq;;t%~p!AeWqn$SuWK6&}b{U5T3MY4>>(Pu>BC z|Lj&Ndi)idK^|{xs@soZzXj?*GMj=aP{>wsSDlKQfI_<&SkQzrCd12Zl{#koW{!~~ zab0O~dFBKSY1f?wuV)U>YTBmL&6dz7bId`Pz6bW80Z*(3J>PX>J+umVJyQ*@btgX0 zDwLnO5lX|;umL%_@%h$V0n2k!(7~IT^5UU3Pr$;wb!ab$hZzL|o-0Tam7UIbc+V-| zOQ#-3~gWLBt{jJ3$A z&BlX}!tTHmaA||3A)6h$o!ZG?dV-7B2&MRRsDL=Ry7bmtUu2LEE-7V8z{6m zDB7kB8F@d2?8CxZmYh^Kg7y&`x|jGwCy*TF6P-|thQHs$PxKp-qSf@>3*D3h-E#(b qqB8R%61_qFK8*YJqH0-hjeYF@f0;jrdFx5ITEYG09Q8?mEcg!;FBCuk diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 21bb438..8640c3b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Sep 12 13:53:46 PDT 2017 +#Thu Apr 11 13:17:47 PDT 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip diff --git a/out/production/main/META-INF/MANIFEST.MF b/out/production/main/META-INF/MANIFEST.MF new file mode 100644 index 0000000..4805f3e --- /dev/null +++ b/out/production/main/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: generator.DataGenerator + diff --git a/out/production/resources/log4j2.xml b/out/production/resources/log4j2.xml new file mode 100644 index 0000000..aee6051 --- /dev/null +++ b/out/production/resources/log4j2.xml @@ -0,0 +1,21 @@ + + + + + + + + + %d %p %c{1.} [%t] %m%n + + + + + + + > + + + + + \ No newline at end of file diff --git a/src/main/java/generator/CommandLineParams.java b/src/main/java/generator/CommandLineParams.java index 30f0a77..c4e8817 100644 --- a/src/main/java/generator/CommandLineParams.java +++ b/src/main/java/generator/CommandLineParams.java @@ -22,6 +22,29 @@ public class CommandLineParams { @Option(name="-worker-thread-count", usage="Event generating worker threads, default = 4") public String workerThreadCount; + @Option(name="-output-kafka", usage="output to Kafka, default = true") + public String outputToKafka; + + @Option(name="-output-stdout", usage="output to STDOUT, default = true") + public String outputToStdout; + + @Option(name="-output-eventhubs", usage="output to Eventhubs, default = false") + public String outputToEventhubs; + + //EventHub Specific Options + @Option(name="-eventhub.name", usage="EventHub name") + public String eventHubName; + + @Option(name="-eventhub.namespace", usage="EventHub namespace name") + public String eventHubNameSpace; + + @Option(name="-eventhub.saskeyname", usage="Sas Key name used for EventHubs") + public String eventHubSaskeyname; + + @Option(name="-eventhub.saskey", usage="Sas Key used for EventHubs") + public String eventHubSaskey; + + //Kafka Specific Options @Option(name="-topic", usage="(Required)Kafka Topic to send messages to") public String topic; @@ -46,9 +69,6 @@ public class CommandLineParams { @Option(name="-event-format", usage="event format selector, default = json") public String eventFormat; - @Option(name="-output-stdout", usage="output to STDOUT, default = true") - public String outputToStdout; - @Option(name="-generate-kafka-headers", usage="Will generate kafka headers for testing, default = false") public String includeKafkaHeaders; @@ -58,6 +78,10 @@ public class CommandLineParams { public String toString() { return "[Command Line Parameters]" + "{ message-count: " + messageCount + + ", eventhub.name: " + eventHubName + + ", eventhub.namespace: " + eventHubNameSpace + + ", eventhub.saskey: " + eventHubSaskey + + ", message-size: " + messageSize + ", message-size: " + messageSize + ", topic : " + topic + ", eps: " + eps @@ -70,8 +94,9 @@ public String toString() { + ", kafka-buffer-memory: " + kafkaBufferMemory + ", event-format: " + eventFormat + ", output-stdout: " + outputToStdout + + ", output-eventhubs: " + outputToEventhubs + ", include-kafka-headers: " + includeKafkaHeaders + ", header-gen-profile" + headerGenProfile + "}"; } -} +} \ No newline at end of file diff --git a/src/main/java/generator/DataGenerator.java b/src/main/java/generator/DataGenerator.java index ea39a24..6e1a1ee 100644 --- a/src/main/java/generator/DataGenerator.java +++ b/src/main/java/generator/DataGenerator.java @@ -39,28 +39,46 @@ public static void main(String[] args) { System.exit(2); } - if (params.outputToStdout == "false") { - // Check for required parameters - if (params.bootStrapServers == null || params.topic == null) { - logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1"); - System.exit(1); - } - + if (params.outputToEventhubs == params.outputToKafka) { + logger.error("Output to Kafka and Output to EventHubs can't be enabled/disabled at the same time"); + System.exit(1); } + //Set defaults for non required params. And log printout of value default or configured if(params.workerThreadCount == null) { params.workerThreadCount = "4";} if(params.eps == null) { params.eps = "0";} if(params.eventFormat == null) { params.eventFormat = "json";} if(params.messageSize == null) { params.messageSize = "256";} if(params.messageCount == null) { params.messageSize = "10000";} - if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";} - if(params.headerGenProfile == null) { params.headerGenProfile = "-1";} + + Properties props = new Properties(); + + if (Boolean.parseBoolean(params.outputToKafka) == true) { + // Check for required parameters + if (params.bootStrapServers == null || params.topic == null) { + logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1"); + System.exit(1); + } + + if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";} + if(params.headerGenProfile == null) { params.headerGenProfile = "-1";} + props = parseKafkaArguments(params, props); + + } + if (Boolean.parseBoolean(params.outputToEventhubs) == true) { + // Check for required parameters + if (params.eventHubName == null || params.eventHubNameSpace == null + || params.eventHubSaskey == null || params.eventHubSaskeyname == null) { + logger.error("Missing required commandline parameters for eventhub - quiting kafka-data-gen - Exit Status 1"); + System.exit(1); + } + System.out.println("Configuring EventHubs Props"); + props = parseEventHubsArguments(params, props); + } logger.info(params); //Create and configure Kafka Producer variables. Store in Properties object - Properties props = new Properties(); - props = parseKafkaArguments(params, props); EPSToken epsToken = new EPSToken(); @@ -99,6 +117,19 @@ public static void main(String[] args) { logger.info("Program run and complete without interruption"); } + public static Properties parseEventHubsArguments(CommandLineParams params, Properties props) { + try { + props.put("eventhub.name", params.eventHubName); + props.put("eventhub.namespace", params.eventHubNameSpace); + props.put("eventhub.saskeyname", params.eventHubSaskeyname); + props.put("eventhub.saskey", params.eventHubSaskey); + } catch (java.lang.NullPointerException e) { + logger.error(e.getMessage()); + logger.error("Config Value Not provided"); + } + return props; + } + public static Properties parseKafkaArguments(CommandLineParams params, Properties props) { try { props.put("bootstrap.servers", params.bootStrapServers); diff --git a/src/main/java/generator/EPSThread.java b/src/main/java/generator/EPSThread.java index c3165f1..a150160 100644 --- a/src/main/java/generator/EPSThread.java +++ b/src/main/java/generator/EPSThread.java @@ -1,12 +1,26 @@ package generator; -import org.apache.kafka.common.header.Headers; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.Random; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + class EPSThread implements Runnable { private static Logger logger = LogManager.getLogger(EPSThread.class); private static EPSToken epsTokenObj; @@ -51,7 +65,7 @@ public void run() { logger.info("Total Message count reached, cleaning up program for exit."); } - } else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0) { + } else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0 && Boolean.parseBoolean(params.outputToKafka) == true) { if(Boolean.parseBoolean(params.outputToStdout) != true) { do { Thread.sleep(5000); @@ -60,34 +74,66 @@ public void run() { } } else { - if(Boolean.parseBoolean(params.outputToStdout) == true) { - do { - if (epsTokenObj.takeToken()) { - shipEvent(epsTokenObj, params); + if(Boolean.parseBoolean(params.outputToKafka) == true) { + if (Boolean.parseBoolean(params.outputToStdout) == true) { + do { + if (epsTokenObj.takeToken()) { + shipEvent(epsTokenObj, params); + } + } while (epsTokenObj.complete() == false); + } else { + Producer producer = new KafkaProducer<>(props); + if (!metricsCalc.addProducer(producer)) { + logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName()); } - } while (epsTokenObj.complete() == false); - } - else { - Producer producer = new KafkaProducer<>(props); - if (!metricsCalc.addProducer(producer)) { - logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName()); + + do { + if (epsTokenObj.takeToken()) { + shipEvent(producer, epsTokenObj, params); + } + } while (epsTokenObj.complete() == false); + + producer.close(); } + } + else if(Boolean.parseBoolean(params.outputToEventhubs) == true){ + final ConnectionStringBuilder connStr = new ConnectionStringBuilder() + .setNamespaceName(props.getProperty("eventhub.namespace")) + .setEventHubName(props.getProperty("eventhub.name")) + .setSasKeyName(props.getProperty("eventhub.saskeyname")) + .setSasKey(props.getProperty("eventhub.saskey")); + logger.info(connStr); + + + final Gson gson = new GsonBuilder().create(); + + // The Executor handles all asynchronous tasks and this is passed to the EventHubClient instance. + // This enables the user to segregate their thread pool based on the work load. + // This pool can then be shared across multiple EventHubClient instances. + // The following sample uses a single thread executor, as there is only one EventHubClient instance, + // handling different flavors of ingestion to Event Hubs here. + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + + // Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive. + // It is always a best practice to reuse these instances. The following sample shows this. + final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService); do { if (epsTokenObj.takeToken()) { - shipEvent(producer, epsTokenObj, params); + shipEventhubEvent(ehClient, epsTokenObj, params); } } while (epsTokenObj.complete() == false); - producer.close(); + ehClient.closeSync(); + executorService.shutdown(); } } - } catch (InterruptedException exc) { + } catch (InterruptedException | EventHubException | IOException exc ) { System.out.println("Thread Interrupted"); } } - public static void shipEvent(Producer producer,EPSToken epsTokenObj , CommandLineParams params) { + public static void shipEvent(Producer producer, EPSToken epsTokenObj, CommandLineParams params) { int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); //TODO: Smarter Live Logging, hardcoded 10000 value. 10% of total messages? @@ -106,6 +152,40 @@ public static void shipEvent(Producer producer,EPSToken epsToken } } + public static void shipEventhubEvent(EventHubClient ehClient, EPSToken epsTokenObj , CommandLineParams params) { + try { + int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); + if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } + byte[] event = createEvent(params, sequenceNumber); + + //byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset()); + EventData sendEvent = EventData.create(event); + + // Send - not tied to any partition + // Event Hubs service will round-robin the events across all Event Hubs partitions. + // This is the recommended & most reliable way to send to Event Hubs. + ehClient.sendSync(sendEvent); + + logger.debug("Event batched" + event.toString()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) { + int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); + + if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } + + byte[] event = createEvent(params, sequenceNumber); + try { + String s = new String(event); + System.out.println(s); + } catch (Exception e) { + e.printStackTrace(); + } + } + public static ProducerRecord includeKafkaHeaders(ProducerRecord record, int sequenceNumber) { Random random = new Random(); //same header values @@ -208,19 +288,7 @@ else if(Integer.parseInt(params.headerGenProfile) == -1) { return record; } - public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) { - int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); - - if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } - byte[] event = createEvent(params, sequenceNumber); - try { - String s = new String(event); - System.out.println(s); - } catch (Exception e) { - e.printStackTrace(); - } - } public static byte[] createEvent(CommandLineParams params, int eventKey) { String s = ""; diff --git a/src/main/java/generator/MetricsCalculator.java b/src/main/java/generator/MetricsCalculator.java index dd000ce..85a1127 100644 --- a/src/main/java/generator/MetricsCalculator.java +++ b/src/main/java/generator/MetricsCalculator.java @@ -77,7 +77,9 @@ public double getKafkaProducerMetrics(String metricName, String metricGroup) { String name = entry.getKey().name(); String group = entry.getKey().group(); if (name.equalsIgnoreCase(metricName) && group.equalsIgnoreCase(metricGroup)) { - consolidatedMetricValue += entry.getValue().value(); + Object metricValue = entry.getValue().metricValue(); + if (metricValue instanceof Double) + consolidatedMetricValue += (Double)metricValue; } } } diff --git a/src/main/main.iml b/src/main/main.iml new file mode 100644 index 0000000..908ad4f --- /dev/null +++ b/src/main/main.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file