public Logs queryLogs()

in oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBLogQueryDAO.java [66:133]


    /**
     * Queries log records from the {@code LogRecord.INDEX_NAME} stream and converts each
     * returned row into a {@link Log}.
     *
     * <p>Every non-empty filter is added as an AND-ed condition. Service, instance, endpoint,
     * trace, segment and span filters are equality matches; {@code tags} are matched with a
     * {@code having} condition on {@code LogRecord.TAGS} using each tag's {@code toString()} form.
     * The requested sort direction, {@code limit} and {@code from} are applied to the stream
     * query so that pagination is honored.
     *
     * @param serviceId                  service to filter on; skipped when empty
     * @param serviceInstanceId          service instance to filter on; skipped when empty
     * @param endpointId                 endpoint to filter on; skipped when empty
     * @param relatedTrace               optional trace scope (trace id, segment id, span id); may be null
     * @param queryOrder                 {@code Order.ASC} sorts ascending; any other value
     *                                   (including null) sorts descending
     * @param from                       offset of the first row to return
     * @param limit                      maximum number of rows to return
     * @param duration                   time range of the query; when null the query is not
     *                                   treated as a cold-stage query
     * @param tags                       tags every returned log must carry; skipped when empty
     * @param keywordsOfContent          not applied by this implementation
     * @param excludingKeywordsOfContent not applied by this implementation
     * @return the matching logs, in the order returned by the storage
     * @throws IOException if the underlying stream query fails
     */
    public Logs queryLogs(String serviceId, String serviceInstanceId, String endpointId,
                          TraceScopeCondition relatedTrace, Order queryOrder, int from, int limit,
                          Duration duration, List<Tag> tags, List<String> keywordsOfContent,
                          List<String> excludingKeywordsOfContent) throws IOException {
        final boolean isColdStage = duration != null && duration.isColdStage();
        final QueryBuilder<StreamQuery> query = new QueryBuilder<StreamQuery>() {
            @Override
            public void apply(StreamQuery query) {
                if (StringUtil.isNotEmpty(serviceId)) {
                    query.and(eq(AbstractLogRecord.SERVICE_ID, serviceId));
                }
                if (StringUtil.isNotEmpty(serviceInstanceId)) {
                    query.and(eq(AbstractLogRecord.SERVICE_INSTANCE_ID, serviceInstanceId));
                }
                if (StringUtil.isNotEmpty(endpointId)) {
                    query.and(eq(AbstractLogRecord.ENDPOINT_ID, endpointId));
                }
                if (Objects.nonNull(relatedTrace)) {
                    if (StringUtil.isNotEmpty(relatedTrace.getTraceId())) {
                        query.and(eq(AbstractLogRecord.TRACE_ID, relatedTrace.getTraceId()));
                    }
                    if (StringUtil.isNotEmpty(relatedTrace.getSegmentId())) {
                        query.and(eq(AbstractLogRecord.TRACE_SEGMENT_ID, relatedTrace.getSegmentId()));
                    }
                    if (Objects.nonNull(relatedTrace.getSpanId())) {
                        // The span id condition is built from a long value.
                        query.and(eq(AbstractLogRecord.SPAN_ID, (long) relatedTrace.getSpanId()));
                    }
                }

                if (CollectionUtils.isNotEmpty(tags)) {
                    List<String> tagsConditions = new ArrayList<>(tags.size());
                    for (final Tag tag : tags) {
                        tagsConditions.add(tag.toString());
                    }
                    query.and(having(LogRecord.TAGS, tagsConditions));
                }

                // Apply the caller's ordering and pagination; without these the query runs with
                // the client's defaults and every page request yields the same rows.
                // Types are fully qualified so this block does not rely on an additional import.
                final org.apache.skywalking.banyandb.v1.client.AbstractQuery.Sort sort =
                        queryOrder == Order.ASC
                                ? org.apache.skywalking.banyandb.v1.client.AbstractQuery.Sort.ASC
                                : org.apache.skywalking.banyandb.v1.client.AbstractQuery.Sort.DESC;
                query.setOrderBy(new org.apache.skywalking.banyandb.v1.client.AbstractQuery.OrderBy(sort));
                query.setLimit(limit);
                query.setOffset(from);
            }
        };

        StreamQueryResponse resp = queryDebuggable(isColdStage, LogRecord.INDEX_NAME, TAGS, getTimestampRange(duration), query);

        Logs logs = new Logs();

        for (final RowEntity rowEntity : resp.getElements()) {
            Log log = new Log();
            log.setServiceId(rowEntity.getTagValue(AbstractLogRecord.SERVICE_ID));
            log.setServiceInstanceId(
                    rowEntity.getTagValue(AbstractLogRecord.SERVICE_INSTANCE_ID));
            log.setEndpointId(
                    rowEntity.getTagValue(AbstractLogRecord.ENDPOINT_ID));
            // The endpoint name is not read from the row; it is decoded from the endpoint id.
            if (log.getEndpointId() != null) {
                log.setEndpointName(
                        IDManager.EndpointID.analysisId(log.getEndpointId()).getEndpointName());
            }
            log.setTraceId(rowEntity.getTagValue(AbstractLogRecord.TRACE_ID));
            log.setTimestamp(((Number) rowEntity.getTagValue(AbstractLogRecord.TIMESTAMP)).longValue());
            log.setContentType(ContentType.instanceOf(
                    ((Number) rowEntity.getTagValue(AbstractLogRecord.CONTENT_TYPE)).intValue()));
            log.setContent(rowEntity.getTagValue(AbstractLogRecord.CONTENT));
            // Tags are only parsed when the row carries a non-empty raw tag payload.
            byte[] dataBinary = rowEntity.getTagValue(AbstractLogRecord.TAGS_RAW_DATA);
            if (dataBinary != null && dataBinary.length > 0) {
                parserDataBinary(dataBinary, log.getTags());
            }
            logs.getLogs().add(log);
        }
        return logs;
    }