in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java [86:134]
/**
 * Streams one batch of log records to the collector through {@code logReportServiceStub}.
 *
 * <p>The call is a no-op when {@code dataList} is empty (per {@code CollectionUtil.isEmpty})
 * or when the channel status is not {@link GRPCChannelStatus#CONNECTED}. Otherwise every
 * builder in the list is built and pushed onto a request stream opened with a deadline of
 * {@code Collector.GRPC_UPSTREAM_TIMEOUT} seconds, the request stream is completed, and
 * {@code wait4Finish()} is invoked on the per-stream status tracker (presumably blocking
 * until the response observer has marked the stream as finished -- confirm against
 * {@code GRPCStreamServiceStatus}).
 *
 * @param dataList log record builders to send; the first builder in iteration order is
 *                 mutated to carry the configured service name
 */
public void consume(final List<LogData.Builder> dataList) {
    if (CollectionUtil.isEmpty(dataList)) {
        return;
    }
    // "status" here is the channel status declared outside this method, not the
    // per-stream tracker created below.
    if (!GRPCChannelStatus.CONNECTED.equals(status)) {
        return;
    }

    final GRPCStreamServiceStatus streamStatus = new GRPCStreamServiceStatus(false);

    // Receives the collector's side of the call; both terminal callbacks mark the
    // stream as finished.
    final StreamObserver<Commands> responseObserver = new StreamObserver<Commands>() {
        @Override
        public void onNext(final Commands commands) {
            // Commands returned by the collector are not processed here.
        }

        @Override
        public void onError(final Throwable throwable) {
            streamStatus.finished();
            LOGGER.error(
                throwable,
                "Try to send {} log data to collector, with unexpected exception.",
                dataList.size()
            );
            // Hand the failure over to the channel manager.
            ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
        }

        @Override
        public void onCompleted() {
            streamStatus.finished();
        }
    };

    final StreamObserver<LogData> requestObserver = logReportServiceStub
        .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
        .collect(responseObserver);

    // The log collecting protocol requires LogData#service on the first element of a
    // stream only, so the service name is attached to the head of the batch and nowhere else.
    // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
    boolean serviceNameAttached = false;
    for (final LogData.Builder builder : dataList) {
        if (!serviceNameAttached) {
            builder.setService(Config.Agent.SERVICE_NAME);
            serviceNameAttached = true;
        }
        requestObserver.onNext(builder.build());
    }

    requestObserver.onCompleted();
    streamStatus.wait4Finish();
}