diff --git a/benchmarks/src/main/java/reactor/core/publisher/MonoAllBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/MonoAllBenchmark.java index 97f27110ea..9642bfb714 100644 --- a/benchmarks/src/main/java/reactor/core/publisher/MonoAllBenchmark.java +++ b/benchmarks/src/main/java/reactor/core/publisher/MonoAllBenchmark.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,8 +49,8 @@ public static void main(String[] args) throws Exception { @SuppressWarnings("unused") @Benchmark - public void measureThroughput() { - Flux.range(0, rangeSize) + public Boolean measureThroughput() { + return Flux.range(0, rangeSize) .all(i -> i < Integer.MAX_VALUE) .block(); } diff --git a/benchmarks/src/main/java/reactor/core/publisher/MonoCallableBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/MonoCallableBenchmark.java index 886b9a2c43..79846a4adb 100644 --- a/benchmarks/src/main/java/reactor/core/publisher/MonoCallableBenchmark.java +++ b/benchmarks/src/main/java/reactor/core/publisher/MonoCallableBenchmark.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -49,8 +49,8 @@ public static void main(String[] args) throws Exception { @SuppressWarnings("unused") @Benchmark - public void measureThroughput() { - Flux.range(0, rangeSize) + public Boolean measureThroughput() { + return Flux.range(0, rangeSize) .all(i -> i < Integer.MAX_VALUE) .block(); } diff --git a/benchmarks/src/main/java/reactor/core/publisher/TracesBenchmark.java b/benchmarks/src/main/java/reactor/core/publisher/TracesBenchmark.java new file mode 100644 index 0000000000..68d85b37c6 --- /dev/null +++ b/benchmarks/src/main/java/reactor/core/publisher/TracesBenchmark.java @@ -0,0 +1,70 @@ +/* + * Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.concurrent.TimeUnit; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +@BenchmarkMode({Mode.AverageTime}) +@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS) +@Fork(value = 1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@State(Scope.Benchmark) +public class TracesBenchmark { + @Param({"0", "1", "2"}) + private int reactorLeadingLines; + + @Param({"0", "1", "2"}) + private int trailingLines; + + private String stackTrace; + + @Setup(Level.Iteration) + public void setup() { + stackTrace = createStackTrace(reactorLeadingLines, trailingLines); + } + + @SuppressWarnings("unused") + @Benchmark + public String measureThroughput() { + return Traces.extractOperatorAssemblyInformation(stackTrace); + } + + private static String createStackTrace(int reactorLeadingLines, int trailingLines) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < reactorLeadingLines; i++) { + sb.append("\tat reactor.core.publisher.Flux.someOperation(Flux.java:42)\n"); + } + sb.append("\tat some.user.package.SomeUserClass.someOperation(SomeUserClass.java:1234)\n"); + for (int i = 0; i < trailingLines; i++) { + sb.append("\tat any.package.AnyClass.anyOperation(AnyClass.java:1)\n"); + } + return sb.toString(); + } +} diff --git a/reactor-core/src/main/java/reactor/core/Scannable.java b/reactor-core/src/main/java/reactor/core/Scannable.java index ca3b1143c3..b75cfaa183 100644 --- a/reactor-core/src/main/java/reactor/core/Scannable.java +++ b/reactor-core/src/main/java/reactor/core/Scannable.java @@ -449,7 +449,7 @@ default String name() { .map(s -> s.scan(Attr.NAME)) .filter(Objects::nonNull) .findFirst() - .orElse(stepName()); + .orElseGet(this::stepName); } /** diff --git a/reactor-core/src/main/java/reactor/core/publisher/Traces.java b/reactor-core/src/main/java/reactor/core/publisher/Traces.java index bac53fc126..74e6b0d9d6 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Traces.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Traces.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,11 +16,10 @@ package reactor.core.publisher; -import java.util.List; +import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; - +import reactor.util.annotation.Nullable; /** * Utilities around manipulating stack traces and displaying assembly traces. @@ -29,7 +28,6 @@ * @author Sergei Egorov */ final class Traces { - /** * If set to true, the creation of FluxOnAssembly will capture the raw stacktrace * instead of the sanitized version. @@ -47,6 +45,8 @@ final class Traces { */ static final Supplier> callSiteSupplierFactory = new CallSiteSupplierFactory(); + private static final String PUBLISHER_PACKAGE_PREFIX = "reactor.core.publisher."; + /** * Return true for strings (usually from a stack trace element) that should be * sanitized out by {@link Traces#callSiteSupplierFactory}. @@ -57,9 +57,8 @@ final class Traces { static boolean shouldSanitize(String stackTraceRow) { return stackTraceRow.startsWith("java.util.function") || stackTraceRow.startsWith("reactor.core.publisher.Mono.onAssembly") - || stackTraceRow.equals("reactor.core.publisher.Mono.onAssembly") - || stackTraceRow.equals("reactor.core.publisher.Flux.onAssembly") - || stackTraceRow.equals("reactor.core.publisher.ParallelFlux.onAssembly") + || stackTraceRow.startsWith("reactor.core.publisher.Flux.onAssembly") + || stackTraceRow.startsWith("reactor.core.publisher.ParallelFlux.onAssembly") || stackTraceRow.startsWith("reactor.core.publisher.SignalLogger") || stackTraceRow.startsWith("reactor.core.publisher.FluxOnAssembly") || stackTraceRow.startsWith("reactor.core.publisher.MonoOnAssembly.") @@ -72,7 +71,7 @@ static boolean shouldSanitize(String stackTraceRow) { } /** - * Extract operator information out of an assembly stack trace in {@link String} form + * Extracts operator information out of an assembly stack trace in {@link String} form * (see {@link Traces#callSiteSupplierFactory}). *

* Most operators will result in a line of the form {@code "Flux.map ⇢ user.code.Class.method(Class.java:123)"}, @@ -83,11 +82,10 @@ static boolean shouldSanitize(String stackTraceRow) { * (eg. {@code "Flux.map"}) *

  • The next stacktrace element is considered user code and is appended to the * result with a {@code ⇢} separator. (eg. {@code " ⇢ user.code.Class.method(Class.java:123)"})
  • - *
  • If no user code is found in the sanitized stack, then the API reference is outputed in the later format only.
  • + *
  • If no user code is found in the sanitized stack, then the API reference is output in the later format only.
  • *
  • If the sanitized stack is empty, returns {@code "[no operator assembly information]"}
  • * * - * * @param source the sanitized assembly stacktrace in String format. * @return a {@link String} representing operator and operator assembly site extracted * from the assembly stack trace. @@ -97,80 +95,164 @@ static String extractOperatorAssemblyInformation(String source) { switch (parts.length) { case 0: return "[no operator assembly information]"; + case 1: + return parts[0]; + case 2: + return parts[0] + CALL_SITE_GLUE + parts[1]; default: - return String.join(CALL_SITE_GLUE, parts); + throw new IllegalStateException("Unexpected number of assembly info parts: " + parts.length); } } static boolean isUserCode(String line) { - return !line.startsWith("reactor.core.publisher") || line.contains("Test"); + return !line.startsWith(PUBLISHER_PACKAGE_PREFIX) || line.contains("Test"); } /** - * Extract operator information out of an assembly stack trace in {@link String} form - * (see {@link Traces#callSiteSupplierFactory}) which potentially - * has a header line that one can skip by setting {@code skipFirst} to {@code true}. + * Extracts operator information out of an assembly stack trace in {@link String} array form + * (see {@link Traces#callSiteSupplierFactory}). *

    - * Most operators will result in a line of the form {@code "Flux.map ⇢ user.code.Class.method(Class.java:123)"}, - * that is: - *

      - *
    1. The top of the stack is inspected for Reactor API references, and the deepest - * one is kept, since multiple API references generally denote an alias operator. - * (eg. {@code "Flux.map"})
    2. - *
    3. The next stacktrace element is considered user code and is appended to the - * result with a {@code ⇢} separator. (eg. {@code " ⇢ user.code.Class.method(Class.java:123)"})
    4. - *
    5. If no user code is found in the sanitized stack, then the API reference is outputed in the later format only.
    6. - *
    7. If the sanitized stack is empty, returns {@code "[no operator assembly information]"}
    8. - *
    - * + * The returned array will contain 0, 1 or 2 elements, extracted in a manner as described by + * {@link #extractOperatorAssemblyInformation(String)}. * * @param source the sanitized assembly stacktrace in String format. - * @return a {@link String} representing operator and operator assembly site extracted - * from the assembly stack trace. + * @return a 0-2 element string array containing the operator and operator assembly site extracted + * from the assembly stack trace */ static String[] extractOperatorAssemblyInformationParts(String source) { - String[] uncleanTraces = source.split("\n"); - final List traces = Stream.of(uncleanTraces) - .map(String::trim) - .filter(s -> !s.isEmpty()) - .collect(Collectors.toList()); + Iterator traces = trimmedNonemptyLines(source); - if (traces.isEmpty()) { + if (!traces.hasNext()) { return new String[0]; } - int i = 0; - while (i < traces.size() && !isUserCode(traces.get(i))) { - i++; + Substring prevLine = null; + Substring currentLine = traces.next(); + + if (currentLine.isUserCode()) { + // No line is a Reactor API line. + return new String[]{currentLine.toString()}; } - String apiLine; - String userCodeLine; - if (i == 0) { - //no line was a reactor API line - apiLine = ""; - userCodeLine = traces.get(0); + while (traces.hasNext()) { + prevLine = currentLine; + currentLine = traces.next(); + + if (currentLine.isUserCode()) { + // Currently on user code line, previous one is API. Attempt to create something in the form + // "Flux.map ⇢ user.code.Class.method(Class.java:123)". + return new String[]{ + prevLine.withoutPublisherPackagePrefix().withoutLocationSuffix().toString(), + "at " + currentLine}; + } } - else if (i == traces.size()) { - //we skipped ALL lines, meaning they're all reactor API lines. We'll fully display the last one - apiLine = ""; - userCodeLine = traces.get(i-1).replaceFirst("reactor.core.publisher.", ""); + + // We skipped ALL lines, meaning they're all Reactor API lines. We'll fully display the last + // one. + return new String[]{currentLine.withoutPublisherPackagePrefix().toString()}; + } + + /** + * Returns an iterator over all trimmed non-empty lines in the given source string. + * + * @implNote This implementation attempts to minimize allocations. + */ + private static Iterator trimmedNonemptyLines(String source) { + return new Iterator() { + private int index = 0; + @Nullable + private Substring next = getNextLine(); + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public Substring next() { + Substring current = next; + if (current == null) { + throw new NoSuchElementException(); + } + next = getNextLine(); + return current; + } + + @Nullable + private Substring getNextLine() { + while (index < source.length()) { + int end = source.indexOf('\n', index); + if (end == -1) { + end = source.length(); + } + Substring line = new Substring(source, index, end).trim(); + index = end + 1; + if (!line.isEmpty()) { + return line; + } + } + return null; + } + }; + } + + // XXX: Explain. + private static final class Substring { + private final String str; + private final int start; + private final int end; + + Substring(String str, int start, int end) { + this.str = str; + this.start = start; + this.end = end; } - else { - //currently on user code line, previous one is API - apiLine = traces.get(i - 1); - userCodeLine = traces.get(i); + + Substring trim() { + int newStart = start; + while (newStart < end && str.charAt(newStart) <= ' ') { + newStart++; + } + int newEnd = end; + while (newEnd > newStart && str.charAt(newEnd - 1) <= ' ') { + newEnd--; + } + return newStart == start && newEnd == end ? this : new Substring(str, newStart, newEnd); } - //now we want something in the form "Flux.map ⇢ user.code.Class.method(Class.java:123)" - if (apiLine.isEmpty()) return new String[] { userCodeLine }; + boolean isEmpty() { + return start == end; + } + + boolean startsWith(String prefix) { + return str.startsWith(prefix, start); + } + + boolean contains(String substring) { + int index = str.indexOf(substring, start); + return index >= 0 && index < end; + } - int linePartIndex = apiLine.indexOf('('); - if (linePartIndex > 0) { - apiLine = apiLine.substring(0, linePartIndex); + boolean isUserCode() { + return !startsWith(PUBLISHER_PACKAGE_PREFIX) || contains("Test"); } - apiLine = apiLine.replaceFirst("reactor.core.publisher.", ""); - return new String[] { apiLine, "at " + userCodeLine }; + Substring withoutLocationSuffix() { + int linePartIndex = str.indexOf('(', start); + return linePartIndex > 0 && linePartIndex < end + ? new Substring(str, start, linePartIndex) + : this; + } + + Substring withoutPublisherPackagePrefix() { + return startsWith(PUBLISHER_PACKAGE_PREFIX) + ? new Substring(str, start + PUBLISHER_PACKAGE_PREFIX.length(), end) + : this; + } + + @Override + public String toString() { + return str.substring(start, end); + } } }