in powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java [128:172]
/**
 * Sends the given non-retryable messages to the dead-letter queue resolved by
 * {@code fetchDlqUrl}, if there is one.
 *
 * <p>Each message is converted to a {@link SendMessageBatchRequestEntry} that carries over its
 * body, its message id (used as the batch entry id) and its message attributes. The entries are
 * handed to {@code batchRequest} in chunks of 10, and every chunk is sent with one
 * {@code sendMessageBatch} call.
 *
 * <p>SQS reports per-entry failures inside the batch response instead of throwing, so each
 * response is inspected: if any response lists failed entries, the failure is logged and this
 * method returns {@code false}.
 * NOTE(review): callers presumably treat {@code false} as "messages were not moved" and leave
 * them to be retried, as they must already do when no DLQ is configured - confirm at call site.
 *
 * @param nonRetryableMessageToException messages that failed with a non-retryable exception,
 *                                       mapped to the exception each one failed with
 * @return {@code true} if a DLQ URL was resolved and no batch response reported failed entries;
 *         {@code false} if no DLQ URL was resolved or at least one entry could not be sent
 */
private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Exception> nonRetryableMessageToException) {
    Optional<String> dlqUrl = fetchDlqUrl(nonRetryableMessageToException);
    if (!dlqUrl.isPresent()) {
        return false;
    }
    String dlqQueueUrl = dlqUrl.get();

    List<SendMessageBatchRequestEntry> dlqMessages = nonRetryableMessageToException.keySet().stream()
            .map(this::toDlqEntry)
            .collect(toList());

    // Mutable flag because the per-chunk callback cannot return a value to this method.
    // Fully qualified so that the block does not depend on an import that may be missing.
    java.util.concurrent.atomic.AtomicBoolean anySendFailed =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    // 10 is the chunk size handed to batchRequest (SQS accepts at most 10 entries per batch call).
    batchRequest(dlqMessages, 10, entriesToSend -> {
        SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(SendMessageBatchRequest.builder()
                .entries(entriesToSend)
                .queueUrl(dlqQueueUrl)
                .build());
        LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);

        // A batch call can succeed as a whole while individual entries are rejected.
        if (null != sendMessageBatchResponse
                && null != sendMessageBatchResponse.failed()
                && !sendMessageBatchResponse.failed().isEmpty()) {
            anySendFailed.set(true);
            LOG.error("Failed sending message to the DLQ. Response: {}", sendMessageBatchResponse);
        }
    });

    return !anySendFailed.get();
}

/**
 * Builds the DLQ batch entry for one source message, copying its body, id and attributes.
 *
 * @param sqsMessage the source message to convert
 * @return the batch entry to send; its attribute map is empty when the source has no attributes
 */
private SendMessageBatchRequestEntry toDlqEntry(SQSMessage sqsMessage) {
    Map<String, MessageAttributeValue> messageAttributesMap = new HashMap<>();

    // A record without an attribute map must not abort the whole DLQ hand-off with an NPE.
    if (null != sqsMessage.getMessageAttributes()) {
        sqsMessage.getMessageAttributes().forEach((attributeName, messageAttribute) -> {
            MessageAttributeValue.Builder builder = MessageAttributeValue.builder()
                    .dataType(messageAttribute.getDataType())
                    .stringValue(messageAttribute.getStringValue());
            // The binary payload is optional and is only converted when present.
            if (null != messageAttribute.getBinaryValue()) {
                builder.binaryValue(SdkBytes.fromByteBuffer(messageAttribute.getBinaryValue()));
            }
            messageAttributesMap.put(attributeName, builder.build());
        });
    }

    return SendMessageBatchRequestEntry.builder()
            .messageBody(sqsMessage.getBody())
            .id(sqsMessage.getMessageId())
            .messageAttributes(messageAttributesMap)
            .build();
}