in pekko-connectors-sample-sqs-java/src/main/java/alpakka/sample/sqssample/Main.java [56:89]
void run() throws Exception {
// create SQS client
String sqsEndpoint = "http://localhost:9324";
SqsAsyncClient sqsClient =
SqsAsyncClient.builder()
.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
.endpointOverride(URI.create(sqsEndpoint))
.region(Region.EU_CENTRAL_1)
.httpClient(AkkaHttpClient.builder().withActorSystem(system.classicSystem()).build())
.build();
system.getWhenTerminated().thenAccept(notUsed -> sqsClient.close());
// configure SQS
SqsSourceSettings settings = SqsSourceSettings.create().withCloseOnEmptyReceive(true);
SqsAckSettings ackSettings = SqsAckSettings.create();
// create running stream
CompletionStage<Done> streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
.log("read from SQS")
.mapAsync(8, (Message msg) -> {
return enrichAndPublish(sqsClient, msg)
// upon completion ignore the result and pass on the original message
.thenApply(result -> msg);
})
.map(msg -> MessageAction.delete(msg))
.runWith(
SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
system
);
// terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
streamCompletion.thenAccept(done -> system.terminate());
}