in plugins/amazonq/chat/jetbrains-community/src/software/aws/toolkits/jetbrains/services/cwc/controller/chat/messenger/ChatPromptHandler.kt [55:158]
/**
 * Builds a flow of [ChatMessage]s for one chat request sent through the session in [sessionInfo].
 *
 * Messages are emitted in this order:
 *  1. an empty `AnswerStream` message when collection of the response stream starts;
 *  2. every non-null message that [processChatEvent] returns for a response event;
 *  3. on completion without an error, an `AnswerPart` message carrying the related suggestions
 *     (only when there are any), followed by the closing `Answer` message carrying the follow-ups.
 *
 * Exceptions seen by the `catch` stage are recorded through the telemetry helper and rethrown
 * wrapped in a [ChatApiException].
 *
 * NOTE(review): `requestId`, `responseText`, `relatedSuggestions`, `followUps` and `statusCode` are not
 * declared in this function; presumably they are members kept up to date by [processChatEvent] — confirm.
 *
 * @param tabId identifier copied into every emitted message and passed to the telemetry calls
 * @param triggerId identifier copied into every emitted message
 * @param data the request handed to `session.chat`; its `userIntent` is copied into the emitted messages
 * @param sessionInfo supplies the session used to issue the request
 * @param shouldAddIndexInProgressMessage forwarded unchanged to [processChatEvent]
 * @param isInlineChat when `true` response events are processed in the `onEach` stage, otherwise in the
 * terminal `collect`
 * @throws ChatApiException when the `catch` stage receives an exception
 */
fun handle(
    tabId: String,
    triggerId: String,
    data: ChatRequestData,
    sessionInfo: ChatSessionInfo,
    shouldAddIndexInProgressMessage: Boolean,
    isInlineChat: Boolean = false,
) = flow {
    val session = sessionInfo.session
    session.chat(data)
        .onStart {
            // Open the answer with an empty AnswerStream message before any response event is handled.
            val streamOpened = ChatMessage(
                tabId = tabId,
                triggerId = triggerId,
                messageId = requestId,
                messageType = ChatMessageType.AnswerStream,
                message = "",
                userIntent = data.userIntent,
            )
            telemetryHelper.setResponseStreamStartTime(tabId)
            emit(streamOpened)
        }
        .onCompletion { failure ->
            // A cancelled collection must not produce any further messages.
            if (failure is CancellationException) return@onCompletion
            // Any other failure is rethrown untouched so the `catch` stage below can deal with it.
            if (failure != null) throw failure

            // Flush the collected suggestions as one last AnswerPart message.
            if (relatedSuggestions.isNotEmpty()) {
                val suggestionsPart = ChatMessage(
                    tabId = tabId,
                    triggerId = triggerId,
                    messageId = requestId,
                    messageType = ChatMessageType.AnswerPart,
                    message = responseText.toString(),
                    relatedSuggestions = relatedSuggestions,
                    userIntent = data.userIntent,
                )
                emit(suggestionsPart)
            }

            // The Answer message closes the response stream.
            val streamClosed = ChatMessage(
                tabId = tabId,
                triggerId = triggerId,
                messageId = requestId,
                messageType = ChatMessageType.Answer,
                followUps = followUps,
                userIntent = data.userIntent,
            )
            telemetryHelper.setResponseStreamTotalTime(tabId)
            telemetryHelper.setResponseHasProjectContext(
                requestId,
                telemetryHelper.getIsProjectContextEnabled() && data.useRelevantDocuments && data.relevantTextDocuments.isNotEmpty()
            )
            telemetryHelper.recordAddMessage(
                data,
                streamClosed,
                responseText.length,
                statusCode,
                countTotalNumberOfCodeBlocks(responseText)
            )
            emit(streamClosed)
            broadcastQEvent(QFeatureEvent.INVOCATION)
        }
        .catch { exception ->
            // Telemetry gets the service status code when there is one, otherwise 0.
            val errorStatusCode = (exception as? AwsServiceException)?.statusCode() ?: 0
            telemetryHelper.recordMessageResponseError(data, tabId, errorStatusCode)

            // Only streaming exceptions contribute a request id and status code to the wrapper.
            val streamingException = exception as? CodeWhispererStreamingException
            throw ChatApiException(
                message = exception.message ?: "Encountered exception calling the API",
                sessionId = session.conversationId,
                requestId = streamingException?.requestId(),
                statusCode = streamingException?.statusCode(),
                cause = exception,
            )
        }
        .onEach { event ->
            // Inline chat handles each event in this intermediate stage.
            if (isInlineChat) {
                val message = processChatEvent(tabId, triggerId, data, event, shouldAddIndexInProgressMessage)
                if (message != null) emit(message)
            }
        }
        .collect { event ->
            // Regular chat handles each event in the terminal stage.
            if (!isInlineChat) {
                val message = processChatEvent(tabId, triggerId, data, event, shouldAddIndexInProgressMessage)
                if (message != null) emit(message)
            }
        }
}