in a2a/a2a-server/src/commonMain/kotlin/ai/koog/a2a/server/A2AServer.kt [373:458]
/**
 * Shared handling of a "send message" request, exposed as a stream of responses.
 *
 * The body runs inside `channelFlow`, so nothing happens until the returned flow is collected.
 * On collection it:
 * 1. rejects a message that has no parts;
 * 2. while holding `tasksMutex` for the task id: waits for a session already registered for that
 *    task (if any), looks up the task referenced by the message (if any), and registers a new
 *    `LazySession` that will run the agent executor;
 * 3. forwards every event of that session to the collector, wrapped in a `Response` carrying the
 *    request id;
 * 4. awaits the agent job and then waits until event forwarding has completed.
 *
 * @param request incoming request whose `data.message` is processed.
 * @param ctx server call context, handed to the agent through `RequestContext`.
 * @return flow emitting one `Response` per event published by the session.
 * @throws A2AInvalidParamsException on collection, if the message has no parts.
 * @throws A2ATaskNotFoundException on collection, if the message carries a task id that is not
 * found in `taskStorage`.
 */
protected open fun onSendMessageCommon(
    request: Request<MessageSendParams>,
    ctx: ServerCallContext
): Flow<Response<Event>> = channelFlow {
    val message = request.data.message

    if (message.parts.isEmpty()) throw A2AInvalidParamsException("Empty message parts are not supported")

    // Reuse the task id supplied by the client, otherwise generate one for this message.
    val taskId = message.taskId ?: idGenerator.generateTaskId(message)

    val (session, monitoringStarted) = tasksMutex.withLock(taskId) {
        // A session may already be registered for this task: wait for it to finish first.
        sessionManager.getSession(taskId)?.join()

        // Only a task id supplied by the client is looked up; it must refer to a stored task.
        val linkedTask: Task? = message.taskId?.let { linkedTaskId ->
            taskStorage.get(linkedTaskId, historyLength = 0, includeArtifacts = false)
                ?: throw A2ATaskNotFoundException("Task '$linkedTaskId' not found")
        }

        // Context id precedence: stored task, then the message, then a newly generated id.
        val resolvedContextId = linkedTask?.contextId
            ?: message.contextId
            ?: idGenerator.generateContextId(message)

        // Event processor for this session.
        val eventProcessor = SessionEventProcessor(
            contextId = resolvedContextId,
            taskId = taskId,
            taskStorage = taskStorage,
        )

        // Request context handed to the agent; storages are wrapped with the session's context id.
        val requestContext = RequestContext(
            callContext = ctx,
            params = request.data,
            taskStorage = ContextTaskStorage(eventProcessor.contextId, taskStorage),
            messageStorage = ContextMessageStorage(eventProcessor.contextId, messageStorage),
            contextId = eventProcessor.contextId,
            taskId = eventProcessor.taskId,
            task = linkedTask,
        )

        val newSession = LazySession(
            coroutineScope = coroutineScope,
            eventProcessor = eventProcessor,
        ) {
            agentExecutor.execute(requestContext, eventProcessor)
        }

        // Registering the session yields a handle that is joined below, before the agent is awaited.
        val monitoringHandle = sessionManager.addSession(newSession)
        newSession to monitoringHandle
    }

    // Completed once the forwarding coroutine has started collecting session events.
    val collectionStarted: CompletableJob = Job()
    // Completed once every session event has been forwarded.
    val collectionFinished: CompletableJob = Job()

    // Forward session events to the collector of this flow.
    launch {
        session.events
            // NOTE(review): onStart runs just before upstream collection begins — confirm that
            // session.events cannot drop events emitted in that window.
            .onStart { collectionStarted.complete() }
            .collect { event -> send(Response(data = event, id = request.id)) }

        collectionFinished.complete()
    }

    // Do not continue until event forwarding is set up.
    collectionStarted.join()

    // Do not continue until session monitoring is ready.
    monitoringStarted.join()

    // Run the agent to completion; await() (rather than join()) lets an agent failure propagate.
    session.agentJob.await()

    // Let the forwarding coroutine drain the remaining events before the flow completes.
    collectionFinished.join()
}