in oap-server/server-query-plugin/logql-plugin/src/main/java/org/apache/skywalking/oap/query/logql/handler/LogQLApiHandler.java [119:216]
/**
 * Runs a LogQL range query and returns the matching logs grouped into streams.
 *
 * <p>The query text is parsed into a label map plus content keywords. Labels declared in
 * {@link LabelName} select the service, service instance, endpoint and trace id; every other
 * label is forwarded to the log query as a tag. Instance and endpoint ids are only built when
 * a service label is present, because both ids are derived from the service id.
 *
 * @param start     range start; converted with {@code nano2Millis} before the duration is built
 * @param end       range end; converted with {@code nano2Millis} before the duration is built
 * @param query     LogQL expression to parse
 * @param limit     page size handed to {@code Pagination} (the page number is always 1)
 * @param direction supplies the result ordering through {@code getOrder()}
 * @return {@code badResponse(...)} when parsing is cancelled or the query reports an error
 *         reason, otherwise {@code successResponse(...)} wrapping the grouped streams
 * @throws IOException declared by this handler; presumably raised by the log query service
 *                     (not visible from this method alone)
 */
public HttpResponse rangeQuery(
    @Param("start") Long start,
    @Param("end") Long end,
    @Param("query") String query,
    @Param("limit") Integer limit,
    @Param("direction") LogDirection direction) throws IOException {
    LogRangeQueryRsp logRangeQueryRsp = new LogRangeQueryRsp();
    logRangeQueryRsp.setStatus(ResultStatus.SUCCESS);

    // Parse the expression; a ParseCancellationException is reported as a bad request.
    LogQLLexer lexer = new LogQLLexer(CharStreams.fromString(query));
    lexer.addErrorListener(new ParseErrorListener());
    LogQLParser parser = new LogQLParser(new CommonTokenStream(lexer));
    parser.addErrorListener(new ParseErrorListener());
    ParseTree tree;
    try {
        tree = parser.root();
    } catch (ParseCancellationException e) {
        return badResponse(e.getMessage());
    }
    LogQLExprVisitor visitor = new LogQLExprVisitor();
    LogQLParseResult parseResult = visitor.visit(tree);
    Map<String, String> labelMap = parseResult.getLabelMap();

    // Resolve the entity ids from the well-known labels.
    String serviceId = labelMap.containsKey(LabelName.SERVICE.getLabel()) ?
        IDManager.ServiceID.buildId(labelMap.get(LabelName.SERVICE.getLabel()), true) : null;
    String serviceInstanceId = null;
    String endpointId = null;
    if (StringUtil.isNotEmpty(serviceId)) {
        serviceInstanceId = labelMap.containsKey(LabelName.SERVICE_INSTANCE.getLabel()) ?
            IDManager.ServiceInstanceID.buildId(
                serviceId, labelMap.get(LabelName.SERVICE_INSTANCE.getLabel())) : null;
        endpointId = labelMap.containsKey(LabelName.ENDPOINT.getLabel()) ?
            IDManager.EndpointID.buildId(serviceId, labelMap.get(LabelName.ENDPOINT.getLabel())) : null;
    }

    String traceId = labelMap.get(LabelName.TRACE_ID.getLabel());
    TraceScopeCondition traceScopeCondition = new TraceScopeCondition();
    if (StringUtil.isNotEmpty(traceId)) {
        traceScopeCondition.setTraceId(traceId);
    }

    List<Tag> tags = labelMap.entrySet().stream()
        // labels in the stream selector all belong to log tags, except the labels defined in LabelName
        .filter(entry -> !LabelName.containsLabel(entry.getKey()))
        .map(entry -> new Tag(entry.getKey(), entry.getValue()))
        .collect(Collectors.toList());

    Duration duration = DurationUtils.timestamp2Duration(nano2Millis(start), nano2Millis(end));
    Logs logs = logQueryService.queryLogs(
        serviceId,
        serviceInstanceId,
        endpointId,
        traceScopeCondition,
        new Pagination(1, limit),
        direction.getOrder(),
        duration,
        tags,
        parseResult.getKeywordsOfContent(),
        parseResult.getExcludingKeywordsOfContent()
    );
    if (StringUtil.isNotEmpty(logs.getErrorReason())) {
        return badResponse(logs.getErrorReason());
    }

    final StreamLog responseData = new StreamLog();
    responseData.setResultType(ResultType.STREAMS);
    logRangeQueryRsp.setData(responseData);
    // One stream per distinct (service id, instance id, endpoint name, trace id) tuple.
    logs.getLogs().stream()
        .collect(Collectors.groupingBy(this::logStreamKey))
        .forEach((streamKey, logList) -> responseData.getResult().add(toStreamResult(logList)));
    return successResponse(logRangeQueryRsp);
}

/**
 * Builds the grouping key that identifies the stream a log entry belongs to.
 *
 * <p>The key is a list compared element by element instead of a concatenated string:
 * joining the fields without a delimiter lets different tuples collapse into the same key
 * (for example endpoint {@code "/a"} + trace {@code "1x"} and endpoint {@code "/a1"} + trace
 * {@code "x"}), which would merge unrelated logs into one stream under the wrong labels.
 */
private List<Object> logStreamKey(final Log log) {
    final List<Object> key = new ArrayList<>(4);
    key.add(log.getServiceId());
    key.add(log.getServiceInstanceId());
    key.add(log.getEndpointName());
    key.add(log.getTraceId());
    return key;
}

/**
 * Converts one group of logs into a stream result: the labels are taken from the first log
 * of the group and every log contributes one (timestamp, content) pair, in list order.
 */
private StreamLog.Result toStreamResult(final List<Log> logList) {
    // groupingBy never produces an empty group, so the first element always exists.
    final Log first = logList.get(0);
    Map<String, String> labels = new HashMap<>();
    labels.put(LabelName.SERVICE.getLabel(), first.getServiceName());
    labels.put(LabelName.SERVICE_INSTANCE.getLabel(), first.getServiceInstanceName());
    labels.put(LabelName.ENDPOINT.getLabel(), first.getEndpointName());
    labels.put(LabelName.TRACE_ID.getLabel(), first.getTraceId());

    List<TimeValuePair> timeValuePairs = new ArrayList<>(logList.size());
    for (final Log log : logList) {
        timeValuePairs.add(new TimeValuePair(
            String.valueOf(millis2Nano(log.getTimestamp())),
            log.getContent()
        ));
    }

    StreamLog.Result result = new StreamLog.Result();
    result.setStream(labels);
    result.setValues(timeValuePairs);
    return result;
}