in a2a/a2a-server/src/commonMain/kotlin/ai/koog/a2a/server/session/SessionManager.kt [63:114]
/**
 * Registers [session] as the running session for its task and starts a background monitor for it.
 *
 * The monitor, launched in `coroutineScope`:
 * 1. subscribes to `session.events` and remembers the first event (if any);
 * 2. waits for `session.agentJob` to finish;
 * 3. removes the session from the sessions map and cancels it;
 * 4. if the first event was a [TaskEvent] and both `pushSender` and `pushConfigStorage` are configured,
 *    sends the stored task to every push notification config registered for the task.
 *
 * @param session the session to register.
 * @return a job that is completed once the monitor has started collecting `session.events`.
 * @throws IllegalStateException if a session with the same task id is already registered.
 */
public suspend fun addSession(session: Session): CompletableJob {
    sessionsRwLock.withWriteLock {
        check(session.taskId !in sessions) {
            "Session for taskId '${session.taskId}' already runs."
        }

        sessions[session.taskId] = session
    }

    // Signal to indicate the monitoring is started.
    val monitoringStarted = Job()

    // Monitor for agent job completion to send push notifications and remove session from the map.
    coroutineScope.launch {
        val firstEvent = session.events
            .onStart { monitoringStarted.complete() }
            .firstOrNull()

        // Wait for the agent job to finish
        session.agentJob.join()

        /*
         Check and wait if there's a cancellation request for this task running now and still publishing some events.
         Then remove it from the session map.
         */
        tasksMutex.withLock(cancelKey(session.taskId)) {
            sessionsRwLock.withWriteLock {
                sessions -= session.taskId
                session.cancelAndJoin()
            }
        }

        // Send push notifications with the current state of the task, after the session completion, if configured.
        // Launched as a separate coroutine so that delivery does not run as part of the monitor coroutine itself.
        coroutineScope.launch {
            if (firstEvent is TaskEvent && pushSender != null && pushConfigStorage != null) {
                val task = taskStorage.get(session.taskId, historyLength = 0, includeArtifacts = false)

                if (task != null) {
                    pushConfigStorage.getAll(session.taskId).forEach { config ->
                        try {
                            pushSender.send(config, task)
                        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
                            // Cancellation must propagate: swallowing it would keep the loop running
                            // in an already cancelled coroutine.
                            throw e
                        } catch (e: Exception) {
                            // Best effort: a failure for one config must not prevent delivery to the others.
                            // TODO log error
                        }
                    }
                }
            }
        }
    }

    return monitoringStarted
}