in agents/agents-features/agents-features-opentelemetry/src/jvmMain/kotlin/ai/koog/agents/features/opentelemetry/feature/OpenTelemetry.kt [59:697]
override fun install(
config: OpenTelemetryConfig,
pipeline: AIAgentGraphPipeline,
): OpenTelemetry {
val openTelemetry = OpenTelemetry()
val spanCollector = SpanCollector()
val spanAdapter = config.spanAdapter
val tracer = config.tracer
//region Agent
pipeline.interceptAgentStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry before agent started handler" }
val messages = eventContext.agent.agentConfig.prompt.messages.toList()
val tools = (eventContext.agent as? GraphAIAgent<*, *>)?.toolRegistry?.tools?.map { it.descriptor }?.toList()
?: emptyList()
// Create CreateAgentSpan
val createAgentSpan = startCreateAgentSpan(
tracer = tracer,
parentSpan = null,
id = eventContext.eventId,
model = eventContext.agent.agentConfig.model,
agentId = eventContext.context.agentId,
messages = messages
)
spanAdapter?.onBeforeSpanStarted(createAgentSpan)
spanCollector.collectSpan(
span = createAgentSpan,
path = eventContext.executionInfo
)
// Create InvokeAgentSpan
val invokeAgentSpan = startInvokeAgentSpan(
tracer = tracer,
parentSpan = createAgentSpan,
id = eventContext.runId,
model = eventContext.agent.agentConfig.model,
agentId = eventContext.agent.id,
runId = eventContext.runId,
llmParams = eventContext.agent.agentConfig.prompt.params,
messages = messages,
tools = tools
)
spanAdapter?.onBeforeSpanStarted(invokeAgentSpan)
// Patch the agent execution info to include runId in the path.
// This is required to create a path structure that matches the span structure in the OTel feature.
spanCollector.collectSpan(
span = invokeAgentSpan,
path = eventContext.executionInfo.appendRunId(eventContext.runId)
)
}
pipeline.interceptAgentCompleted(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry agent finished handler" }
// Find parent span - InvokeAgentSpan
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.runId)
val invokeAgentSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.runId,
spanType = SpanType.INVOKE_AGENT
) ?: return@intercept
spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
endInvokeAgentSpan(
span = invokeAgentSpan,
messages = eventContext.context.config.prompt.messages.toList(),
model = eventContext.context.config.model,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = invokeAgentSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptAgentExecutionFailed(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry agent run error handler" }
// Stop all unfinished spans, except InvokeAgentSpan and AgentCreateSpan
endUnfinishedSpans(spanCollector, config.isVerbose) { span ->
span.type != SpanType.CREATE_AGENT &&
span.type != SpanType.INVOKE_AGENT &&
span.id != eventContext.eventId
}
// Finish current InvokeAgentSpan
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.runId)
val invokeAgentSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.runId,
spanType = SpanType.INVOKE_AGENT
) ?: return@intercept
invokeAgentSpan.addAttribute(
attribute = SpanAttributes.Response.FinishReasons(
listOf(SpanAttributes.Response.FinishReasonType.Error)
)
)
spanAdapter?.onBeforeSpanFinished(invokeAgentSpan)
endInvokeAgentSpan(
span = invokeAgentSpan,
messages = eventContext.context.config.prompt.messages.toList(),
model = eventContext.context.config.model,
error = eventContext.throwable,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = invokeAgentSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptAgentClosing(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry before agent closed handler" }
// Stop all unfinished spans, except the AgentCreateSpan
endUnfinishedSpans(spanCollector, config.isVerbose) { span ->
span.type != SpanType.CREATE_AGENT
}
// Stop agent create span
val agentSpan = spanCollector.getStartedSpan(
executionInfo = eventContext.executionInfo,
eventId = eventContext.eventId,
spanType = SpanType.CREATE_AGENT
) ?: return@intercept
spanAdapter?.onBeforeSpanFinished(agentSpan)
endCreateAgentSpan(
span = agentSpan,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = agentSpan,
path = eventContext.executionInfo
)
// Just in case we miss some spans, stop them as well
if (spanCollector.activeSpansCount > 0) {
logger.warn { "Found <${spanCollector.activeSpansCount}> active span(s) after agent closing. Stopping them." }
endUnfinishedSpans(spanCollector, config.isVerbose)
}
}
//endregion Agent
//region Strategy
pipeline.interceptStrategyStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry before subgraph handler" }
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val parentSpan = spanCollector.getParentSpanForEvent(
executionInfo = patchedExecutionInfo,
) ?: return@intercept
val strategySpan = startStrategySpan(
tracer = tracer,
parentSpan = parentSpan,
id = eventContext.eventId,
runId = eventContext.context.runId,
strategyName = eventContext.strategy.name
)
spanAdapter?.onBeforeSpanStarted(strategySpan)
spanCollector.collectSpan(
span = strategySpan,
path = patchedExecutionInfo
)
}
pipeline.interceptStrategyCompleted(this) intercept@{ eventContext ->
// Find current Strategy Span
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val strategySpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.STRATEGY
) ?: return@intercept
spanAdapter?.onBeforeSpanFinished(strategySpan)
endStrategySpan(span = strategySpan, verbose = config.isVerbose)
spanCollector.removeSpan(
span = strategySpan,
path = patchedExecutionInfo
)
}
//endregion Strategy
//region Node
pipeline.interceptNodeExecutionStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry node starting handler" }
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val parentSpan = spanCollector.getParentSpanForEvent(
executionInfo = patchedExecutionInfo,
) ?: run {
logger.warn { "Failed to find parent span for node: ${eventContext.node.id}. Path: ${patchedExecutionInfo.path()}" }
return@intercept
}
val nodeInput = nodeDataToString(eventContext.input, eventContext.inputType)
val nodeExecuteSpan = startNodeExecuteSpan(
tracer = tracer,
parentSpan = parentSpan,
id = eventContext.eventId,
runId = eventContext.context.runId,
nodeId = eventContext.node.id,
nodeInput = nodeInput
)
spanAdapter?.onBeforeSpanStarted(nodeExecuteSpan)
spanCollector.collectSpan(
span = nodeExecuteSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptNodeExecutionCompleted(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry node completed handler" }
// Find existing span (Node Execute Span)
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val nodeExecuteSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.NODE
) ?: return@intercept
val nodeOutput = nodeDataToString(eventContext.output, eventContext.outputType)
spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
endNodeExecuteSpan(
span = nodeExecuteSpan,
nodeOutput = nodeOutput,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = nodeExecuteSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptNodeExecutionFailed(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry node execution failed handler" }
// Find existing span (Node Execute Span)
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val nodeExecuteSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.NODE
) ?: return@intercept
spanAdapter?.onBeforeSpanFinished(nodeExecuteSpan)
endNodeExecuteSpan(
span = nodeExecuteSpan,
nodeOutput = null,
error = eventContext.throwable,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = nodeExecuteSpan,
path = patchedExecutionInfo
)
}
//endregion Node
//region Subgraph
pipeline.interceptSubgraphExecutionStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry before subgraph handler" }
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val parentSpan = spanCollector.getParentSpanForEvent(
executionInfo = patchedExecutionInfo,
) ?: return@intercept
val subgraphInput = nodeDataToString(eventContext.input, eventContext.inputType)
val subgraphExecuteSpan = startSubgraphExecuteSpan(
tracer = tracer,
parentSpan = parentSpan,
id = eventContext.eventId,
runId = eventContext.context.runId,
subgraphId = eventContext.subgraph.id,
subgraphInput = subgraphInput
)
spanAdapter?.onBeforeSpanStarted(subgraphExecuteSpan)
spanCollector.collectSpan(
span = subgraphExecuteSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptSubgraphExecutionCompleted(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry after subgraph handler" }
// Find the existing span (Subgraph Execute Span)
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val subgraphExecuteSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.SUBGRAPH
) ?: return@intercept
val subgraphOutput = nodeDataToString(eventContext.output, eventContext.outputType)
spanAdapter?.onBeforeSpanFinished(subgraphExecuteSpan)
endSubgraphExecuteSpan(
span = subgraphExecuteSpan,
subgraphOutput = subgraphOutput,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = subgraphExecuteSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptSubgraphExecutionFailed(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry subgraph execution error handler" }
// Find the existing span (Subgraph Execute Span)
val patchedExecutionInfo = eventContext.executionInfo.appendRunId(eventContext.context.runId)
val subgraphExecuteSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.SUBGRAPH
) ?: return@intercept
spanAdapter?.onBeforeSpanFinished(subgraphExecuteSpan)
endSubgraphExecuteSpan(
span = subgraphExecuteSpan,
subgraphOutput = null,
error = eventContext.throwable,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = subgraphExecuteSpan,
path = patchedExecutionInfo
)
}
//endregion Subgraph
//region LLM Call
pipeline.interceptLLMCallStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry before LLM call handler" }
val provider = eventContext.model.provider
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val parentSpan = spanCollector.getParentSpanForEvent(
executionInfo = patchedExecutionInfo,
) ?: return@intercept
val messages = eventContext.prompt.messages.toList()
val inferenceSpan = startInferenceSpan(
tracer = tracer,
parentSpan = parentSpan,
id = eventContext.eventId,
provider = eventContext.model.provider,
runId = eventContext.runId,
model = eventContext.model,
messages = messages,
llmParams = eventContext.prompt.params,
tools = eventContext.tools
)
// Add events to the InferenceSpan after the span is created
val eventsFromMessages = messages.map { message ->
when (message) {
is Message.System -> {
SystemMessageEvent(provider, message)
}
is Message.User -> {
UserMessageEvent(provider, message)
}
is Message.Assistant, is Message.Reasoning -> {
AssistantMessageEvent(provider, message)
}
is Message.Tool.Call -> {
ChoiceEvent(provider, message, arguments = message.contentJson)
}
is Message.Tool.Result -> {
ToolMessageEvent(
provider = provider,
toolCallId = message.id,
content = message.content
)
}
}
}
inferenceSpan.addEvents(eventsFromMessages)
// Start span
spanAdapter?.onBeforeSpanStarted(inferenceSpan)
spanCollector.collectSpan(
span = inferenceSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptLLMCallCompleted(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry after LLM call handler" }
// Find the current span (Inference Span)
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val inferenceSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.INFERENCE
) ?: return@intercept
val provider = eventContext.model.provider
// Add events to the InferenceSpan before finishing the span
val eventsToAdd = buildList {
eventContext.responses.mapIndexed { index, message ->
when (message) {
is Message.Assistant, is Message.Reasoning -> {
add(AssistantMessageEvent(provider, message))
}
is Message.Tool.Call -> {
add(ChoiceEvent(provider, message, arguments = message.contentJson, index = index))
}
}
}
eventContext.moderationResponse?.let { response ->
add(ModerationResponseEvent(provider, response))
}
}
inferenceSpan.addEvents(eventsToAdd)
// Finish Reasons Attribute
eventContext.responses.lastOrNull()?.let { message ->
val finishReasonsAttribute = when (message) {
is Message.Assistant, is Message.Reasoning -> {
SpanAttributes.Response.FinishReasons(reasons = listOf(SpanAttributes.Response.FinishReasonType.Stop))
}
is Message.Tool.Call -> {
SpanAttributes.Response.FinishReasons(reasons = listOf(SpanAttributes.Response.FinishReasonType.ToolCalls))
}
}
inferenceSpan.addAttribute(finishReasonsAttribute)
}
// Stop InferenceSpan
spanAdapter?.onBeforeSpanFinished(inferenceSpan)
endInferenceSpan(
span = inferenceSpan,
messages = eventContext.responses,
model = eventContext.model,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = inferenceSpan,
path = patchedExecutionInfo
)
}
//endregion LLM Call
//region Tool Call
pipeline.interceptToolCallStarting(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry tool call handler" }
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val parentSpan = spanCollector.getParentSpanForEvent(
executionInfo = patchedExecutionInfo,
) ?: return@intercept
val executeToolSpan = startExecuteToolSpan(
tracer = tracer,
parentSpan = parentSpan,
id = eventContext.eventId,
toolName = eventContext.toolName,
toolArgs = eventContext.toolArgs,
toolDescription = eventContext.toolDescription,
toolCallId = eventContext.toolCallId
)
spanAdapter?.onBeforeSpanStarted(executeToolSpan)
spanCollector.collectSpan(executeToolSpan, patchedExecutionInfo)
}
pipeline.interceptToolCallCompleted(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry tool result handler" }
// Get the current ExecuteToolSpan
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val executeToolSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.EXECUTE_TOOL
) ?: return@intercept
eventContext.toolDescription?.let { toolDescription ->
executeToolSpan.addAttribute(
// gen_ai.tool.description
attribute = SpanAttributes.Tool.Description(description = toolDescription)
)
}
spanAdapter?.onBeforeSpanFinished(span = executeToolSpan)
val toolResult = eventContext.toolResult ?: kotlinx.serialization.json.JsonObject(emptyMap())
endExecuteToolSpan(
span = executeToolSpan,
toolResult = toolResult,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = executeToolSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptToolCallFailed(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry tool call failure handler" }
// Get the current ExecuteToolSpan using executionInfo.path()
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val executeToolSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.EXECUTE_TOOL
) ?: return@intercept
eventContext.toolDescription?.let { toolDescription ->
executeToolSpan.addAttribute(
// gen_ai.tool.description
attribute = SpanAttributes.Tool.Description(description = toolDescription)
)
}
executeToolSpan.addAttribute(
attribute = CommonAttributes.Error.Type(eventContext.message)
)
// End the ExecuteToolSpan span
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
val toolResult = kotlinx.serialization.json.JsonObject(emptyMap())
endExecuteToolSpan(
span = executeToolSpan,
toolResult = toolResult,
error = eventContext.error,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = executeToolSpan,
path = patchedExecutionInfo
)
}
pipeline.interceptToolValidationFailed(this) intercept@{ eventContext ->
logger.debug { "Execute OpenTelemetry tool validation error handler" }
// Get the current ExecuteToolSpan using executionInfo.path()
val patchedExecutionInfo = eventContext.executionInfo
.appendRunId(eventContext.runId)
.appendId(eventContext.eventId)
val executeToolSpan = spanCollector.getStartedSpan(
executionInfo = patchedExecutionInfo,
eventId = eventContext.eventId,
spanType = SpanType.EXECUTE_TOOL
) ?: return@intercept
eventContext.toolDescription?.let { toolDescription ->
executeToolSpan.addAttribute(
// gen_ai.tool.description
attribute = SpanAttributes.Tool.Description(description = toolDescription)
)
}
executeToolSpan.addAttribute(
attribute = CommonAttributes.Error.Type(eventContext.message)
)
// End the ExecuteToolSpan span
spanAdapter?.onBeforeSpanFinished(executeToolSpan)
endExecuteToolSpan(
span = executeToolSpan,
toolResult = null,
error = eventContext.error,
verbose = config.isVerbose
)
spanCollector.removeSpan(
span = executeToolSpan,
path = patchedExecutionInfo
)
}
//endregion Tool Call
return openTelemetry
}