From f80c973b60cae4ba468ec8f17649eb202e5122f9 Mon Sep 17 00:00:00 2001 From: Mike Harder Date: Fri, 15 Nov 2019 12:03:01 -0800 Subject: [PATCH] [Java] Compute ops/s for each parallel workstream (#5) * Add comment * Add .vscode to python .gitignore * Compute ops/s for each parallel workstream * Improve status printing Fix bug in opsPerSec calculation --- .../azure/perfstress/PerfStressProgram.java | 68 +++++++++++-------- .../java/com/azure/perfstress/SleepTest.java | 40 +++++++++++ 2 files changed, 80 insertions(+), 28 deletions(-) create mode 100644 java/azure-perfstress/src/main/java/com/azure/perfstress/SleepTest.java diff --git a/java/azure-perfstress/src/main/java/com/azure/perfstress/PerfStressProgram.java b/java/azure-perfstress/src/main/java/com/azure/perfstress/PerfStressProgram.java index b07a035b8ce78..2c47f8e2394e2 100644 --- a/java/azure-perfstress/src/main/java/com/azure/perfstress/PerfStressProgram.java +++ b/java/azure-perfstress/src/main/java/com/azure/perfstress/PerfStressProgram.java @@ -10,6 +10,7 @@ import java.util.concurrent.ForkJoinPool; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; +import java.util.stream.IntStream; import com.beust.jcommander.JCommander; import com.beust.jcommander.JCommander.Builder; @@ -22,7 +23,7 @@ import reactor.core.publisher.Mono; public class PerfStressProgram { - private static AtomicInteger _completedOperations = new AtomicInteger(); + private static int[] _completedOperations; private static long[] _lastCompletionNanoTimes; public static void Run(Class[] classes, String[] args) { @@ -30,6 +31,7 @@ public static void Run(Class[] classes, String[] args) { try { classList.add(Class.forName("com.azure.perfstress.NoOpTest")); + classList.add(Class.forName("com.azure.perfstress.SleepTest")); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } @@ -83,7 +85,7 @@ public static void Run(Class testClass, PerfStressOptions options) { System.out.println(); System.out.println(); - Disposable setupStatus = PrintStatus("=== Setup ===", () -> ".", false); + Disposable setupStatus = PrintStatus("=== Setup ===", () -> ".", false, false); Disposable cleanupStatus = null; PerfStressTest[] tests = new PerfStressTest[options.Parallel]; @@ -117,7 +119,7 @@ public static void Run(Class testClass, PerfStressOptions options) { } finally { if (!options.NoCleanup) { if (cleanupStatus == null) { - cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false); + cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false, false); } Flux.just(tests).flatMap(t -> t.CleanupAsync()).blockLast(); @@ -126,7 +128,7 @@ public static void Run(Class testClass, PerfStressOptions options) { } finally { if (!options.NoCleanup) { if (cleanupStatus == null) { - cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false); + cleanupStatus = PrintStatus("=== Cleanup ===", () -> ".", false, false); } tests[0].GlobalCleanupAsync().block(); @@ -139,7 +141,7 @@ public static void Run(Class testClass, PerfStressOptions options) { } public static void RunTests(PerfStressTest[] tests, boolean sync, int parallel, int durationSeconds, String title) { - _completedOperations.set(0); + _completedOperations = new int[parallel]; _lastCompletionNanoTimes = new long[parallel]; long endNanoTime = System.nanoTime() + ((long) durationSeconds * 1000000000); @@ -147,81 +149,91 @@ public static void RunTests(PerfStressTest[] tests, boolean sync, int paralle int[] lastCompleted = new int[] { 0 }; Disposable progressStatus = PrintStatus( "=== " + title + " ===" + System.lineSeparator() + "Current\t\tTotal", () -> { - int totalCompleted = _completedOperations.get(); + int totalCompleted = IntStream.of(_completedOperations).sum(); int currentCompleted = totalCompleted - lastCompleted[0]; lastCompleted[0] = totalCompleted; return currentCompleted + "\t\t" + totalCompleted; - }, true); + }, true, true); if (sync) { ForkJoinPool forkJoinPool = new ForkJoinPool(parallel); try { forkJoinPool.submit(() -> { - Arrays.stream(tests).parallel().forEach(t -> RunLoop(t, endNanoTime)); + IntStream.range(0, parallel).parallel().forEach(i -> RunLoop(tests[i], i, endNanoTime)); }).get(); } catch (InterruptedException | ExecutionException e) { throw new RuntimeException(e); } } else { - Flux.just(tests).flatMap(t -> RunLoopAsync(t, endNanoTime)).blockLast(); + Flux.range(0, parallel).flatMap(i -> RunLoopAsync(tests[i], i, endNanoTime)).blockLast(); } progressStatus.dispose(); System.out.println("=== Results ==="); - double averageElapsedSeconds = (Arrays.stream(_lastCompletionNanoTimes).average().orElse(Double.NaN)) - / 1000000000; - double operationsPerSecond = _completedOperations.get() / averageElapsedSeconds; + int totalOperations = IntStream.of(_completedOperations).sum(); + double operationsPerSecond = IntStream.range(0, parallel) + .mapToDouble(i -> ((double)_completedOperations[i]) / (_lastCompletionNanoTimes[i] / 1000000000)) + .sum(); double secondsPerOperation = 1 / operationsPerSecond; + double weightedAverageSeconds = totalOperations / operationsPerSecond; - System.out.printf("Completed %d operations in an average of %.2fs (%.2f ops/s, %.3f s/op)%n", - _completedOperations.get(), averageElapsedSeconds, operationsPerSecond, secondsPerOperation); + System.out.printf("Completed %d operations in a weighted-average of %.2fs (%.2f ops/s, %.3f s/op)%n", + totalOperations, weightedAverageSeconds, operationsPerSecond, secondsPerOperation); System.out.println(); } - private static void RunLoop(PerfStressTest test, long endNanoTime) { + private static void RunLoop(PerfStressTest test, int index, long endNanoTime) { long startNanoTime = System.nanoTime(); while (System.nanoTime() < endNanoTime) { test.Run(); - int count = _completedOperations.incrementAndGet(); - _lastCompletionNanoTimes[count % _lastCompletionNanoTimes.length] = System.nanoTime() - startNanoTime; + _completedOperations[index]++; + _lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime; } } - private static Mono RunLoopAsync(PerfStressTest test, long endNanoTime) { + private static Mono RunLoopAsync(PerfStressTest test, int index, long endNanoTime) { long startNanoTime = System.nanoTime(); return Flux.just(1) .repeat() .flatMap(i -> test.RunAsync().then(Mono.just(1)), 1) .doOnNext(v -> { - int count = _completedOperations.incrementAndGet(); - _lastCompletionNanoTimes[count % _lastCompletionNanoTimes.length] = System.nanoTime() - startNanoTime; + _completedOperations[index]++; + _lastCompletionNanoTimes[index] = System.nanoTime() - startNanoTime; }) .take(Duration.ofNanos(endNanoTime - startNanoTime)) .then(); } - private static Disposable PrintStatus(String header, Supplier status, boolean newLine) { + private static Disposable PrintStatus(String header, Supplier status, boolean newLine, boolean printFinalStatus) { System.out.println(header); boolean[] needsExtraNewline = new boolean[] { false }; return Flux.interval(Duration.ofSeconds(1)).doFinally(s -> { + if (printFinalStatus) { + PrintStatusHelper(status, newLine, needsExtraNewline); + } + if (needsExtraNewline[0]) { System.out.println(); } System.out.println(); }).subscribe(i -> { - Object obj = status.get(); - if (newLine) { - System.out.println(obj); - } else { - System.out.print(obj); - needsExtraNewline[0] = true; - } + PrintStatusHelper(status, newLine, needsExtraNewline); }); } + + private static void PrintStatusHelper(Supplier status, boolean newLine, boolean[] needsExtraNewline) { + Object obj = status.get(); + if (newLine) { + System.out.println(obj); + } else { + System.out.print(obj); + needsExtraNewline[0] = true; + } + } } \ No newline at end of file diff --git a/java/azure-perfstress/src/main/java/com/azure/perfstress/SleepTest.java b/java/azure-perfstress/src/main/java/com/azure/perfstress/SleepTest.java new file mode 100644 index 0000000000000..0d02c5a5a8102 --- /dev/null +++ b/java/azure-perfstress/src/main/java/com/azure/perfstress/SleepTest.java @@ -0,0 +1,40 @@ +package com.azure.perfstress; + +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicInteger; + +public class SleepTest extends PerfStressTest { + private static final AtomicInteger _instanceCount = new AtomicInteger(); + private final int _secondsPerOperation; + + public SleepTest(PerfStressOptions options) { + super(options); + + int instanceCount = _instanceCount.incrementAndGet(); + _secondsPerOperation = Pow(2, instanceCount); + } + + private static int Pow(int value, int exponent) { + int power = 1; + for (int i=0; i < exponent; i++) { + power *= value; + } + return power; + } + + @Override + public void Run() { + try { + Thread.sleep(_secondsPerOperation * 1000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public Mono RunAsync() { + return Mono.delay(Duration.ofSeconds(_secondsPerOperation)).then(); + } +} \ No newline at end of file