private void appendEvents()

in oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java [390:446]


    private void appendEvents(List<Span> spans, List<SpanAttachedEventRecord> events) throws InvalidProtocolBufferException {
        if (CollectionUtils.isEmpty(spans) || CollectionUtils.isEmpty(events)) {
            return;
        }

        final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0, spans.size()).mapToObj(i -> Tuple.of(i, spans.get(i))).collect(Collectors.toList());

        // sort by start time
        events.sort((e1, e2) -> {
            final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
            if (second == 0) {
                return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
            }
            return second;
        });

        final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream()
            .collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));

        final Map<String, Tuple2<Span.Builder, Integer>> spanCache = new HashMap<>();
        for (Map.Entry<String, List<SpanAttachedEventRecord>> namedEntry : namedEvents.entrySet()) {
            for (int i = 1; i <= namedEntry.getValue().size(); i++) {
                final SpanAttachedEventRecord record = namedEntry.getValue().get(i - 1);
                String eventName = record.getEvent() + (namedEntry.getValue().size() == 1 ? "" : "-" + i);
                final SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());

                // find matched span
                Tuple2<Span.Builder, Integer> spanBuilder = spanCache.get(record.getTraceSpanId());
                if (spanBuilder == null) {
                    Tuple2<Integer, Span> matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), record.getTraceSpanId())).
                        findFirst().orElse(null);
                    if (matchesSpan == null) {
                        continue;
                    }

                    // if the event is server side, then needs to change to the upstream span
                    final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
                    final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
                    if (("request".equals(type) && "inbound".equals(direction)) || ("response".equals(type) && "outbound".equals(direction))) {
                        final String parentSpanId = matchesSpan._2.id();
                        matchesSpan = spanWithIndex.stream().filter(s -> Objects.equals(s._2.parentId(), parentSpanId)
                            && Objects.equals(s._2.kind(), Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
                    }

                    spanBuilder = Tuple.of(matchesSpan._2.toBuilder(), matchesSpan._1);
                    spanCache.put(record.getTraceSpanId(), spanBuilder);
                }

                appendEventDebuggable(spanBuilder._1, eventName, event);
            }
        }

        // re-build modified spans
        for (Map.Entry<String, Tuple2<Span.Builder, Integer>> entry : spanCache.entrySet()) {
            spans.set(entry.getValue()._2, entry.getValue()._1.build());
        }
    }