in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java [89:140]
public void consume(List<TraceSegment> data) {
if (CONNECTED.equals(status)) {
final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);
StreamObserver<SegmentObject> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(
Config.Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
).collect(new StreamObserver<Commands>() {
@Override
public void onNext(Commands commands) {
ServiceManager.INSTANCE.findService(CommandService.class)
.receiveCommand(commands);
}
@Override
public void onError(
Throwable throwable) {
status.finished();
if (LOGGER.isErrorEnable()) {
LOGGER.error(
throwable,
"Send UpstreamSegment to collector fail with a grpc internal exception."
);
}
ServiceManager.INSTANCE
.findService(GRPCChannelManager.class)
.reportError(throwable);
}
@Override
public void onCompleted() {
status.finished();
}
});
try {
for (TraceSegment segment : data) {
SegmentObject upstreamSegment = segment.transform();
upstreamSegmentStreamObserver.onNext(upstreamSegment);
}
} catch (Throwable t) {
LOGGER.error(t, "Transform and send UpstreamSegment to collector fail.");
}
upstreamSegmentStreamObserver.onCompleted();
status.wait4Finish();
segmentUplinkedCounter += data.size();
} else {
segmentAbandonedCounter += data.size();
}
printUplinkStatus();
}