in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/meter/MeterSender.java [64:111]
/**
 * Reports the given meters to the backend over a client-streaming gRPC call.
 *
 * <p>Nothing is sent unless the channel status is {@code CONNECTED}. Otherwise a stream is
 * opened with a {@code GRPC_UPSTREAM_TIMEOUT}-second deadline, every {@code MeterData} that
 * {@code transform} hands to the callback is pushed into it, and the stream is completed.
 * Once a stream has been opened, the method waits for the server side to acknowledge
 * completion (or to fail) before returning.
 *
 * <p>Any {@link Throwable} raised while opening or feeding the stream is handled here and
 * not rethrown. An {@code UNIMPLEMENTED} status shuts the meter service down; every other
 * failure is logged.
 *
 * @param meterMap     meters to report, keyed by their id
 * @param meterService service that is shut down when the backend reports
 *                     {@code UNIMPLEMENTED} for the meter API
 */
public void send(Map<MeterId, BaseMeter> meterMap, MeterService meterService) {
    if (status != GRPCChannelStatus.CONNECTED) {
        return;
    }

    StreamObserver<MeterData> reportStreamObserver = null;
    // Named differently from the channel-level "status" field checked above to avoid shadowing it.
    final GRPCStreamServiceStatus streamStatus = new GRPCStreamServiceStatus(false);
    try {
        reportStreamObserver = meterReportServiceStub.withDeadlineAfter(
            GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
        ).collect(new StreamObserver<Commands>() {
            @Override
            public void onNext(Commands commands) {
                // Commands returned by the backend are ignored for meter reports.
            }

            @Override
            public void onError(Throwable throwable) {
                // Mark the stream as finished first so the sending thread stops waiting.
                streamStatus.finished();
                if (LOGGER.isErrorEnable()) {
                    LOGGER.error(throwable, "Send meters to collector fail with a grpc internal exception.");
                }
                ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
            }

            @Override
            public void onCompleted() {
                streamStatus.finished();
            }
        });

        // The lambda needs an effectively-final reference; reportStreamObserver is reassigned above.
        final StreamObserver<MeterData> reporter = reportStreamObserver;
        transform(meterMap, meterData -> reporter.onNext(meterData));
    } catch (Throwable e) {
        // Only exceptions thrown synchronously inside the try block arrive here; failures
        // delivered by gRPC asynchronously go to the response observer's onError above.
        if (e instanceof StatusRuntimeException
            && ((StatusRuntimeException) e).getStatus().getCode() == Status.Code.UNIMPLEMENTED) {
            LOGGER.warn("Backend doesn't support meter, it will be disabled");
            meterService.shutdown();
        } else {
            // Covers non-gRPC failures and gRPC failures other than UNIMPLEMENTED, so that
            // no exception is dropped without a trace.
            LOGGER.error(e, "Report meters to backend fail.");
        }
    } finally {
        if (reportStreamObserver != null) {
            reportStreamObserver.onCompleted();
            // Wait only when a stream was actually opened: if collect(...) failed, no callback
            // can ever call streamStatus.finished(), so there is nothing to wait for.
            // NOTE(review): wait4Finish() presumably blocks until finished() is called - confirm.
            streamStatus.wait4Finish();
        }
    }
}