in runtime/src/main/java/com/google/cloud/verticals/foundations/dataharmonization/diagnostics/NanoFunctionCallInstrument.java [121:178]
/**
 * Aggregates the recorded start/end samples into timing data per caller/callee pair.
 *
 * <p>Samples are grouped by thread id and each thread is replayed independently, in recorded
 * order, against a simulated call stack. Every completed call contributes its total duration
 * and its self time (total duration minus the time spent in the calls it made) to the entry
 * for the pair (function directly below it on the stack, function itself). A call with nothing
 * below it on the stack uses {@code -1} as its parent id. Calls that have started but have no
 * end sample yet contribute nothing.
 *
 * @return an immutable table holding one {@link FunctionData} per pair, keyed by the two
 *     strings {@code decode} yields for the pair id (presumably caller name then callee name,
 *     mirroring the {@code encode(parent, callee)} argument order - confirm against
 *     {@code decode})
 * @throws VerifyException if an end sample has no start sample on its thread's stack, or if it
 *     ends a different function than the one most recently started on that thread
 */
public synchronized Table<String, String, FunctionData> export() {
  Map<Long, Collection<long[]>> rowsByThreadId =
      data.stream()
          .collect(
              Collectors.groupingBy(
                  NanoFunctionCallInstrument::threadID, toCollection(ArrayList::new)));
  Map<Long, FunctionData> samples = new HashMap<>();
  // Treat each thread separately; call stacks never span threads.
  for (Collection<long[]> thread : rowsByThreadId.values()) {
    // callStack and dependantTimes are pushed and popped together, so they always have the
    // same depth: entry i of dependantTimes belongs to the call at entry i of callStack.
    Deque<long[]> callStack = new ArrayDeque<>();
    Deque<Duration> dependantTimes = new ArrayDeque<>();
    // Each sample is either the start or the end of a call.
    for (long[] row : thread) {
      if (isStart(row)) {
        callStack.push(row);
        dependantTimes.push(Duration.ZERO);
        continue;
      }
      // Only process anything when the call ends.
      // The top of callStack has the info of when the call started.
      // The top of dependantTimes has the sum of the durations of all the calls this call made.
      long[] popRow = callStack.pollFirst();
      if (popRow == null) {
        // An end sample with nothing on the stack: report it as a stack-sync problem instead
        // of letting a context-free NoSuchElementException escape from the deque.
        throw new VerifyException(
            String.format(
                "Stack out of sync - call to %d ended without a matching start", funcId(row)));
      }
      Duration depTime = dependantTimes.pop();
      if (funcId(popRow) != funcId(row)) {
        throw new VerifyException(
            String.format("Stack out of sync - expected %d got %d", funcId(row), funcId(popRow)));
      }
      Duration duration = time(row).minus(time(popRow));
      Duration selfTime = duration.minus(depTime);
      // Charge this call's whole duration to its caller (if any) as dependant time.
      if (!dependantTimes.isEmpty()) {
        dependantTimes.push(dependantTimes.pop().plus(duration));
      }
      // We want caller -> callee pairs to help narrow down performance issues.
      long parent = callStack.isEmpty() ? -1 : funcId(callStack.peek());
      long stackPairId = encode(parent, funcId(row));
      // Single lookup: create the aggregate on first sight of the pair, otherwise fold into it.
      FunctionData aggregate = samples.get(stackPairId);
      if (aggregate == null) {
        samples.put(stackPairId, new FunctionData(selfTime, duration));
      } else {
        aggregate.add(selfTime, duration);
      }
    }
  }
  // Convert the ids into function names.
  return samples.entrySet().stream()
      .map(e -> new AbstractMap.SimpleEntry<>(decode(e.getKey()), e.getValue()))
      .collect(toImmutableTable(e -> e.getKey()[0], e -> e.getKey()[1], Entry::getValue));
}