in data-prepper-plugins/otel-trace-raw-prepper/src/main/java/com/amazon/dataprepper/plugins/prepper/oteltrace/OTelTraceRawPrepper.java [222:267]
/**
 * Drains traces that are due for flushing from {@code traceIdRawSpanSetMap}.
 *
 * <p>Nothing is done (and an empty list is returned) unless {@code shouldGarbageCollect()}
 * reports true and {@code traceFlushLock.tryLock()} succeeds. Otherwise
 * {@code lastTraceFlushTime} is set to the current wall-clock time and every map entry is
 * examined: an entry is flushed when the difference between now and its
 * {@code RawSpanSet#getTimeSeen()} value is at least {@code traceFlushInterval}, or
 * unconditionally when {@code isShuttingDown} is set. Flushed entries are removed from the map.
 *
 * <p>Each flushed span receives the trace group cached for its trace id when one is present;
 * when none is cached the span is still flushed and a warning is logged for it.
 *
 * @return the spans collected for flushing; never {@code null}, possibly empty
 */
private List<RawSpan> getTracesToFlushByGarbageCollection() {
    final List<RawSpan> flushedSpans = new LinkedList<>();

    // Bail out early when no collection is due or another holder already owns the lock.
    if (!shouldGarbageCollect() || !traceFlushLock.tryLock()) {
        return flushedSpans;
    }

    try {
        final long currentTime = System.currentTimeMillis();
        lastTraceFlushTime = currentTime;

        for (final Iterator<Map.Entry<String, RawSpanSet>> it = traceIdRawSpanSetMap.entrySet().iterator();
             it.hasNext(); ) {
            final Map.Entry<String, RawSpanSet> current = it.next();
            final String id = current.getKey();
            // The cache is consulted for every entry, including ones that end up not being flushed.
            final TraceGroup cachedGroup = traceIdTraceGroupCache.getIfPresent(id);
            final RawSpanSet spanSet = current.getValue();
            final long seenAt = spanSet.getTimeSeen();

            // Keep entries that are still within the flush interval, unless shutting down.
            if (currentTime - seenAt < traceFlushInterval && !isShuttingDown) {
                continue;
            }

            final Set<RawSpan> spans = spanSet.getRawSpans();
            if (cachedGroup == null) {
                spans.forEach(span -> {
                    flushedSpans.add(span);
                    LOG.warn("Missing trace group for SpanId: {}", span.getSpanId());
                });
            } else {
                spans.forEach(span -> {
                    span.setTraceGroup(cachedGroup);
                    flushedSpans.add(span);
                });
            }

            // Removal goes through the iterator so the map is not modified behind its back.
            it.remove();
        }

        if (!flushedSpans.isEmpty()) {
            LOG.info("Flushing {} records due to GC", flushedSpans.size());
        }
    } finally {
        traceFlushLock.unlock();
    }

    return flushedSpans;
}