in oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java [341:387]
/**
 * Attaches every span-attached event to the span it belongs to.
 *
 * <p>The events are sorted in place by start time (seconds first, then nanos) and appended, in
 * that order, to the attached-event list of the span they resolve to. An event is skipped when
 * its trace span id is not numeric, does not fit into an {@code int}, or when {@code spans}
 * holds no span with the same segment id and span id.
 *
 * <p>An event tagged {@code data_type=request} with {@code data_direction=inbound}, or
 * {@code data_type=response} with {@code data_direction=outbound}, is treated as server side:
 * it is attached to the first Entry span whose segment parent span id equals the matched span's
 * segment span id, or to the matched span itself when no such Entry span exists.
 *
 * @param spans  spans of the trace; the attached-event lists of the resolved spans are mutated
 * @param events events to attach; nothing happens when this is empty, otherwise it is sorted
 *               in place
 * @throws InvalidProtocolBufferException if the binary payload of an event cannot be parsed
 */
private void appendAttachedEventsToSpan(List<Span> spans, List<SpanAttachedEventRecord> events) throws InvalidProtocolBufferException {
    if (CollectionUtils.isEmpty(events)) {
        return;
    }
    // sort by start time
    events.sort((e1, e2) -> {
        final int second = Long.compare(e1.getStartTimeSecond(), e2.getStartTimeSecond());
        if (second == 0) {
            return Long.compare(e1.getStartTimeNanos(), e2.getStartTimeNanos());
        }
        return second;
    });
    // Cache of already resolved spans, keyed by segment id, span id and event side.
    final HashMap<String, Span> spanMatcher = new HashMap<>();
    for (SpanAttachedEventRecord record : events) {
        if (!StringUtils.isNumeric(record.getTraceSpanId())) {
            continue;
        }
        SpanAttachedEvent event = SpanAttachedEvent.parseFrom(record.getDataBinary());
        // if the event is server side, then needs to change to the upstream span
        final String direction = getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
        final String type = getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
        final boolean serverSide = ("request".equals(type) && "inbound".equals(direction))
            || ("response".equals(type) && "outbound".equals(direction));
        // The resolved span depends on the side of the event, so the side has to be part of the
        // key; otherwise the first event seen for a span would decide the target of all later
        // events carrying the same segment id and span id, whatever their own tags say.
        final String spanMatcherKey = record.getTraceSegmentId() + "_" + record.getTraceSpanId() + "_" + serverSide;
        Span span = spanMatcher.get(spanMatcherKey);
        if (span == null) {
            final int eventSpanId;
            try {
                eventSpanId = Integer.parseInt(record.getTraceSpanId());
            } catch (NumberFormatException ignored) {
                // All digits but outside the int range: no span can carry this id, so the event
                // is skipped like any other event without a usable span id.
                continue;
            }
            // find the matches span
            span = spans.stream().filter(s -> Objects.equals(s.getSegmentId(), record.getTraceSegmentId())
                && (s.getSpanId() == eventSpanId)).findFirst().orElse(null);
            if (span == null) {
                continue;
            }
            if (serverSide) {
                final String parentSpanId = span.getSegmentSpanId();
                // Null-safe on both sides; a missing id never matches another missing id.
                span = spans.stream().filter(s -> parentSpanId != null
                    && parentSpanId.equals(s.getSegmentParentSpanId())
                    && Objects.equals(s.getType(), SpanType.Entry.name())).findFirst().orElse(span);
            }
            spanMatcher.put(spanMatcherKey, span);
        }
        span.getAttachedEvents().add(parseEvent(event));
    }
}