Skip to content

Commit

Permalink
Optimize observability and metrics tags processing (#3902)
Browse files Browse the repository at this point in the history
Both `KeyValues` and `Tags` unconditionally maintain the invariant that
they wrap distinct key-value pairs so the deduplication logic can be
removed. 
`Scannable#tagsDeduplicated` has been marked as deprecated.
  • Loading branch information
Stephan202 authored Nov 25, 2024
1 parent da340cd commit 7cc701c
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 13 deletions.
6 changes: 6 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,24 @@ configurations {
dependencies {
// Use the baseline to avoid using new APIs in the benchmarks
compileOnly libs.reactor.perfBaseline.core
compileOnly libs.reactor.perfBaseline.coreMicrometer
compileOnly libs.jsr305

implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
implementation libs.reactor.perfBaseline.extra, {
exclude group: 'io.projectreactor', module: 'reactor-core'
}
implementation platform(libs.micrometer.bom)
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"

current project(':reactor-core')
current project(':reactor-core-micrometer')
baseline libs.reactor.perfBaseline.core, {
changing = true
}
baseline libs.reactor.perfBaseline.coreMicrometer, {
changing = true
}
}

task jmhProfilers(type: JavaExec, description:'Lists the available profilers for the jmh task', group: 'Development') {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright (c) 2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.observability.micrometer;

import io.micrometer.core.instrument.Tags;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

@BenchmarkMode({Mode.AverageTime})
@Warmup(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
@State(Scope.Benchmark)
public class MicrometerMeterListenerConfigurationResolveTagsBenchmark {
@Param({"1|1", "1|2", "1|5", "1|10", "2|2", "2|5", "2|10", "5|5", "5|10", "10|10"})
private String testCase;

private Publisher<Void> publisher;

@Setup(Level.Iteration)
public void setup() {
String[] arguments = testCase.split("\\|", -1);
int distinctTagCount = Integer.parseInt(arguments[0]);
int totalTagCount = Integer.parseInt(arguments[1]);

publisher = addTags(Mono.empty(), distinctTagCount, totalTagCount);
}

@SuppressWarnings("unused")
@Benchmark
public Tags measureThroughput() {
return MicrometerMeterListenerConfiguration.resolveTags(publisher, Tags.of("k", "v"));
}

private static <T> Mono<T> addTags(Mono<T> source, int distinct, int total) {
if (total == 0) {
return source;
}

return addTags(source.tag("k-" + total % distinct, "v-" + total), distinct, total - 1);
}
}
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
# Baselines, should be updated on every release
baseline-core-api = "3.7.0"
baselinePerfCore = "3.7.0"
baselinePerfCoreMicrometer = "1.2.0"
baselinePerfExtra = "3.5.2"

# Other shared versions
Expand Down Expand Up @@ -46,6 +47,7 @@ kotlin-stdlib = { module = "org.jetbrains.kotlin:kotlin-stdlib", version.ref = "
reactiveStreams = { module = "org.reactivestreams:reactive-streams", version.ref = "reactiveStreams" }
reactiveStreams-tck = { module = "org.reactivestreams:reactive-streams-tck", version.ref = "reactiveStreams" }
reactor-perfBaseline-core = { module = "io.projectreactor:reactor-core", version.ref = "baselinePerfCore" }
reactor-perfBaseline-coreMicrometer = { module = "io.projectreactor:reactor-core-micrometer", version.ref = "baselinePerfCoreMicrometer" }
reactor-perfBaseline-extra = { module = "io.projectreactor.addons:reactor-extra", version.ref = "baselinePerfExtra" }

[plugins]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -93,9 +93,9 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
Scannable scannable = Scannable.from(source);

if (scannable.isScanAvailable()) {
List<Tag> discoveredTags = scannable.tagsDeduplicated()
.entrySet().stream()
.map(e -> Tag.of(e.getKey(), e.getValue()))
// `Tags#and` deduplicates tags by key, retaining the last value as required.
List<Tag> discoveredTags = scannable.tags()
.map(t -> Tag.of(t.getT1(), t.getT2()))
.collect(Collectors.toList());
return tags.and(discoveredTags);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,9 +70,9 @@ static KeyValues resolveKeyValues(Publisher<?> source, KeyValues tags) {
Scannable scannable = Scannable.from(source);

if (scannable.isScanAvailable()) {
List<KeyValue> discoveredTags = scannable.tagsDeduplicated()
.entrySet().stream()
.map(e -> KeyValue.of(e.getKey(), e.getValue()))
// `KeyValues#and` deduplicates tags by key, retaining the last value as required.
List<KeyValue> discoveredTags = scannable.tags()
.map(e -> KeyValue.of(e.getT1(), e.getT2()))
.collect(Collectors.toList());
return tags.and(discoveredTags);
}
Expand Down
4 changes: 3 additions & 1 deletion reactor-core/src/main/java/reactor/core/Scannable.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -627,7 +627,9 @@ default Stream<Tuple2<String, String>> tags() {
*
* @return a {@link Map} of deduplicated tags from this {@link Scannable} and its reachable parents
* @see #tags()
* @deprecated Micrometer APIs generally deduplicate tags and key-value pairs by default, so for related use cases prefer {@link #tags()}.
*/
@Deprecated
default Map<String, String> tagsDeduplicated() {
return tags().collect(Collectors.toMap(Tuple2::getT1, Tuple2::getT2,
(s1, s2) -> s2, LinkedHashMap::new));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2018-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -295,9 +295,9 @@ static Tags resolveTags(Publisher<?> source, Tags tags) {
Scannable scannable = Scannable.from(source);

if (scannable.isScanAvailable()) {
List<Tag> discoveredTags = scannable.tagsDeduplicated()
.entrySet().stream()
.map(e -> Tag.of(e.getKey(), e.getValue()))
// `Tags#and` deduplicates tags by key, retaining the last value as required.
List<Tag> discoveredTags = scannable.tags()
.map(t -> Tag.of(t.getT1(), t.getT2()))
.collect(Collectors.toList());
return tags.and(discoveredTags);
}
Expand Down

0 comments on commit 7cc701c

Please sign in to comment.