in ses-plugin-server/src/main/kotlin/jetbrains/buildServer/sesPlugin/sqs/awsCommunication/ReceiveMessagesTask.kt [26:66]
override fun perform(sqs: AmazonSQS, queueUrl: String): AmazonSQSCommunicationResult<AmazonSQSNotification> {
val messagesResult = try {
sqs.receiveMessage(prepareRequest().withQueueUrl(queueUrl))
} catch (ex: Exception) {
return AmazonSQSCommunicationResult(emptyList(), ex, "Cannot communicate with Amazon SQS")
}
if (messagesResult.messages.isEmpty()) {
return AmazonSQSCommunicationResult(emptyList(), null, "No new messages")
}
if (properties.getBoolean("teamcity.sesIntegration.markMessagesAsUnread", false)) {
for (i in messagesResult.messages) {
if (Thread.currentThread().isInterrupted) return AmazonSQSCommunicationResult(emptyList(), null, "Execution is interrupted")
sqs.changeMessageVisibility(ChangeMessageVisibilityRequest().withQueueUrl(queueUrl).withReceiptHandle(i.receiptHandle).withVisibilityTimeout(0))
}
} else {
if (properties.getBoolean("teamcity.sesIntegration.deleteReadMessages", true)) {
val r = DeleteMessageBatchRequest().withQueueUrl(queueUrl)
messagesResult.messages.forEach {
r.entries.add(DeleteMessageBatchRequestEntry(it.messageId, it.receiptHandle))
}
try {
sqs.deleteMessageBatch(r)
} catch (e: Exception) {
loggerService.log {
logger.warnAndDebugDetails("Cannot delete processed messages from '$queueUrl'", e)
}
}
}
}
if (Thread.currentThread().isInterrupted) return AmazonSQSCommunicationResult(emptyList(), null, "Execution is interrupted")
return messagesResult.messages.map {
sqsNotificationParser.parse(it.body)
}.let {
AmazonSQSCommunicationResult(it)
}
}