Skip to content

Commit

Permalink
Optimize Traces#extractOperatorAssemblyInformationParts
Browse files Browse the repository at this point in the history
This logic may be executed many times, e.g. if a hot code path uses
`{Mono,Flux}#log` or Micrometer instrumentation. The added benchmark
shows that for large stack traces the new implementation is several
orders of magnitude more efficient in terms of compute and memory
resource utilization.

While there, improve two existing benchmarks by utilizing the black hole
to which benchmark method return values are implicitly sent.
  • Loading branch information
Stephan202 committed Nov 21, 2024
1 parent da340cd commit f89a23d
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,8 +49,8 @@ public static void main(String[] args) throws Exception {

@SuppressWarnings("unused")
@Benchmark
public void measureThroughput() {
Flux.range(0, rangeSize)
public Boolean measureThroughput() {
return Flux.range(0, rangeSize)
.all(i -> i < Integer.MAX_VALUE)
.block();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -49,8 +49,8 @@ public static void main(String[] args) throws Exception {

@SuppressWarnings("unused")
@Benchmark
public void measureThroughput() {
Flux.range(0, rangeSize)
public Boolean measureThroughput() {
return Flux.range(0, rangeSize)
.all(i -> i < Integer.MAX_VALUE)
.block();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class TracesBenchmark {
@Param({"0", "10", "100", "1000"})
private int reactorLeadingLines;

@Param({"0", "10", "100", "1000"})
private int trailingLines;

private String stackTrace;

@Setup(Level.Iteration)
public void setup() {
stackTrace = createLargeStackTrace(reactorLeadingLines, trailingLines);
}

@SuppressWarnings("unused")
@Benchmark
public String measureThroughput() {
return Traces.extractOperatorAssemblyInformation(stackTrace);
}

private static String createLargeStackTrace(int reactorLeadingLines, int trailingLines) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < reactorLeadingLines; i++) {
sb.append("\tat reactor.core.publisher.Flux.someOperation(Flux.java:42)\n");
}
sb.append("\tat some.user.package.SomeUserClass.someOperation(SomeUserClass.java:1234)\n");
for (int i = 0; i < trailingLines; i++) {
sb.append("\tat any.package.AnyClass.anyOperation(AnyClass.java:1)\n");
}
return sb.toString();
}
}
123 changes: 83 additions & 40 deletions reactor-core/src/main/java/reactor/core/publisher/Traces.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,10 @@

package reactor.core.publisher;

import java.util.List;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import reactor.util.annotation.Nullable;

/**
* Utilities around manipulating stack traces and displaying assembly traces.
Expand All @@ -29,6 +28,7 @@
* @author Sergei Egorov
*/
final class Traces {
private static final String PUBLISHER_PACKAGE_PREFIX = "reactor.core.publisher.";

/**
* If set to true, the creation of FluxOnAssembly will capture the raw stacktrace
Expand Down Expand Up @@ -57,7 +57,6 @@ final class Traces {
static boolean shouldSanitize(String stackTraceRow) {
return stackTraceRow.startsWith("java.util.function")
|| stackTraceRow.startsWith("reactor.core.publisher.Mono.onAssembly")
|| stackTraceRow.equals("reactor.core.publisher.Mono.onAssembly")
|| stackTraceRow.equals("reactor.core.publisher.Flux.onAssembly")
|| stackTraceRow.equals("reactor.core.publisher.ParallelFlux.onAssembly")
|| stackTraceRow.startsWith("reactor.core.publisher.SignalLogger")
Expand Down Expand Up @@ -103,7 +102,7 @@ static String extractOperatorAssemblyInformation(String source) {
}

static boolean isUserCode(String line) {
return !line.startsWith("reactor.core.publisher") || line.contains("Test");
return !line.startsWith(PUBLISHER_PACKAGE_PREFIX) || line.contains("Test");
}

/**
Expand All @@ -129,48 +128,92 @@ static boolean isUserCode(String line) {
* from the assembly stack trace.
*/
static String[] extractOperatorAssemblyInformationParts(String source) {
String[] uncleanTraces = source.split("\n");
final List<String> traces = Stream.of(uncleanTraces)
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
Iterator<String> traces = trimmedNonemptyLines(source);

if (traces.isEmpty()) {
if (!traces.hasNext()) {
return new String[0];
}

int i = 0;
while (i < traces.size() && !isUserCode(traces.get(i))) {
i++;
}
String prevLine = null;
String currentLine = traces.next();

String apiLine;
String userCodeLine;
if (i == 0) {
//no line was a reactor API line
apiLine = "";
userCodeLine = traces.get(0);
}
else if (i == traces.size()) {
//we skipped ALL lines, meaning they're all reactor API lines. We'll fully display the last one
apiLine = "";
userCodeLine = traces.get(i-1).replaceFirst("reactor.core.publisher.", "");
}
else {
//currently on user code line, previous one is API
apiLine = traces.get(i - 1);
userCodeLine = traces.get(i);
if (isUserCode(currentLine)) {
// No line is a Reactor API line.
return new String[]{currentLine};
}

//now we want something in the form "Flux.map ⇢ user.code.Class.method(Class.java:123)"
if (apiLine.isEmpty()) return new String[] { userCodeLine };
while (traces.hasNext()) {
prevLine = currentLine;
currentLine = traces.next();

if (isUserCode(currentLine)) {
// Currently on user code line, previous one is API. Attempt to create something in the form
// "Flux.map ⇢ user.code.Class.method(Class.java:123)".
int linePartIndex = prevLine.indexOf('(');
String apiLine = linePartIndex > 0 ?
prevLine.substring(0, linePartIndex) :
prevLine;

int linePartIndex = apiLine.indexOf('(');
if (linePartIndex > 0) {
apiLine = apiLine.substring(0, linePartIndex);
return new String[]{dropPublisherPackagePrefix(apiLine), "at " + currentLine};
}
}
apiLine = apiLine.replaceFirst("reactor.core.publisher.", "");

return new String[] { apiLine, "at " + userCodeLine };
// We skipped ALL lines, meaning they're all Reactor API lines. We'll fully display the last
// one.
return new String[]{dropPublisherPackagePrefix(currentLine)};
}

private static String dropPublisherPackagePrefix(String line) {
return line.startsWith(PUBLISHER_PACKAGE_PREFIX)
? line.substring(PUBLISHER_PACKAGE_PREFIX.length())
: line;
}

/**
* Returns an iterator over all trimmed non-empty lines in the given source string.
*
* @implNote This implementation attempts to minimize allocations.
*/
private static Iterator<String> trimmedNonemptyLines(String source) {
return new Iterator<String>() {
private int index = 0;
@Nullable
private String next = getNextLine();

@Override
public boolean hasNext() {
return next != null;
}

@Override
public String next() {
String current = next;
if (current == null) {
throw new NoSuchElementException();
}
next = getNextLine();
return current;
}

@Nullable
private String getNextLine() {
if (index >= source.length()) {
return null;
}

while (index < source.length()) {
int end = source.indexOf('\n', index);
if (end == -1) {
end = source.length();
}
String line = source.substring(index, end).trim();
index = end + 1;
if (!line.isEmpty()) {
return line;
}
}
return null;
}
};
}
}

0 comments on commit f89a23d

Please sign in to comment.