in agents/agents-features/agents-features-trace/src/commonMain/kotlin/ai/koog/agents/features/tracing/feature/Tracing.kt [109:438]
/**
 * Installs the tracing feature into the given [pipeline].
 *
 * Registers one interceptor for every agent, strategy, node, subgraph, LLM call,
 * LLM streaming and tool call event exposed by the pipeline. Each interceptor builds
 * the matching trace event from the intercepted context, stamps it with the current
 * time read from the pipeline clock, and passes it to `processMessage` together with
 * [config].
 *
 * A warning is logged when [config] has no message processors.
 *
 * @param config Tracing configuration forwarded to `processMessage` with every event.
 * @param pipeline Graph pipeline on which the interceptors are registered.
 * @return The [Tracing] instance created for this installation.
 */
override fun install(
    config: TraceFeatureConfig,
    pipeline: AIAgentGraphPipeline,
): Tracing {
    logger.info { "Start installing feature: ${Tracing::class.simpleName}" }

    if (config.messageProcessors.isEmpty()) {
        logger.warn {
            "Tracing Feature. No feature out stream providers are defined. Trace streaming has no target."
        }
    }

    val feature = Tracing()

    // Reads the pipeline clock at the moment an event is being assembled.
    fun nowMillis() = pipeline.clock.now().toEpochMilliseconds()

    //region Intercept Agent Events

    pipeline.interceptAgentStarting(this) { ctx ->
        processMessage(
            config,
            AgentStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                agentId = ctx.agent.id,
                runId = ctx.runId,
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptAgentCompleted(this) { ctx ->
        processMessage(
            config,
            AgentCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                agentId = ctx.agentId,
                runId = ctx.runId,
                result = ctx.result?.toString(),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptAgentExecutionFailed(this) { ctx ->
        processMessage(
            config,
            AgentExecutionFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                agentId = ctx.agentId,
                runId = ctx.runId,
                error = ctx.throwable.toAgentError(),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptAgentClosing(this) { ctx ->
        processMessage(
            config,
            AgentClosingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                agentId = ctx.agentId,
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept Agent Events

    //region Intercept Strategy Events

    pipeline.interceptStrategyStarting(this) { ctx ->
        val strategy = ctx.strategy as AIAgentGraphStrategy

        // Kept as an annotated local so the opt-in covers the whole event construction.
        @OptIn(InternalAgentsApi::class)
        val event = GraphStrategyStartingEvent(
            eventId = ctx.eventId,
            executionInfo = ctx.executionInfo,
            runId = ctx.context.runId,
            strategyName = ctx.strategy.name,
            graph = strategy.startNodeToGraph(),
            timestamp = nowMillis(),
        )
        processMessage(config, event)
    }

    pipeline.interceptStrategyCompleted(this) { ctx ->
        processMessage(
            config,
            StrategyCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                strategyName = ctx.strategy.name,
                result = ctx.result?.toString(),
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept Strategy Events

    //region Intercept Node Events

    pipeline.interceptNodeExecutionStarting(this) { ctx ->
        processMessage(
            config,
            NodeExecutionStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                nodeName = ctx.node.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptNodeExecutionCompleted(this) { ctx ->
        processMessage(
            config,
            NodeExecutionCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                nodeName = ctx.node.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                output = nodeDataToJsonElement(ctx.output, ctx.outputType),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptNodeExecutionFailed(this) { ctx ->
        processMessage(
            config,
            NodeExecutionFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                nodeName = ctx.node.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                error = ctx.throwable.toAgentError(),
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept Node Events

    //region Intercept Subgraph Events

    pipeline.interceptSubgraphExecutionStarting(this) { ctx ->
        processMessage(
            config,
            SubgraphExecutionStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                subgraphName = ctx.subgraph.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptSubgraphExecutionCompleted(this) { ctx ->
        processMessage(
            config,
            SubgraphExecutionCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                subgraphName = ctx.subgraph.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                output = nodeDataToJsonElement(ctx.output, ctx.outputType),
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptSubgraphExecutionFailed(this) { ctx ->
        processMessage(
            config,
            SubgraphExecutionFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.context.runId,
                subgraphName = ctx.subgraph.name,
                input = nodeDataToJsonElement(ctx.input, ctx.inputType),
                error = ctx.throwable.toAgentError(),
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept Subgraph Events

    //region Intercept LLM Call Events

    pipeline.interceptLLMCallStarting(this) { ctx ->
        processMessage(
            config,
            LLMCallStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                tools = ctx.tools.map { it.name },
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptLLMCallCompleted(this) { ctx ->
        processMessage(
            config,
            LLMCallCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                responses = ctx.responses,
                moderationResponse = ctx.moderationResponse,
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept LLM Call Events

    //region Intercept LLM Streaming Events

    pipeline.interceptLLMStreamingStarting(this) { ctx ->
        processMessage(
            config,
            LLMStreamingStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                tools = ctx.tools.map { it.name },
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptLLMStreamingCompleted(this) { ctx ->
        processMessage(
            config,
            LLMStreamingCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                tools = ctx.tools.map { it.name },
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptLLMStreamingFrameReceived(this) { ctx ->
        processMessage(
            config,
            LLMStreamingFrameReceivedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                frame = ctx.streamFrame,
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptLLMStreamingFailed(this) { ctx ->
        processMessage(
            config,
            LLMStreamingFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                prompt = ctx.prompt,
                model = ctx.model.toModelInfo(),
                error = ctx.error.toAgentError(),
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept LLM Streaming Events

    //region Intercept Tool Call Events

    pipeline.interceptToolCallStarting(this) { ctx ->
        processMessage(
            config,
            ToolCallStartingEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                toolCallId = ctx.toolCallId,
                toolName = ctx.toolName,
                toolArgs = ctx.toolArgs,
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptToolValidationFailed(this) { ctx ->
        processMessage(
            config,
            ToolValidationFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                toolCallId = ctx.toolCallId,
                toolName = ctx.toolName,
                toolArgs = ctx.toolArgs,
                toolDescription = ctx.toolDescription,
                message = ctx.message,
                error = ctx.error,
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptToolCallFailed(this) { ctx ->
        processMessage(
            config,
            ToolCallFailedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                toolCallId = ctx.toolCallId,
                toolName = ctx.toolName,
                toolArgs = ctx.toolArgs,
                toolDescription = ctx.toolDescription,
                error = ctx.error,
                timestamp = nowMillis(),
            ),
        )
    }

    pipeline.interceptToolCallCompleted(this) { ctx ->
        processMessage(
            config,
            ToolCallCompletedEvent(
                eventId = ctx.eventId,
                executionInfo = ctx.executionInfo,
                runId = ctx.runId,
                toolCallId = ctx.toolCallId,
                toolName = ctx.toolName,
                toolArgs = ctx.toolArgs,
                toolDescription = ctx.toolDescription,
                result = ctx.toolResult,
                timestamp = nowMillis(),
            ),
        )
    }

    //endregion Intercept Tool Call Events

    return feature
}