in oap-server/server-receiver-plugin/envoy-metrics-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/envoy/AccessLogServiceGRPCHandler.java [104:199]
/**
 * Creates the request observer that consumes one stream of Envoy access-log messages.
 *
 * <p>The returned observer keeps per-stream state: the identifier taken from the stream and
 * the {@link Role} resolved from it. Each received message is analyzed by the configured
 * HTTP or TCP analyzers and the collected metrics are handed to
 * {@link TelemetryDataDispatcher}.
 *
 * @param responseObserver      observer that receives the single empty response and the
 *                              completion signal once the incoming stream completes
 * @param alwaysAnalyzeIdentity when {@code true}, the identifier and role are re-resolved for
 *                              every message that carries an identifier; when {@code false},
 *                              they are resolved from the first message of the stream only
 * @return the observer handling the incoming access-log messages
 */
public StreamObserver<StreamAccessLogsMessage> streamAccessLogs(
    StreamObserver<StreamAccessLogsResponse> responseObserver, boolean alwaysAnalyzeIdentity) {
    return new StreamObserver<StreamAccessLogsMessage>() {
        private volatile boolean isFirst = true;
        private Role role;
        private StreamAccessLogsMessage.Identifier identifier;

        @Override
        public void onNext(StreamAccessLogsMessage message) {
            HistogramMetrics.Timer timer = histogram.createTimer();
            try {
                // The first message always (re)sets the identity, even if it carries no
                // identifier; later messages only do so when explicitly requested.
                if (isFirst || alwaysAnalyzeIdentity && message.hasIdentifier()) {
                    resolveIdentity(message);
                }

                StreamAccessLogsMessage.LogEntriesCase logCase = message.getLogEntriesCase();

                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(
                        "Message is identified from Envoy[{}], role[{}] in [{}]. Received msg {}",
                        identifier.getNode().getId(), role, logCase, message
                    );
                }

                final ServiceMeshMetrics.Builder sourceResult = ServiceMeshMetrics.newBuilder();

                switch (logCase) {
                    case HTTP_LOGS:
                        sourceResult.setHttpMetrics(analyzeHttpLogs(message.getHttpLogs()));
                        break;
                    case TCP_LOGS:
                        sourceResult.setTcpMetrics(analyzeTcpLogs(message.getTcpLogs()));
                        break;
                    default: // Ignored
                        break;
                }

                // Count every metric about to be dispatched, regardless of protocol.
                sourceDispatcherCounter.inc(
                    sourceResult.getHttpMetrics().getMetricsCount() +
                        sourceResult.getTcpMetrics().getMetricsCount());
                TelemetryDataDispatcher.process(sourceResult.build());
            } finally {
                // Stop the timer even when analysis or dispatching throws.
                timer.finish();
            }
        }

        /** Stores the identifier of the message and resolves the role through all HTTP analyzers. */
        private void resolveIdentity(StreamAccessLogsMessage message) {
            identifier = message.getIdentifier();
            isFirst = false;
            role = Role.NONE;
            for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
                role = analysis.identify(identifier, role);
            }
        }

        /** Counts the HTTP log entries and collects the metrics the HTTP analyzers produce for them. */
        private HTTPServiceMeshMetrics.Builder analyzeHttpLogs(
            StreamAccessLogsMessage.HTTPAccessLogEntries httpLogs) {
            final HTTPServiceMeshMetrics.Builder httpMetrics = HTTPServiceMeshMetrics.newBuilder();
            counter.inc(httpLogs.getLogEntryCount());

            for (final HTTPAccessLogEntry httpLog : httpLogs.getLogEntryList()) {
                // Every analyzer receives the result of the previous one for the same entry.
                AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
                for (ALSHTTPAnalysis analysis : envoyHTTPAnalysisList) {
                    result = analysis.analysis(result, identifier, httpLog, role);
                }
                if (result.hasResult()) {
                    httpMetrics.addAllMetrics(result.getMetrics().getHttpMetrics().getMetricsList());
                }
            }
            return httpMetrics;
        }

        /** Counts the TCP log entries and collects the metrics the TCP analyzers produce for them. */
        private TCPServiceMeshMetrics.Builder analyzeTcpLogs(
            StreamAccessLogsMessage.TCPAccessLogEntries tcpLogs) {
            final TCPServiceMeshMetrics.Builder tcpMetrics = TCPServiceMeshMetrics.newBuilder();
            counter.inc(tcpLogs.getLogEntryCount());

            for (final TCPAccessLogEntry tcpLog : tcpLogs.getLogEntryList()) {
                // Every analyzer receives the result of the previous one for the same entry.
                AccessLogAnalyzer.Result result = AccessLogAnalyzer.Result.builder().build();
                for (TCPAccessLogAnalyzer analyzer : envoyTCPAnalysisList) {
                    result = analyzer.analysis(result, identifier, tcpLog, role);
                }
                if (result.hasResult()) {
                    tcpMetrics.addAllMetrics(result.getMetrics().getTcpMetrics().getMetricsList());
                }
            }
            return tcpMetrics;
        }

        @Override
        public void onError(Throwable throwable) {
            Status status = Status.fromThrowable(throwable);
            // A cancelled stream is only logged at debug level; anything else is an error.
            if (Status.CANCELLED.getCode() == status.getCode()) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Envoy client cancelled sending access logs", throwable);
                }
                return;
            }
            LOGGER.error("Error in receiving access log from envoy", throwable);
        }

        @Override
        public void onCompleted() {
            // Answer with one empty response, then close the response stream.
            responseObserver.onNext(StreamAccessLogsResponse.newBuilder().build());
            responseObserver.onCompleted();
        }
    };
}