in oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java [66:133]
public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
Duration duration, List<Tag> tags, List<String> keywordsOfContent,
List<String> excludingKeywordsOfContent) throws IOException {
final boolean isColdStage = duration != null && duration.isColdStage();
final QueryBuilder<StreamQuery> query = new QueryBuilder<StreamQuery>() {
@Override
public void apply(StreamQuery query) {
if (StringUtil.isNotEmpty(serviceId)) {
query.and(eq(AbstractLogRecord.SERVICE_ID, serviceId));
}
if (StringUtil.isNotEmpty(serviceInstanceId)) {
query.and(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
}
if (StringUtil.isNotEmpty(endpointId)) {
query.and(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
}
if (Objects.nonNull(relatedTrace)) {
if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
query.and(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
}
if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
query.and(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
}
if (Objects.nonNull(relatedTrace.getSpanId())) {
query.and(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
}
}
if (CollectionUtils.isNotEmpty(tags)) {
List<String> tagsConditions = new ArrayList<>(tags.size());
for (final Tag tag : tags) {
tagsConditions.add(tag.toString());
}
query.and(having(LogRecord.TAGS, tagsConditions));
}
}
};
StreamQueryResponse resp = queryDebuggable(isColdStage, LogRecord.INDEX_NAME, TAGS, getTimestampRange(duration), query);
Logs logs = new Logs();
for (final RowEntity rowEntity : resp.getElements()) {
Log log = new Log();
log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID));
log.setServiceInstanceId(
rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID));
log.setEndpointId(
rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID));
if (log.getEndpointId() != null) {
log.setEndpointName(
IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
}
log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID));
log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue());
log.setContentType(ContentType.instanceOf(
((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue()));
log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT));
byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA);
if (dataBinary != null && dataBinary.length > 0) {
parserDataBinary(dataBinary, log.getTags());
}
logs.getLogs().add(log);
}
return logs;
}