CompletionStage enrichAndPublish()

in pekko-connectors-sample-sqs-java/src/main/java/alpakka/sample/sqssample/Main.java [91:108]


    /**
     * Runs a one-element stream that enriches a single SQS message and publishes
     * the result to the queue at {@code publishUrl}.
     *
     * <p>Steps, in order: the message is converted with {@code Main::transform};
     * the actor system is asked (2 second timeout) with an {@code EnrichActor.Enrich}
     * carrying the message id; the reply's {@code data} is combined with the
     * message's id, name and url into an {@code EnrichedMessage}; that object is
     * serialized by {@code enrichedMessageWriter} and used as the body of a
     * {@code SendMessageRequest}; the request is sent through an
     * {@code SqsPublishFlow}.
     *
     * @param sqsClient client handed to the publish flow
     * @param sqsMsg    the message to enrich and publish
     * @return a stage completed with the first element emitted by the publish flow
     */
    CompletionStage<SqsPublishResult> enrichAndPublish(SqsAsyncClient sqsClient, Message sqsMsg) {
        final Flow<SendMessageRequest, SqsPublishResult, NotUsed> toPublishQueue =
                SqsPublishFlow.create(publishUrl, SqsPublishSettings.create(), sqsClient);

        // Everything up to (and including) the stream-level log stage: Message -> SendMessageRequest.
        final Source<SendMessageRequest, NotUsed> outgoingRequests =
                Source.<Message>single(sqsMsg)
                        .map(Main::transform)
                        .mapAsync(1, (MessageFromSqs incoming) -> {
                            CompletionStage<EnrichActor.Enriched> reply = AskPattern.ask(
                                    system,
                                    replyTo -> new EnrichActor.Enrich(incoming.id, replyTo),
                                    Duration.ofSeconds(2),
                                    system.scheduler());
                            return reply.thenApply(enriched -> {
                                log.debug("ask received '{}'", enriched);
                                return new EnrichedMessage(incoming.id, incoming.name, incoming.url, enriched.data);
                            });
                        })
                        .map(enrichedMsg -> SendMessageRequest.builder()
                                .messageBody(enrichedMessageWriter.writeValueAsString(enrichedMsg))
                                .build())
                        .log("sending to publish queue");

        // Materialize the stream; the head sink yields the publish result for the single request.
        final CompletionStage<SqsPublishResult> published =
                outgoingRequests
                        .via(toPublishQueue)
                        .runWith(Sink.head(), system);
        return published;
    }