public void consume()

in apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/LogReportServiceClient.java [86:134]


    public void consume(final List<LogData.Builder> dataList) {
        if (CollectionUtil.isEmpty(dataList)) {
            return;
        }

        if (GRPCChannelStatus.CONNECTED.equals(status)) {
            GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false);

            StreamObserver<LogData> logDataStreamObserver = logReportServiceStub
                .withDeadlineAfter(Collector.GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                .collect(
                    new StreamObserver<Commands>() {
                        @Override
                        public void onNext(final Commands commands) {

                        }

                        @Override
                        public void onError(final Throwable throwable) {
                            status.finished();
                            LOGGER.error(throwable, "Try to send {} log data to collector, with unexpected exception.",
                                         dataList.size()
                            );
                            ServiceManager.INSTANCE
                                .findService(GRPCChannelManager.class)
                                .reportError(throwable);
                        }

                        @Override
                        public void onCompleted() {
                            status.finished();
                        }
                    });

            boolean isFirst = true;
            for (final LogData.Builder logData : dataList) {
                if (isFirst) {
                    // Only set service name of the first element in one stream
                    // https://github.com/apache/skywalking-data-collect-protocol/blob/master/logging/Logging.proto
                    // Log collecting protocol defines LogData#service is required in the first element only.
                    logData.setService(Config.Agent.SERVICE_NAME);
                    isFirst = false;
                }
                logDataStreamObserver.onNext(logData.build());
            }
            logDataStreamObserver.onCompleted();
            status.wait4Finish();
        }
    }