private Stream adaptMetrics()

in oap-server/server-receiver-plugin/otel-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/otel/otlp/OpenTelemetryMetricRequestProcessor.java [237:362]


    /**
     * Adapts a single OTLP metric into a stream of {@link Metric} samples, one per data point.
     *
     * <p>Data points whose flags have {@code FLAG_NO_RECORDED_VALUE} set are dropped, and every
     * sample timestamp is the point's {@code time_unix_nano} divided down to milliseconds.
     * The mapping by metric data type is:
     * <ul>
     *   <li>gauge: {@code Gauge}</li>
     *   <li>sum with unspecified aggregation temporality: empty stream</li>
     *   <li>sum with delta aggregation temporality: {@code Gauge}</li>
     *   <li>sum with any other temporality: {@code Counter} when monotonic, otherwise {@code Gauge}</li>
     *   <li>histogram: {@code Histogram} with buckets from {@code buildBuckets}</li>
     *   <li>exponential histogram: {@code Histogram} with buckets from
     *       {@code buildBucketsFromExponentialHistogram}</li>
     *   <li>summary: {@code Summary} carrying a quantile-to-value map</li>
     * </ul>
     *
     * @param nodeLabels labels merged (via {@code mergeLabels}) with each data point's own attributes
     * @param metric     the OTLP metric to convert
     * @return a lazily evaluated stream of converted samples; may be empty
     * @throws UnsupportedOperationException if the metric carries none of the data types above
     */
    private Stream<? extends Metric> adaptMetrics(
        final Map<String, String> nodeLabels,
        final io.opentelemetry.proto.metrics.v1.Metric metric) {
        if (metric.hasGauge()) {
            return metric.getGauge().getDataPointsList().stream()
                         .filter(point -> hasRecordedValue(point.getFlags()))
                         .map(point -> new Gauge(
                             metric.getName(),
                             mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                             point.hasAsDouble() ? point.getAsDouble() : point.getAsInt(),
                             nanosToMillis(point.getTimeUnixNano())
                         ));
        }
        if (metric.hasSum()) {
            final Sum sum = metric.getSum();
            if (sum.getAggregationTemporality() == AGGREGATION_TEMPORALITY_UNSPECIFIED) {
                return Stream.empty();
            }
            // Only a non-delta, monotonic sum is reported as a Counter; delta sums and
            // non-monotonic sums are both reported as Gauges.
            final boolean reportAsCounter =
                sum.getAggregationTemporality() != AGGREGATION_TEMPORALITY_DELTA
                    && sum.getIsMonotonic();
            if (reportAsCounter) {
                return sum.getDataPointsList().stream()
                          .filter(point -> hasRecordedValue(point.getFlags()))
                          .map(point -> new Counter(
                              metric.getName(),
                              mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                              point.hasAsDouble() ? point.getAsDouble() : point.getAsInt(),
                              nanosToMillis(point.getTimeUnixNano())
                          ));
            }
            return sum.getDataPointsList().stream()
                      .filter(point -> hasRecordedValue(point.getFlags()))
                      .map(point -> new Gauge(
                          metric.getName(),
                          mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                          point.hasAsDouble() ? point.getAsDouble() : point.getAsInt(),
                          nanosToMillis(point.getTimeUnixNano())
                      ));
        }
        if (metric.hasHistogram()) {
            return metric.getHistogram().getDataPointsList().stream()
                         .filter(point -> hasRecordedValue(point.getFlags()))
                         .map(point -> new Histogram(
                             metric.getName(),
                             mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                             point.getCount(),
                             point.getSum(),
                             buildBuckets(
                                 point.getBucketCountsList(),
                                 point.getExplicitBoundsList()
                             ),
                             nanosToMillis(point.getTimeUnixNano())
                         ));
        }
        if (metric.hasExponentialHistogram()) {
            return metric.getExponentialHistogram().getDataPointsList().stream()
                         .filter(point -> hasRecordedValue(point.getFlags()))
                         .map(point -> new Histogram(
                             metric.getName(),
                             mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                             point.getCount(),
                             point.getSum(),
                             buildBucketsFromExponentialHistogram(
                                 point.getPositive().getOffset(),
                                 point.getPositive().getBucketCountsList(),
                                 point.getNegative().getOffset(),
                                 point.getNegative().getBucketCountsList(),
                                 point.getScale()
                             ),
                             nanosToMillis(point.getTimeUnixNano())
                         ));
        }
        if (metric.hasSummary()) {
            return metric.getSummary().getDataPointsList().stream()
                         .filter(point -> hasRecordedValue(point.getFlags()))
                         .map(point -> new Summary(
                             metric.getName(),
                             mergeLabels(nodeLabels, buildLabels(point.getAttributesList())),
                             point.getCount(),
                             point.getSum(),
                             // NOTE(review): assuming toMap is java.util.stream.Collectors.toMap,
                             // a point repeating the same quantile would fail with
                             // IllegalStateException when the stream is consumed - confirm
                             // whether senders can produce that.
                             point.getQuantileValuesList().stream().collect(
                                 toMap(
                                     SummaryDataPoint.ValueAtQuantile::getQuantile,
                                     SummaryDataPoint.ValueAtQuantile::getValue
                                 )),
                             nanosToMillis(point.getTimeUnixNano())
                         ));
        }
        throw new UnsupportedOperationException("Unsupported type");
    }

    /**
     * Tells whether a data point's flags leave the {@code FLAG_NO_RECORDED_VALUE} bit unset.
     *
     * @param flags the raw flags bit field of a data point
     * @return {@code true} when the no-recorded-value bit is not set
     */
    private static boolean hasRecordedValue(final int flags) {
        return (flags & DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE)
            != DataPointFlags.FLAG_NO_RECORDED_VALUE_VALUE;
    }

    /**
     * Converts a nanosecond timestamp to milliseconds using integer division.
     *
     * @param unixNanos timestamp in nanoseconds
     * @return the timestamp divided by 1,000,000
     */
    private static long nanosToMillis(final long unixNanos) {
        return unixNanos / 1_000_000L;
    }