in pekko-connectors-sample-sqs-java/src/main/java/alpakka/sample/sqssample/Main.java [91:108]
/**
 * Enriches one SQS message and forwards it to the publish queue.
 *
 * <p>The message is wrapped in a single-element stream, converted with
 * {@code Main::transform}, enriched by asking {@code system} with an
 * {@code EnrichActor.Enrich} request (2 second ask timeout), written to a string with
 * {@code enrichedMessageWriter} and finally sent through an {@code SqsPublishFlow}
 * created for {@code publishUrl}.
 *
 * @param sqsClient client handed to the publish flow
 * @param sqsMsg the message to enrich and publish
 * @return a stage completed with the first result emitted by the publish flow
 */
CompletionStage<SqsPublishResult> enrichAndPublish(SqsAsyncClient sqsClient, Message sqsMsg) {
    final Flow<SendMessageRequest, SqsPublishResult, NotUsed> toPublishQueue =
        SqsPublishFlow.create(publishUrl, SqsPublishSettings.create(), sqsClient);
    final Duration askTimeout = Duration.ofSeconds(2);

    // Stage 1: parse the raw message and attach the data returned by the enrich ask.
    final Source<EnrichedMessage, NotUsed> enrichedSource =
        Source.<Message>single(sqsMsg)
            .map(Main::transform)
            .mapAsync(1, (MessageFromSqs incoming) -> {
                // The explicitly typed local pins the reply type expected from the ask.
                CompletionStage<EnrichActor.Enriched> reply =
                    AskPattern.ask(
                        system,
                        replyTo -> new EnrichActor.Enrich(incoming.id, replyTo),
                        askTimeout,
                        system.scheduler());
                return reply.thenApply(enrichment -> {
                    log.debug("ask received '{}'", enrichment);
                    return new EnrichedMessage(
                        incoming.id, incoming.name, incoming.url, enrichment.data);
                });
            });

    // Stage 2: serialize, log, publish, and surface the first publish result.
    return enrichedSource
        .map(enriched -> {
            String body = enrichedMessageWriter.writeValueAsString(enriched);
            return SendMessageRequest.builder().messageBody(body).build();
        })
        .log("sending to publish queue")
        .via(toPublishQueue)
        .runWith(Sink.head(), system);
}