override suspend fun onCancelTask()

in a2a/a2a-server/src/commonMain/kotlin/ai/koog/a2a/server/A2AServer.kt [510:580]


    /**
     * Handles a task cancellation request.
     *
     * Looks up the task named by [request], asks [agentExecutor] to cancel it, and responds with the task as
     * stored afterwards.
     *
     * @param request cancellation request whose data carries the ID of the task to cancel.
     * @param ctx server call context, forwarded to the agent executor through the [RequestContext].
     * @return a response containing the canceled task, tagged with the ID of [request].
     * @throws A2ATaskNotFoundException if the task is missing from storage, before or after the cancellation.
     * @throws A2ATaskNotCancelableException if the task has no session and is already in a terminal state, or if
     * its state is not [TaskState.Canceled] once the executor has returned.
     */
    override suspend fun onCancelTask(
        request: Request<TaskIdParams>,
        ctx: ServerCallContext
    ): Response<Task> {
        val taskParams = request.data
        val taskId = taskParams.id

        /*
         Lock level 1: the regular per-task lock, taken on a best-effort basis.
         Failing to get it is not an error: cancellation takes priority over whichever request holds it.
         When we do get it, new requests for this task are held back while the cancellation is in progress.
         */
        val holdsTaskLock = tasksMutex.tryLock(taskId)

        try {
            /*
             Lock level 2: the per-task cancellation lock, always taken.
             It serializes cancel operations and lets them proceed even while the regular task lock is held
             elsewhere. It also delays session teardown, so the event processor is not closed right after the agent
             job is canceled and the cancel handler can still emit cancellation events through the same processor
             and session, which existing subscribers then receive.
             */
            return tasksMutex.withLock(cancelKey(taskId)) {
                // The session is looked up before the stored task, same as the storage read order callers rely on.
                val activeSession = sessionManager.getSession(taskParams.id)

                val task = taskStorage.get(taskParams.id, historyLength = 0, includeArtifacts = true)
                    ?: throw A2ATaskNotFoundException("Task '${taskParams.id}' not found")

                // No session means the task is not running; a terminal task in that situation cannot be canceled.
                if (activeSession == null && task.status.state.terminal) {
                    throw A2ATaskNotCancelableException("Task '${taskParams.id}' is already in terminal state ${task.status.state}")
                }

                // Reuse the session's event processor when there is one, otherwise build one for this task.
                val processor = activeSession?.eventProcessor ?: SessionEventProcessor(
                    contextId = task.contextId,
                    taskId = task.id,
                    taskStorage = taskStorage,
                )

                // Context handed to the executor, scoped to the processor's context and task IDs.
                val cancelContext = RequestContext(
                    callContext = ctx,
                    params = request.data,
                    taskStorage = ContextTaskStorage(processor.contextId, taskStorage),
                    messageStorage = ContextMessageStorage(processor.contextId, messageStorage),
                    contextId = processor.contextId,
                    taskId = processor.taskId,
                    task = task,
                )

                // Ask the executor to cancel, passing the agent job of the active session if there is one.
                agentExecutor.cancel(cancelContext, processor, activeSession?.agentJob)

                // Re-read the task and make sure the cancellation was actually recorded.
                val canceledTask = taskStorage.get(taskParams.id, historyLength = 0, includeArtifacts = true)
                    ?.also {
                        if (it.status.state != TaskState.Canceled) {
                            throw A2ATaskNotCancelableException("Task '${taskParams.id}' was not canceled successfully, current state is ${it.status.state}")
                        }
                    }
                    ?: throw A2ATaskNotFoundException("Task '${taskParams.id}' not found")

                Response(
                    data = canceledTask,
                    id = request.id,
                )
            }
        } finally {
            // Only release the regular task lock if this call was the one that acquired it.
            if (holdsTaskLock) {
                tasksMutex.unlock(taskId)
            }
        }
    }