Skip to content

Commit

Permalink
[all] OpenTelemetry metrics integration in venice-router (#1303)
Browse files Browse the repository at this point in the history
Initial integration of openTelemetry in venice-router.

Changes:
- Before this change, MetricsRepository holding MetricsConfig was passed around from service creation throughout the service code to carrying the config and the metrics. Introduced VeniceMetricsRepository to extend MetricsRepository and also hold VeniceOpenTelemetryMetricsRepository. Introduced VeniceMetricsConfig to include tehuti's MetricConfig and Otel config. Either MetricsRepository or VeniceMetricsRepository can be passed in during router service creation and if VeniceMetricsRepository is used and if otel configs are enabled, otel metrics will be used along with tehuti metrics.
- VeniceOpenTelemetryMetricsRepository holds histogramMap and counterMap to create a counter only once regardless for multiple request types and store name: Those should be dimensions instead.
- Added new dimensions: venice.store.name, venice.cluster.name, venice.request.method, http.response.status_code, http.response.status_code_category, venice.response.status_code_category, venice.request.validation_outcome, venice.client.type, venice.request.retry_type, venice.request.retry_abort_reason
- added new otel metrics for some of the existing router metrics: venice.router.call_time, venice.router.incoming_call_count, venice.router.call_count, venice.router.retry_count, venice.router.aborted_retry_count, venice.router.retry_delay, venice.router.call_key_count
- introduced configs to enable/disable this feature and to export the metrics
otel.venice.enabled => To enable/disable opentelemetry in venice
otel.venice.export.to.log => Export the metrics to logs, for debug purposes
otel.venice.export.to.endpoint => Export metrics to the configured endpoint
otel.venice.metrics.naming.format => VeniceOpenTelemetryMetricNamingFormat => snake_case (default), pascal_case, camel_case
otel.venice.custom.dimensions.map => Can be configured to pass in custom dimensions or system dimensions.
- It also uses existing otel configs to configure the endpoints like below and other opentelemetry supported configs
otel.exporter.otlp.metrics.protocol
otel.exporter.otlp.metrics.endpoint
otel.exporter.otlp.metrics.temporality.preference
otel.exporter.otlp.metrics.headers
otel.exporter.otlp.metrics.default.histogram.aggregation
otel.exporter.otlp.metrics.default.histogram.aggregation.max.scale
otel.exporter.otlp.metrics.default.histogram.aggregation.max.buckets

Optimizations in line for the next iteration:
- Prepopulate the additional dimensions rather than populating it during record()
- Transformer for the metric/dimension names rather than just predefined formats
- Revisit the structuring of MetricEntityState to not have to do a lookup of tehuti metric everytime record() is called
- Revisit otel histogram default maxScale and maxBuckets config
- Skip or Noop for otel record() for pre aggregations in code. For instance: if storeName is total or total.<clusterName>
  • Loading branch information
m-nagarajan authored Nov 26, 2024
1 parent cc6e3db commit ce23e38
Show file tree
Hide file tree
Showing 76 changed files with 3,139 additions and 248 deletions.
6 changes: 6 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def hadoopVersion = '2.10.2'
def apacheSparkVersion = '3.3.3'
def antlrVersion = '4.8'
def scala = '2.12'
def openTelemetryVersion = '1.43.0'

ext.libraries = [
alpnAgent: "org.mortbay.jetty.alpn:jetty-alpn-agent:${alpnAgentVersion}",
Expand Down Expand Up @@ -141,6 +142,11 @@ ext.libraries = [
zkclient: 'com.101tec:zkclient:0.7', // For Kafka AdminUtils
zookeeper: 'org.apache.zookeeper:zookeeper:3.6.3',
zstd: 'com.github.luben:zstd-jni:1.5.2-3',
opentelemetryApi: "io.opentelemetry:opentelemetry-api:${openTelemetryVersion}",
opentelemetrySdk: "io.opentelemetry:opentelemetry-sdk:${openTelemetryVersion}",
opentelemetryExporterLogging: "io.opentelemetry:opentelemetry-exporter-logging:${openTelemetryVersion}",
opentelemetryExporterOtlp: "io.opentelemetry:opentelemetry-exporter-otlp:${openTelemetryVersion}",
opentelemetryExporterCommon: "io.opentelemetry:opentelemetry-exporter-common:${openTelemetryVersion}"
]

group = 'com.linkedin.venice'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ public AggHostLevelIngestionStats(
boolean unregisterMetricForDeletedStoreEnabled,
Time time) {
super(
serverConfig.getClusterName(),
metricsRepository,
new HostLevelStoreIngestionStatsSupplier(serverConfig, ingestionTaskMap, time),
metadataRepository,
unregisterMetricForDeletedStoreEnabled);
unregisterMetricForDeletedStoreEnabled,
false);
}

static class HostLevelStoreIngestionStatsSupplier implements StatsSupplier<HostLevelIngestionStats> {
Expand All @@ -44,14 +46,15 @@ static class HostLevelStoreIngestionStatsSupplier implements StatsSupplier<HostL
}

@Override
public HostLevelIngestionStats get(MetricsRepository metricsRepository, String storeName) {
public HostLevelIngestionStats get(MetricsRepository metricsRepository, String storeName, String clusterName) {
throw new VeniceException("Should not be called.");
}

@Override
public HostLevelIngestionStats get(
MetricsRepository metricsRepository,
String storeName,
String clusterName,
HostLevelIngestionStats totalStats) {
return new HostLevelIngestionStats(
metricsRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public AggKafkaConsumerServiceStats(
metricsRepository,
new KafkaConsumerServiceStatsSupplier(getMaxElapsedTimeSinceLastPollInConsumerPool),
metadataRepository,
isUnregisterMetricForDeletedStoreEnabled);
isUnregisterMetricForDeletedStoreEnabled,
true);
}

public void recordTotalConsumerIdleTime(double idleTime) {
Expand Down Expand Up @@ -107,14 +108,15 @@ static class KafkaConsumerServiceStatsSupplier implements StatsSupplier<KafkaCon
}

@Override
public KafkaConsumerServiceStats get(MetricsRepository metricsRepository, String storeName) {
public KafkaConsumerServiceStats get(MetricsRepository metricsRepository, String storeName, String clusterName) {
throw new VeniceException("Should not be called.");
}

@Override
public KafkaConsumerServiceStats get(
MetricsRepository metricsRepository,
String storeName,
String clusterName,
KafkaConsumerServiceStats totalStats) {
return new KafkaConsumerServiceStats(
metricsRepository,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void recordRocksDBOpenFailure() {
}

static class StorageEngineStatsReporter extends AbstractVeniceStatsReporter<StorageEngineStats> {
public StorageEngineStatsReporter(MetricsRepository metricsRepository, String storeName) {
public StorageEngineStatsReporter(MetricsRepository metricsRepository, String storeName, String clusterName) {
super(metricsRepository, storeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* collection/visualization system.
*/
public class DIVStatsReporter extends AbstractVeniceStatsReporter<DIVStats> {
public DIVStatsReporter(MetricsRepository metricsRepository, String storeName) {
public DIVStatsReporter(MetricsRepository metricsRepository, String storeName, String clusterName) {
super(metricsRepository, storeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
public class IngestionStatsReporter extends AbstractVeniceStatsReporter<IngestionStats> {
private static final Logger LOGGER = LogManager.getLogger(IngestionStatsReporter.class);

public IngestionStatsReporter(MetricsRepository metricsRepository, String storeName) {
public IngestionStatsReporter(MetricsRepository metricsRepository, String storeName, String clusterName) {
super(metricsRepository, storeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ public VeniceVersionedStatsReporter(
registerSensor("current_version", new AsyncGauge((ignored1, ignored2) -> currentVersion, "current_version"));
registerSensor("future_version", new AsyncGauge((ignored1, ignored2) -> futureVersion, "future_version"));

this.currentStatsReporter = statsSupplier.get(metricsRepository, storeName + "_current");
this.currentStatsReporter = statsSupplier.get(metricsRepository, storeName + "_current", (String) null);
if (!isSystemStore) {
this.futureStatsReporter = statsSupplier.get(metricsRepository, storeName + "_future");
this.totalStatsReporter = statsSupplier.get(metricsRepository, storeName + "_total");
this.futureStatsReporter = statsSupplier.get(metricsRepository, storeName + "_future", (String) null);
this.totalStatsReporter = statsSupplier.get(metricsRepository, storeName + "_total", (String) null);
} else {
this.futureStatsReporter = null;
this.totalStatsReporter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ public HeartbeatMonitoringService(
metricsRepository,
metadataRepository,
() -> new HeartbeatStat(new MetricConfig(), regionNames),
(aMetricsRepository, storeName) -> new HeartbeatStatReporter(aMetricsRepository, storeName, regionNames),
(aMetricsRepository, storeName, clusterName) -> new HeartbeatStatReporter(
aMetricsRepository,
storeName,
regionNames),
leaderHeartbeatTimeStamps,
followerHeartbeatTimeStamps);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public void testDIVReporterCanReport() {
metricsRepository.addReporter(reporter);

String storeName = Utils.getUniqueString("store");
DIVStatsReporter divStatsReporter = new DIVStatsReporter(metricsRepository, storeName);
DIVStatsReporter divStatsReporter = new DIVStatsReporter(metricsRepository, storeName, null);

assertEquals(reporter.query("." + storeName + "--success_msg.DIVStatsGauge").value(), (double) NULL_DIV_STATS.code);

Expand Down
5 changes: 5 additions & 0 deletions internal/venice-client-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ dependencies {
implementation libraries.log4j2api
implementation libraries.zstd
implementation libraries.conscrypt
implementation libraries.opentelemetryApi
implementation libraries.opentelemetrySdk
implementation libraries.opentelemetryExporterLogging
implementation libraries.opentelemetryExporterOtlp
implementation libraries.opentelemetryExporterCommon

testImplementation project(':internal:venice-test-common')
testImplementation project(':clients:venice-thin-client')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,56 @@ public abstract class AbstractVeniceAggStats<T extends AbstractVeniceStats> {
protected final Map<String, T> storeStats = new VeniceConcurrentHashMap<>();

private StatsSupplier<T> statsFactory;

private final MetricsRepository metricsRepository;
private final String clusterName;

private AbstractVeniceAggStats(MetricsRepository metricsRepository, StatsSupplier<T> statsSupplier, T totalStats) {
private AbstractVeniceAggStats(
String clusterName,
MetricsRepository metricsRepository,
StatsSupplier<T> statsSupplier,
T totalStats) {
this.clusterName = clusterName;
this.metricsRepository = metricsRepository;
this.statsFactory = statsSupplier;
this.totalStats = totalStats;
}

public AbstractVeniceAggStats(MetricsRepository metricsRepository, StatsSupplier<T> statsSupplier) {
this(metricsRepository, statsSupplier, statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, null));
}

public AbstractVeniceAggStats(MetricsRepository metricsRepository) {
public AbstractVeniceAggStats(String clusterName, MetricsRepository metricsRepository) {
this.metricsRepository = metricsRepository;
this.clusterName = clusterName;
}

public void setStatsSupplier(StatsSupplier<T> statsSupplier) {
this.statsFactory = statsSupplier;
this.totalStats = statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, null);
this.totalStats = statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT, clusterName, null);
}

/**
* clusterName is used to create per cluster aggregate stats and {@link com.linkedin.venice.stats.dimensions.VeniceMetricsDimensions} <br>
* If perClusterAggregate is true, it will create per cluster aggregates with storeName as "total.<clusterName>"
*/
public AbstractVeniceAggStats(
String clusterName,
MetricsRepository metricsRepository,
StatsSupplier<T> statsSupplier) {
this(
StatsSupplier<T> statsSupplier,
boolean perClusterAggregate) {
if (perClusterAggregate && clusterName == null) {
throw new IllegalArgumentException("perClusterAggregate cannot be true when clusterName is null");
}
this.clusterName = clusterName;
this.metricsRepository = metricsRepository;
this.statsFactory = statsSupplier;
this.totalStats = statsSupplier.get(
metricsRepository,
statsSupplier,
statsSupplier.get(metricsRepository, STORE_NAME_FOR_TOTAL_STAT + "." + clusterName, null));
perClusterAggregate ? STORE_NAME_FOR_TOTAL_STAT + "." + clusterName : STORE_NAME_FOR_TOTAL_STAT,
clusterName,
null);
}

public T getStoreStats(String storeName) {
return storeStats.computeIfAbsent(storeName, k -> statsFactory.get(metricsRepository, storeName, totalStats));
return storeStats
.computeIfAbsent(storeName, k -> statsFactory.get(metricsRepository, storeName, clusterName, totalStats));
}

public T getNullableStoreStats(String storeName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ public interface StatsSupplier<T extends AbstractVeniceStats> {
/**
* Legacy function, for implementations that do not use total stats in their constructor.
*
* @see #get(MetricsRepository, String, AbstractVeniceStats) which is the only caller.
* @see #get(MetricsRepository, String, String, AbstractVeniceStats) which is the only caller.
*/
T get(MetricsRepository metricsRepository, String storeName);
T get(MetricsRepository metricsRepository, String storeName, String clusterName);

/**
* This is the function that gets called by {@link AbstractVeniceAggStats}, and concrete classes can
* optionally implement it in order to be provided with the total stats instance.
*/
default T get(MetricsRepository metricsRepository, String storeName, T totalStats) {
return get(metricsRepository, storeName);
default T get(MetricsRepository metricsRepository, String storeName, String clusterName, T totalStats) {
return get(metricsRepository, storeName, clusterName);
}
}
Loading

0 comments on commit ce23e38

Please sign in to comment.