public void consume()

in oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java [137:248]


    /**
     * Streams the given export rows to the configured gRPC target, waits until the peer
     * completes or fails the stream, and finally calls {@link #fetchSubscriptionList()}.
     *
     * <p>Rows are skipped when their metrics are none of {@code LongValueHolder},
     * {@code IntValueHolder} or {@code LabeledValueHolder}, or when no entity name can be
     * resolved for them. When {@code data} is empty (per {@code CollectionUtils.isNotEmpty})
     * nothing is exported and only the subscription list is fetched.
     *
     * <p>If the calling thread is interrupted while waiting for the peer, the wait continues,
     * and the interrupt status is restored before this method returns, so the caller can still
     * observe the interruption.
     *
     * @param data the rows to export
     */
    public void consume(List<ExportData> data) {
        // Remembers an interruption seen while waiting, so it can be re-asserted on exit.
        boolean interrupted = false;

        if (CollectionUtils.isNotEmpty(data)) {
            GRPCStreamStatus status = new GRPCStreamStatus();
            StreamObserver<ExportMetricValue> streamObserver =
                exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
                                       .export(
                                           new StreamObserver<ExportResponse>() {
                                               @Override
                                               public void onNext(
                                                   ExportResponse response) {
                                                   // The response payload is not used.
                                               }

                                               @Override
                                               public void onError(
                                                   Throwable throwable) {
                                                   log.error("Export metrics to {}:{} fails.",
                                                             setting.getGRPCTargetHost(),
                                                             setting.getGRPCTargetPort(), throwable
                                                   );
                                                   // A failed stream also ends the wait loop below.
                                                   status.done();
                                               }

                                               @Override
                                               public void onCompleted() {
                                                   status.done();
                                               }
                                           });
            AtomicInteger exportNum = new AtomicInteger();

            data.forEach(row -> {
                ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

                Metrics metrics = row.getMetrics();
                if (metrics instanceof LongValueHolder) {
                    long value = ((LongValueHolder) metrics).getValue();
                    MetricValue.Builder valueBuilder = MetricValue.newBuilder();
                    valueBuilder.setLongValue(value);
                    builder.addMetricValues(valueBuilder);
                } else if (metrics instanceof IntValueHolder) {
                    // Int values are widened and sent through the same long field.
                    long value = ((IntValueHolder) metrics).getValue();
                    MetricValue.Builder valueBuilder = MetricValue.newBuilder();
                    valueBuilder.setLongValue(value);
                    builder.addMetricValues(valueBuilder);
                } else if (metrics instanceof LabeledValueHolder) {
                    // One metric value is emitted per key of the data table.
                    DataTable values = ((LabeledValueHolder) metrics).getValue();
                    values.keys().forEach(key -> {
                        MetricValue.Builder valueBuilder = MetricValue.newBuilder();
                        valueBuilder.setLongValue(values.get(key));
                        // NOTE(review): DataLabel.put(key) presumably expands the table key into
                        // label name/value pairs - confirm against DataLabel.
                        DataLabel labels = new DataLabel();
                        labels.put(key);
                        labels.forEach((labelName, labelValue) -> {
                            KeyValue.Builder kvBuilder = KeyValue.newBuilder();
                            kvBuilder.setKey(labelName);
                            kvBuilder.setValue(labelValue);
                            valueBuilder.addLabels(kvBuilder);
                        });
                        builder.addMetricValues(valueBuilder);
                    });
                } else {
                    // Unsupported metrics type: skip this row.
                    return;
                }

                MetricsMetaInfo meta = row.getMeta();
                builder.setMetricName(meta.getMetricsName());
                builder.setEventType(
                    ExportEvent.EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
                String entityName = getEntityName(meta);
                if (entityName == null) {
                    // No entity name could be resolved: skip this row.
                    return;
                }
                builder.setEntityName(entityName);
                builder.setEntityId(meta.getId());

                builder.setTimeBucket(metrics.getTimeBucket());

                streamObserver.onNext(builder.build());
                exportNum.getAndIncrement();
            });

            streamObserver.onCompleted();

            // Accumulated nominal wait (sum of the poll intervals), not measured wall-clock time.
            long sleepTime = 0;
            long cycle = 100L;

            //For memory safe of oap, we must wait for the peer confirmation.
            while (!status.isDone()) {
                try {
                    sleepTime += cycle;
                    Thread.sleep(cycle);
                } catch (InterruptedException e) {
                    // Catching the exception clears the interrupt flag, so the next sleep blocks
                    // normally and the wait for the peer goes on. The interruption is recorded
                    // and re-asserted when the method exits instead of being lost.
                    interrupted = true;
                }

                if (sleepTime > 2000L) {
                    log.warn(
                        "Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(),
                        setting.getGRPCTargetHost(),
                        setting
                            .getGRPCTargetPort(), sleepTime
                    );
                    // After the first two seconds, poll (and warn) only every two seconds.
                    cycle = 2000L;
                }
            }

            log.debug(
                "Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
                setting
                    .getGRPCTargetPort(), sleepTime
            );
        }

        try {
            fetchSubscriptionList();
        } finally {
            if (interrupted) {
                // Restore the interrupt status only now, so fetchSubscriptionList() runs exactly
                // as it did before, while the caller can still observe the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }