in oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java [237:362]
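    /**
     * Adapt an OTLP metric to the internal {@link Metric} model (Gauge, Counter,
     * Histogram or Summary). Resource-level node labels are merged into each data
     * point's labels, timestamps are converted from nanoseconds to milliseconds,
     * and data points flagged with FLAG_NO_RECORDED_VALUE are dropped.
     */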
    private Stream<? extends Metric> adaptMetrics(
        final Map<String, String> nodeLabels,
        final io.opentelemetry.proto.metrics.v1.Metric metric) {
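        // Gauge: each recorded data point maps directly to a gauge sample.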
        if (metric.hasGauge()) {
            return metric.getGauge().getDataPointsList().stream().filter(point ->
                (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                .map(point -> new Gauge(
                    metric.getName(),
                    mergeLabels(
                        nodeLabels,
                        buildLabels(point.getAttributesList())
                    ),
                    point.hasAsDouble() ? point.getAsDouble()
                        : point.getAsInt(),
                    point.getTimeUnixNano() / 1000000
                ));
        }
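        // Sum: the adaptation depends on the aggregation temporality and on
        // whether the sum is monotonic.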
        if (metric.hasSum()) {
            final Sum sum = metric.getSum();
            if (sum.getAggregationTemporality() == AGGREGATION_TEMPORALITY_UNSPECIFIED) {
                return Stream.empty();
            }
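            // Delta sums are exposed as gauges, one sample per reported interval.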
            if (sum.getAggregationTemporality() == AGGREGATION_TEMPORALITY_DELTA) {
                return sum.getDataPointsList().stream().filter(point ->
                    (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                    .map(point -> new Gauge(
                        metric.getName(),
                        mergeLabels(
                            nodeLabels,
                            buildLabels(point.getAttributesList())
                        ),
                        point.hasAsDouble() ? point.getAsDouble()
                            : point.getAsInt(),
                        point.getTimeUnixNano() / 1000000
                    ));
            }
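            // Cumulative sums: monotonic sums map to counters, non-monotonic sums to gauges.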
            if (sum.getIsMonotonic()) {
                return sum.getDataPointsList().stream().filter(point ->
                    (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                    .map(point -> new Counter(
                        metric.getName(),
                        mergeLabels(
                            nodeLabels,
                            buildLabels(point.getAttributesList())
                        ),
                        point.hasAsDouble() ? point.getAsDouble()
                            : point.getAsInt(),
                        point.getTimeUnixNano() / 1000000
                    ));
            } else {
                return sum.getDataPointsList().stream().filter(point ->
                    (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                    .map(point -> new Gauge(
                        metric.getName(),
                        mergeLabels(
                            nodeLabels,
                            buildLabels(point.getAttributesList())
                        ),
                        point.hasAsDouble() ? point.getAsDouble()
                            : point.getAsInt(),
                        point.getTimeUnixNano() / 1000000
                    ));
            }
        }
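        // Histogram with explicit bucket bounds.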
        if (metric.hasHistogram()) {
            return metric.getHistogram().getDataPointsList().stream().filter(point ->
                (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                .map(point -> new Histogram(
                    metric.getName(),
                    mergeLabels(
                        nodeLabels,
                        buildLabels(point.getAttributesList())
                    ),
                    point.getCount(),
                    point.getSum(),
                    buildBuckets(
                        point.getBucketCountsList(),
                        point.getExplicitBoundsList()
                    ),
                    point.getTimeUnixNano() / 1000000
                ));
        }
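        // Exponential histogram: the positive and negative bucket counts are
        // converted into explicit bucket bounds so the point can be handled as a
        // regular histogram.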
        if (metric.hasExponentialHistogram()) {
            return metric.getExponentialHistogram().getDataPointsList().stream().filter(point ->
                (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                .map(point -> new Histogram(
                    metric.getName(),
                    mergeLabels(
                        nodeLabels,
                        buildLabels(point.getAttributesList())
                    ),
                    point.getCount(),
                    point.getSum(),
                    buildBucketsFromExponentialHistogram(
                        point.getPositive().getOffset(),
                        point.getPositive().getBucketCountsList(),
                        point.getNegative().getOffset(),
                        point.getNegative().getBucketCountsList(),
                        point.getScale()
                    ),
                    point.getTimeUnixNano() / 1000000
                ));
        }
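        // Summary: count, sum and the reported quantile values are kept as-is.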
        if (metric.hasSummary()) {
            return metric.getSummary().getDataPointsList().stream().filter(point ->
                (point.getFlags() & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE) != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
                .map(point -> new Summary(
                    metric.getName(),
                    mergeLabels(
                        nodeLabels,
                        buildLabels(point.getAttributesList())
                    ),
                    point.getCount(),
                    point.getSum(),
                    point.getQuantileValuesList().stream().collect(
                        toMap(
                            SummaryDataPoint.ValueAtQuantile::getQuantile,
                            SummaryDataPoint.ValueAtQuantile::getValue
                        )),
                    point.getTimeUnixNano() / 1000000
                ));
        }
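        // Any other metric type (e.g. a metric with no data set) is rejected.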
        throw new UnsupportedOperationException("Unsupported type");
    }