in uforwarder/src/main/java/com/uber/data/kafka/consumerproxy/worker/processor/SimpleOutboundMessageLimiter.java [113:160]
public void publishMetrics() {
int nPartitions = topicPartitionToScopeAndInflight.size();
topicPartitionToScopeAndInflight.forEach(
(tp, scopeAndInflight) -> {
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_SIZE)
.update(scopeAndInflight.inflight.intValue());
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_SIZE_ONE_MINUTE_MAX)
.update(inflightTracker.oneMinuteMax() / (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_SIZE_ONE_MINUTE_MIN)
.update(inflightTracker.oneMinuteMin() / (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_LIMIT)
.update(longFixedInflightLimiter.getMetrics().getLimit() / (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_QUEUE)
.update(
asyncStaticLimiterAdapter.getMetrics().getAsyncQueueSize()
/ (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_ADAPTIVE_LIMIT)
.update(adaptiveInflightLimiter.getMetrics().getLimit() / (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_SHADOW_ADAPTIVE_LIMIT)
.update(shadowAdaptiveInflightLimiter.getMetrics().getLimit() / (double) nPartitions);
scopeAndInflight
.scope
.gauge(MetricNames.OUTBOUND_CACHE_ADAPTIVE_LIMIT_ENABLED)
.update(useFixedLimiter() ? 0 : 1);
publishMetrics(
scopeAndInflight.scope,
adaptiveInflightLimiter.getMetrics().getExtraMetrics(),
MetricNames.OUTBOUND_CACHE_ADAPTIVE_LIMIT);
publishMetrics(
scopeAndInflight.scope,
shadowAdaptiveInflightLimiter.getMetrics().getExtraMetrics(),
MetricNames.OUTBOUND_CACHE_SHADOW_ADAPTIVE_LIMIT);
});
}