in oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java [390:446]
/**
 * Appends the given span attached events to the matching spans of {@code spans}, replacing each
 * modified span in the list with a rebuilt copy.
 *
 * <p>Processing steps:
 * <ol>
 *   <li>{@code events} is sorted in place by start time (seconds first, then nanos).</li>
 *   <li>Events are grouped by event name. When a name occurs more than once, each occurrence is
 *       labelled {@code <name>-<n>}, where {@code n} is the 1-based position inside the sorted group;
 *       a name that occurs once is used as is.</li>
 *   <li>Each event is matched to the first span whose id equals the record's trace span id. Events
 *       without a matching span are skipped.</li>
 *   <li>If the event is tagged as an inbound request or an outbound response, it is moved to the
 *       first {@code SERVER} span whose parent is the matched span, when such a span exists.</li>
 * </ol>
 *
 * <p>The target span is resolved for every event individually and exactly one builder is kept per
 * target span (keyed by its index in {@code spans}). Caching the builder by the record's trace span id
 * would let the first processed event decide the target for all later events sharing that id, and
 * could create two builders for the same span so that one set of events is overwritten on rebuild.
 *
 * @param spans  spans of the trace; modified entries are replaced in place. Nothing is done when
 *               this is empty.
 * @param events events to attach; this list is re-ordered by this method. Nothing is done when this
 *               is empty.
 * @throws InvalidProtocolBufferException propagated from {@code SpanAttachedEvent.parseFrom} when a
 *                                        record's binary payload cannot be parsed
 */
private void appendEvents(List<Span> spans, List<SpanAttachedEventRecord> events) throws InvalidProtocolBufferException {
    if (CollectionUtils.isEmpty(spans) || CollectionUtils.isEmpty(events)) {
        return;
    }
    final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0, spans.size())
        .mapToObj(i -> Tuple.of(i, spans.get(i)))
        .collect(Collectors.toList());

    // Lookup tables over the spans. putIfAbsent keeps the first span in list order for a given key,
    // which is the same span a linear "find first" scan would return.
    final Map<String, Tuple2<Integer, Span>> spanById = new HashMap<>();
    final Map<String, Tuple2<Integer, Span>> serverSpanByParentId = new HashMap<>();
    for (Tuple2<Integer, Span> indexedSpan : spanWithIndex) {
        spanById.putIfAbsent(indexedSpan._2.id(), indexedSpan);
        if (Objects.equals(indexedSpan._2.kind(), Span.Kind.SERVER)) {
            serverSpanByParentId.putIfAbsent(indexedSpan._2.parentId(), indexedSpan);
        }
    }

    // sort by start time
    events.sort((e1, e2) -> {
        final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
        if (second == 0) {
            return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
        }
        return second;
    });

    final Map<String, List<SpanAttachedEventRecord>> namedEvents = events.stream()
        .collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, Collectors.toList()));

    // One builder per modified span, keyed by the span's index in `spans`, so that every event
    // targeting the same span accumulates on the same builder.
    final Map<Integer, Span.Builder> buildersByIndex = new HashMap<>();
    for (Map.Entry<String, List<SpanAttachedEventRecord>> namedEntry : namedEvents.entrySet()) {
        final List<SpanAttachedEventRecord> sameNameRecords = namedEntry.getValue();
        final boolean singleOccurrence = sameNameRecords.size() == 1;
        for (int i = 1; i <= sameNameRecords.size(); i++) {
            final SpanAttachedEventRecord eventRecord = sameNameRecords.get(i - 1);
            final String eventName = eventRecord.getEvent() + (singleOccurrence ? "" : "-" + i);
            final SpanAttachedEvent event = SpanAttachedEvent.parseFrom(eventRecord.getDataBinary());

            // find matched span
            Tuple2<Integer, Span> matchesSpan = spanById.get(eventRecord.getTraceSpanId());
            if (matchesSpan == null) {
                continue;
            }

            // if the event is server side, then needs to change to the upstream span
            final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
            final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
            final boolean serverSide = ("request".equals(type) && "inbound".equals(direction))
                || ("response".equals(type) && "outbound".equals(direction));
            if (serverSide) {
                final Tuple2<Integer, Span> serverSpan = serverSpanByParentId.get(matchesSpan._2.id());
                if (serverSpan != null) {
                    matchesSpan = serverSpan;
                }
            }

            final Tuple2<Integer, Span> target = matchesSpan;
            final Span.Builder builder = buildersByIndex.computeIfAbsent(target._1, index -> target._2.toBuilder());
            appendEventDebuggable(builder, eventName, event);
        }
    }

    // re-build modified spans
    for (Map.Entry<Integer, Span.Builder> entry : buildersByIndex.entrySet()) {
        spans.set(entry.getKey(), entry.getValue().build());
    }
}