diff --git a/build.gradle b/build.gradle index 3cec651..b7b078c 100644 --- a/build.gradle +++ b/build.gradle @@ -9,10 +9,13 @@ repositories { dependencies { compile 'args4j:args4j-site:2.0.25' compile 'args4j:args4j:2.33' - compile 'org.apache.kafka:kafka-clients:2.0.0' + compile 'org.apache.kafka:kafka-clients:2.2.0' compile 'org.slf4j:slf4j-simple:1.7.5' compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.9.1' compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.9.1' + // https://mvnrepository.com/artifact/com.microsoft.azure/azure-eventhubs + compile group: 'com.microsoft.azure', name: 'azure-eventhubs', version: '2.3.0' + compile group: 'com.google.code.gson', name: 'gson', version: '2.3.1' } jar { diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 125efdb..7a3265e 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 21bb438..8640c3b 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ -#Tue Sep 12 13:53:46 PDT 2017 +#Thu Apr 11 13:17:47 PDT 2019 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-all.zip diff --git a/out/production/main/META-INF/MANIFEST.MF b/out/production/main/META-INF/MANIFEST.MF new file mode 100644 index 0000000..4805f3e --- /dev/null +++ b/out/production/main/META-INF/MANIFEST.MF @@ -0,0 +1,3 @@ +Manifest-Version: 1.0 +Main-Class: generator.DataGenerator + diff --git a/out/production/resources/log4j2.xml b/out/production/resources/log4j2.xml new file mode 100644 index 0000000..aee6051 --- /dev/null +++ 
b/out/production/resources/log4j2.xml @@ -0,0 +1,21 @@ + + + + + + + + + %d %p %c{1.} [%t] %m%n + + + + + + + > + + + + + \ No newline at end of file diff --git a/src/main/java/generator/CommandLineParams.java b/src/main/java/generator/CommandLineParams.java index 30f0a77..c4e8817 100644 --- a/src/main/java/generator/CommandLineParams.java +++ b/src/main/java/generator/CommandLineParams.java @@ -22,6 +22,29 @@ public class CommandLineParams { @Option(name="-worker-thread-count", usage="Event generating worker threads, default = 4") public String workerThreadCount; + @Option(name="-output-kafka", usage="output to Kafka, default = true") + public String outputToKafka; + + @Option(name="-output-stdout", usage="output to STDOUT, default = true") + public String outputToStdout; + + @Option(name="-output-eventhubs", usage="output to Eventhubs, default = false") + public String outputToEventhubs; + + //EventHub Specific Options + @Option(name="-eventhub.name", usage="EventHub name") + public String eventHubName; + + @Option(name="-eventhub.namespace", usage="EventHub namespace name") + public String eventHubNameSpace; + + @Option(name="-eventhub.saskeyname", usage="Sas Key name used for EventHubs") + public String eventHubSaskeyname; + + @Option(name="-eventhub.saskey", usage="Sas Key used for EventHubs") + public String eventHubSaskey; + + //Kafka Specific Options @Option(name="-topic", usage="(Required)Kafka Topic to send messages to") public String topic; @@ -46,9 +69,6 @@ public class CommandLineParams { @Option(name="-event-format", usage="event format selector, default = json") public String eventFormat; - @Option(name="-output-stdout", usage="output to STDOUT, default = true") - public String outputToStdout; - @Option(name="-generate-kafka-headers", usage="Will generate kafka headers for testing, default = false") public String includeKafkaHeaders; @@ -58,6 +78,10 @@ public class CommandLineParams { public String toString() { return "[Command Line Parameters]" + "{ 
message-count: " + messageCount + + ", eventhub.name: " + eventHubName + + ", eventhub.namespace: " + eventHubNameSpace + + ", eventhub.saskey: " + eventHubSaskey + + ", message-size: " + messageSize + ", message-size: " + messageSize + ", topic : " + topic + ", eps: " + eps @@ -70,8 +94,9 @@ public String toString() { + ", kafka-buffer-memory: " + kafkaBufferMemory + ", event-format: " + eventFormat + ", output-stdout: " + outputToStdout + + ", output-eventhubs: " + outputToEventhubs + ", include-kafka-headers: " + includeKafkaHeaders + ", header-gen-profile" + headerGenProfile + "}"; } -} +} \ No newline at end of file diff --git a/src/main/java/generator/DataGenerator.java b/src/main/java/generator/DataGenerator.java index ea39a24..6e1a1ee 100644 --- a/src/main/java/generator/DataGenerator.java +++ b/src/main/java/generator/DataGenerator.java @@ -39,28 +39,46 @@ public static void main(String[] args) { System.exit(2); } - if (params.outputToStdout == "false") { - // Check for required parameters - if (params.bootStrapServers == null || params.topic == null) { - logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1"); - System.exit(1); - } - + if (Boolean.parseBoolean(params.outputToEventhubs) == Boolean.parseBoolean(params.outputToKafka)) { + logger.error("Output to Kafka and Output to EventHubs can't be enabled/disabled at the same time"); + System.exit(1); } + //Set defaults for non required params.
And log printout of value default or configured if(params.workerThreadCount == null) { params.workerThreadCount = "4";} if(params.eps == null) { params.eps = "0";} if(params.eventFormat == null) { params.eventFormat = "json";} if(params.messageSize == null) { params.messageSize = "256";} if(params.messageCount == null) { params.messageSize = "10000";} - if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";} - if(params.headerGenProfile == null) { params.headerGenProfile = "-1";} + + Properties props = new Properties(); + + if (Boolean.parseBoolean(params.outputToKafka) == true) { + // Check for required parameters + if (params.bootStrapServers == null || params.topic == null) { + logger.error("Missing required commandline parameter - quiting kafka-data-gen - Exit Status 1"); + System.exit(1); + } + + if(params.includeKafkaHeaders == null) { params.includeKafkaHeaders = "false";} + if(params.headerGenProfile == null) { params.headerGenProfile = "-1";} + props = parseKafkaArguments(params, props); + + } + if (Boolean.parseBoolean(params.outputToEventhubs) == true) { + // Check for required parameters + if (params.eventHubName == null || params.eventHubNameSpace == null + || params.eventHubSaskey == null || params.eventHubSaskeyname == null) { + logger.error("Missing required commandline parameters for eventhub - quiting kafka-data-gen - Exit Status 1"); + System.exit(1); + } + System.out.println("Configuring EventHubs Props"); + props = parseEventHubsArguments(params, props); + } logger.info(params); //Create and configure Kafka Producer variables. 
Store in Properties object - Properties props = new Properties(); - props = parseKafkaArguments(params, props); EPSToken epsToken = new EPSToken(); @@ -99,6 +117,19 @@ public static void main(String[] args) { logger.info("Program run and complete without interruption"); } + public static Properties parseEventHubsArguments(CommandLineParams params, Properties props) { + try { + props.put("eventhub.name", params.eventHubName); + props.put("eventhub.namespace", params.eventHubNameSpace); + props.put("eventhub.saskeyname", params.eventHubSaskeyname); + props.put("eventhub.saskey", params.eventHubSaskey); + } catch (java.lang.NullPointerException e) { + logger.error(e.getMessage()); + logger.error("Config Value Not provided"); + } + return props; + } + public static Properties parseKafkaArguments(CommandLineParams params, Properties props) { try { props.put("bootstrap.servers", params.bootStrapServers); diff --git a/src/main/java/generator/EPSThread.java b/src/main/java/generator/EPSThread.java index c3165f1..a150160 100644 --- a/src/main/java/generator/EPSThread.java +++ b/src/main/java/generator/EPSThread.java @@ -1,12 +1,26 @@ package generator; -import org.apache.kafka.common.header.Headers; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; import org.apache.kafka.clients.producer.*; import java.util.Properties; import java.util.Random; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubException; + +import java.io.IOException; +import java.nio.charset.Charset; +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + class EPSThread implements Runnable { private 
static Logger logger = LogManager.getLogger(EPSThread.class); private static EPSToken epsTokenObj; @@ -51,7 +65,7 @@ public void run() { logger.info("Total Message count reached, cleaning up program for exit."); } - } else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0) { + } else if(thrd.getName().compareTo("MetricsCalculatorThread") == 0 && Boolean.parseBoolean(params.outputToKafka) == true) { if(Boolean.parseBoolean(params.outputToStdout) != true) { do { Thread.sleep(5000); @@ -60,34 +74,66 @@ public void run() { } } else { - if(Boolean.parseBoolean(params.outputToStdout) == true) { - do { - if (epsTokenObj.takeToken()) { - shipEvent(epsTokenObj, params); + if(Boolean.parseBoolean(params.outputToKafka) == true) { + if (Boolean.parseBoolean(params.outputToStdout) == true) { + do { + if (epsTokenObj.takeToken()) { + shipEvent(epsTokenObj, params); + } + } while (epsTokenObj.complete() == false); + } else { + Producer producer = new KafkaProducer<>(props); + if (!metricsCalc.addProducer(producer)) { + logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName()); } - } while (epsTokenObj.complete() == false); - } - else { - Producer producer = new KafkaProducer<>(props); - if (!metricsCalc.addProducer(producer)) { - logger.warn("Error adding producer for metrics Calculator, Metric Calculations may be incorrect" + thrd.getName()); + + do { + if (epsTokenObj.takeToken()) { + shipEvent(producer, epsTokenObj, params); + } + } while (epsTokenObj.complete() == false); + + producer.close(); } + } + else if(Boolean.parseBoolean(params.outputToEventhubs) == true){ + final ConnectionStringBuilder connStr = new ConnectionStringBuilder() + .setNamespaceName(props.getProperty("eventhub.namespace")) + .setEventHubName(props.getProperty("eventhub.name")) + .setSasKeyName(props.getProperty("eventhub.saskeyname")) + .setSasKey(props.getProperty("eventhub.saskey")); + logger.info(connStr); + + + final Gson gson = new 
GsonBuilder().create(); + + // The Executor handles all asynchronous tasks and this is passed to the EventHubClient instance. + // This enables the user to segregate their thread pool based on the work load. + // This pool can then be shared across multiple EventHubClient instances. + // The following sample uses a single thread executor, as there is only one EventHubClient instance, + // handling different flavors of ingestion to Event Hubs here. + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4); + + // Each EventHubClient instance spins up a new TCP/SSL connection, which is expensive. + // It is always a best practice to reuse these instances. The following sample shows this. + final EventHubClient ehClient = EventHubClient.createSync(connStr.toString(), executorService); do { if (epsTokenObj.takeToken()) { - shipEvent(producer, epsTokenObj, params); + shipEventhubEvent(ehClient, epsTokenObj, params); } } while (epsTokenObj.complete() == false); - producer.close(); + ehClient.closeSync(); + executorService.shutdown(); } } - } catch (InterruptedException exc) { + } catch (InterruptedException | EventHubException | IOException exc ) { System.out.println("Thread Interrupted"); } } - public static void shipEvent(Producer producer,EPSToken epsTokenObj , CommandLineParams params) { + public static void shipEvent(Producer producer, EPSToken epsTokenObj, CommandLineParams params) { int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); //TODO: Smarter Live Logging, hardcoded 10000 value. 10% of total messages? 
@@ -106,6 +152,40 @@ public static void shipEvent(Producer producer,EPSToken epsToken } } + public static void shipEventhubEvent(EventHubClient ehClient, EPSToken epsTokenObj , CommandLineParams params) { + try { + int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); + if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } + byte[] event = createEvent(params, sequenceNumber); + + //byte[] payloadBytes = gson.toJson(payload).getBytes(Charset.defaultCharset()); + EventData sendEvent = EventData.create(event); + + // Send - not tied to any partition + // Event Hubs service will round-robin the events across all Event Hubs partitions. + // This is the recommended & most reliable way to send to Event Hubs. + ehClient.sendSync(sendEvent); + + logger.debug("Event batched" + event.toString()); + } catch (Exception e) { + e.printStackTrace(); + } + } + + public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) { + int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); + + if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } + + byte[] event = createEvent(params, sequenceNumber); + try { + String s = new String(event); + System.out.println(s); + } catch (Exception e) { + e.printStackTrace(); + } + } + public static ProducerRecord includeKafkaHeaders(ProducerRecord record, int sequenceNumber) { Random random = new Random(); //same header values @@ -208,19 +288,7 @@ else if(Integer.parseInt(params.headerGenProfile) == -1) { return record; } - public static void shipEvent(EPSToken epsTokenObj , CommandLineParams params) { - int sequenceNumber = epsTokenObj.getMessageKeyAndInc(); - - if(sequenceNumber % 100000 == 0) { logger.info("Current message with sequence number: " + sequenceNumber); } - byte[] event = createEvent(params, sequenceNumber); - try { - String s = new String(event); - System.out.println(s); - } catch (Exception e) { - 
e.printStackTrace(); - } - } public static byte[] createEvent(CommandLineParams params, int eventKey) { String s = ""; diff --git a/src/main/java/generator/MetricsCalculator.java b/src/main/java/generator/MetricsCalculator.java index dd000ce..85a1127 100644 --- a/src/main/java/generator/MetricsCalculator.java +++ b/src/main/java/generator/MetricsCalculator.java @@ -77,7 +77,9 @@ public double getKafkaProducerMetrics(String metricName, String metricGroup) { String name = entry.getKey().name(); String group = entry.getKey().group(); if (name.equalsIgnoreCase(metricName) && group.equalsIgnoreCase(metricGroup)) { - consolidatedMetricValue += entry.getValue().value(); + Object metricValue = entry.getValue().metricValue(); + if (metricValue instanceof Double) + consolidatedMetricValue += (Double)metricValue; } } } diff --git a/src/main/main.iml b/src/main/main.iml new file mode 100644 index 0000000..908ad4f --- /dev/null +++ b/src/main/main.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file