in oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCMetricsExporter.java [137:248]
public void consume(List<ExportData> data) {
if (CollectionUtils.isNotEmpty(data)) {
GRPCStreamStatus status = new GRPCStreamStatus();
StreamObserver<ExportMetricValue> streamObserver =
exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS)
.export(
new StreamObserver<ExportResponse>() {
@Override
public void onNext(
ExportResponse response) {
}
@Override
public void onError(
Throwable throwable) {
log.error("Export metrics to {}:{} fails.",
setting.getGRPCTargetHost(),
setting.getGRPCTargetPort(), throwable
);
status.done();
}
@Override
public void onCompleted() {
status.done();
}
});
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
Metrics metrics = row.getMetrics();
if (metrics instanceof LongValueHolder) {
long value = ((LongValueHolder) metrics).getValue();
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(value);
builder.addMetricValues(valueBuilder);
} else if (metrics instanceof IntValueHolder) {
long value = ((IntValueHolder) metrics).getValue();
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(value);
builder.addMetricValues(valueBuilder);
} else if (metrics instanceof LabeledValueHolder) {
DataTable values = ((LabeledValueHolder) metrics).getValue();
values.keys().forEach(key -> {
MetricValue.Builder valueBuilder = MetricValue.newBuilder();
valueBuilder.setLongValue(values.get(key));
DataLabel labels = new DataLabel();
labels.put(key);
labels.forEach((labelName, LabelValue) -> {
KeyValue.Builder kvBuilder = KeyValue.newBuilder();
kvBuilder.setKey(labelName);
kvBuilder.setValue(LabelValue);
valueBuilder.addLabels(kvBuilder);
});
builder.addMetricValues(valueBuilder);
});
} else {
return;
}
MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
builder.setEventType(
ExportEvent.EventType.INCREMENT.equals(row.getEventType()) ? EventType.INCREMENT : EventType.TOTAL);
String entityName = getEntityName(meta);
if (entityName == null) {
return;
}
builder.setEntityName(entityName);
builder.setEntityId(meta.getId());
builder.setTimeBucket(metrics.getTimeBucket());
streamObserver.onNext(builder.build());
exportNum.getAndIncrement();
});
streamObserver.onCompleted();
long sleepTime = 0;
long cycle = 100L;
//For memory safe of oap, we must wait for the peer confirmation.
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException e) {
}
if (sleepTime > 2000L) {
log.warn(
"Export {} metrics to {}:{}, wait {} milliseconds.", exportNum.get(),
setting.getGRPCTargetHost(),
setting
.getGRPCTargetPort(), sleepTime
);
cycle = 2000L;
}
}
log.debug(
"Exported {} metrics to {}:{} in {} milliseconds.", exportNum.get(), setting.getGRPCTargetHost(),
setting
.getGRPCTargetPort(), sleepTime
);
}
fetchSubscriptionList();
}