public StreamObserver streamAccessLogs()

in oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java [104:199]


    /**
     * Builds the request-side observer for one Envoy access log stream.
     *
     * <p>For every received message the observer (re)computes the sender's {@link Role} when
     * needed, runs the registered HTTP or TCP analyzers over each log entry, and hands the
     * collected metrics to {@link TelemetryDataDispatcher#process}. The time spent in each
     * {@code onNext} call is recorded through the handler's histogram timer.
     *
     * @param responseObserver      observer used to answer the client once the stream completes
     * @param alwaysAnalyzeIdentity when {@code true}, the role is re-identified on every message
     *                              that carries an identifier, not only on the first message
     * @return the observer that consumes the incoming {@link StreamAccessLogsMessage}s
     */
    public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(
        StreamObserver<StreamAccessLogsResponse> responseObserver, boolean alwaysAnalyzeIdentity) {
        return new StreamObserver<StreamAccessLogsMessage>() {
            private volatile boolean isFirst = true;
            private Role role;
            private StreamAccessLogsMessage.Identifier identifier;

            @Override
            public void onNext(StreamAccessLogsMessage message) {
                final HistogramMetrics.Timer latencyTimer = histogram.createTimer();
                try {
                    refreshIdentityIfNeeded(message);

                    final StreamAccessLogsMessage.LogEntriesCase entriesCase = message.getLogEntriesCase();
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug(
                            "Messaged is identified from Envoy[{}], role[{}] in [{}]. Received msg {}",
                            identifier.getNode().getId(), role, entriesCase, message
                        );
                    }

                    final ServiceMeshMetrics.Builder meshMetrics = ServiceMeshMetrics.newBuilder();
                    switch (entriesCase) {
                        case HTTP_LOGS:
                            meshMetrics.setHttpMetrics(analyzeHttpLogs(message.getHttpLogs()));
                            break;
                        case TCP_LOGS:
                            meshMetrics.setTcpMetrics(analyzeTcpLogs(message.getTcpLogs()));
                            break;
                        default: // Ignored
                    }

                    // Messages with no log entries still flow through here with a count of zero.
                    final int dispatchedCount = meshMetrics.getHttpMetrics().getMetricsCount()
                        + meshMetrics.getTcpMetrics().getMetricsCount();
                    sourceDispatcherCounter.inc(dispatchedCount);
                    TelemetryDataDispatcher.process(meshMetrics.build());
                } finally {
                    latencyTimer.finish();
                }
            }

            /**
             * Stores the identifier of the message and recomputes the role, either on the very
             * first message or, if requested, on every message that has an identifier.
             */
            private void refreshIdentityIfNeeded(final StreamAccessLogsMessage message) {
                final boolean mustIdentify = isFirst || (alwaysAnalyzeIdentity && message.hasIdentifier());
                if (!mustIdentify) {
                    return;
                }
                identifier = message.getIdentifier();
                isFirst = false;
                // Each analyzer receives the role chosen by the previous one, starting at NONE.
                role = Role.NONE;
                for (final ALSHTTPAnalysis identifyingAnalysis : envoyHTTPAnalysisList) {
                    role = identifyingAnalysis.identify(identifier, role);
                }
            }

            /**
             * Counts the HTTP entries, passes each one through all HTTP analyzers and gathers
             * the metrics of every entry that produced a result.
             */
            private HTTPServiceMeshMetrics.Builder analyzeHttpLogs(
                final StreamAccessLogsMessage.HTTPAccessLogEntries entries) {
                final HTTPServiceMeshMetrics.Builder collected = HTTPServiceMeshMetrics.newBuilder();

                counter.inc(entries.getLogEntryCount());

                for (final HTTPAccessLogEntry entry : entries.getLogEntryList()) {
                    AccessLogAnalyzer.Result outcome = AccessLogAnalyzer.Result.builder().build();
                    for (final ALSHTTPAnalysis httpAnalysis : envoyHTTPAnalysisList) {
                        outcome = httpAnalysis.analysis(outcome, identifier, entry, role);
                    }
                    if (outcome.hasResult()) {
                        collected.addAllMetrics(outcome.getMetrics().getHttpMetrics().getMetricsList());
                    }
                }
                return collected;
            }

            /**
             * Counts the TCP entries, passes each one through all TCP analyzers and gathers
             * the metrics of every entry that produced a result.
             */
            private TCPServiceMeshMetrics.Builder analyzeTcpLogs(
                final StreamAccessLogsMessage.TCPAccessLogEntries entries) {
                final TCPServiceMeshMetrics.Builder collected = TCPServiceMeshMetrics.newBuilder();

                counter.inc(entries.getLogEntryCount());

                for (final TCPAccessLogEntry entry : entries.getLogEntryList()) {
                    AccessLogAnalyzer.Result outcome = AccessLogAnalyzer.Result.builder().build();
                    for (final TCPAccessLogAnalyzer tcpAnalyzer : envoyTCPAnalysisList) {
                        outcome = tcpAnalyzer.analysis(outcome, identifier, entry, role);
                    }
                    if (outcome.hasResult()) {
                        collected.addAllMetrics(outcome.getMetrics().getTcpMetrics().getMetricsList());
                    }
                }
                return collected;
            }

            @Override
            public void onError(Throwable throwable) {
                final Status failure = Status.fromThrowable(throwable);
                final boolean cancelledByClient = Status.CANCELLED.getCode() == failure.getCode();
                if (!cancelledByClient) {
                    LOGGER.error("Error in receiving access log from envoy", throwable);
                    return;
                }
                // A client-side cancellation is only reported at debug level.
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Envoy client cancelled sending access logs", throwable);
                }
            }

            @Override
            public void onCompleted() {
                // Reply with a single empty response, then close the response stream.
                final StreamAccessLogsResponse emptyReply = StreamAccessLogsResponse.newBuilder().build();
                responseObserver.onNext(emptyReply);
                responseObserver.onCompleted();
            }
        };
    }