protected open fun onSendMessageCommon()

in a2a/a2a-server/src/commonMain/kotlin/ai/koog/a2a/server/A2AServer.kt [373:458]


    /**
     * Shared handling of a "send message" request: runs the agent for the incoming message and
     * streams every event produced by the session back to the caller.
     *
     * All work happens inside the [channelFlow] builder:
     * 1. Messages without parts are rejected.
     * 2. While holding [tasksMutex] for the task id, any session already registered for that task is
     *    joined, the referenced task (if the message names one) is looked up, and a new [LazySession]
     *    is created and registered with the session manager.
     * 3. Event forwarding is subscribed first; only then is the agent job awaited, after which the
     *    flow waits until every event has been sent.
     *
     * @param request Request carrying the message to process; its id is copied into every response.
     * @param ctx Server call context passed on to the agent through [RequestContext].
     * @return Flow emitting one [Response] per event of the session.
     * @throws A2AInvalidParamsException raised from the flow builder if the message has no parts.
     * @throws A2ATaskNotFoundException raised from the flow builder if the message references a task
     * that the task storage does not contain.
     */
    protected open fun onSendMessageCommon(
        request: Request<MessageSendParams>,
        ctx: ServerCallContext
    ): Flow<Response<Event>> = channelFlow {
        val incoming = request.data.message

        if (incoming.parts.isEmpty()) {
            throw A2AInvalidParamsException("Empty message parts are not supported")
        }

        // Reuse the task id supplied with the message, otherwise generate a fresh one.
        val taskId = incoming.taskId ?: idGenerator.generateTaskId(incoming)

        val (session, monitoringStarted) = tasksMutex.withLock(taskId) {
            // A session may already be registered for this task: wait until it is done.
            sessionManager.getSession(taskId)?.join()

            // Only a message that explicitly names a task must resolve to a stored task.
            // In that case `taskId` is exactly the id taken from the message.
            val existingTask: Task? = if (incoming.taskId != null) {
                taskStorage.get(taskId, historyLength = 0, includeArtifacts = false)
                    ?: throw A2ATaskNotFoundException("Task '$taskId' not found")
            } else {
                null
            }

            // Context id precedence: stored task, then the message, then a generated id.
            val resolvedContextId = existingTask?.contextId
                ?: incoming.contextId
                ?: idGenerator.generateContextId(incoming)

            val eventProcessor = SessionEventProcessor(
                contextId = resolvedContextId,
                taskId = taskId,
                taskStorage = taskStorage,
            )

            // Everything the agent needs to know about this request; storages are wrapped
            // together with the context id of the session.
            val requestContext = RequestContext(
                callContext = ctx,
                params = request.data,
                taskStorage = ContextTaskStorage(eventProcessor.contextId, taskStorage),
                messageStorage = ContextMessageStorage(eventProcessor.contextId, messageStorage),
                contextId = eventProcessor.contextId,
                taskId = eventProcessor.taskId,
                task = existingTask,
            )

            val newSession = LazySession(
                coroutineScope = coroutineScope,
                eventProcessor = eventProcessor,
            ) {
                agentExecutor.execute(requestContext, eventProcessor)
            }

            // Hand back the session together with whatever addSession returns; that value is
            // joined below (presumably it signals that session monitoring is ready - confirm
            // against the session manager).
            newSession to sessionManager.addSession(newSession)
        }

        // Completed as soon as the collector below has started collecting session events.
        val collectionStarted: CompletableJob = Job()
        // Completed once the session event stream has ended and every event has been sent.
        val collectionFinished: CompletableJob = Job()

        // Forward session events to the caller, wrapping each one into a response for this request.
        launch {
            session.events
                .onStart { collectionStarted.complete() }
                .collect { event -> send(Response(data = event, id = request.id)) }

            collectionFinished.complete()
        }

        // Do not touch the agent job before the event subscription is in place ...
        collectionStarted.join()
        // ... and before the job returned by addSession has completed.
        monitoringStarted.join()

        // await (rather than join) rethrows a failure of the agent execution to the flow.
        // NOTE(review): with a LazySession this call presumably also triggers the execution - confirm.
        session.agentJob.await()
        // Keep the flow open until all events have been forwarded.
        collectionFinished.join()
    }