void run()

in pekko-connectors-sample-sqs-java/src/main/java/alpakka/sample/sqssample/Main.java [56:89]


    void run() throws Exception {
        // create SQS client
        String sqsEndpoint = "http://localhost:9324";
        SqsAsyncClient sqsClient =
                SqsAsyncClient.builder()
                        .credentialsProvider(
                                StaticCredentialsProvider.create(AwsBasicCredentials.create("x", "x")))
                        .endpointOverride(URI.create(sqsEndpoint))
                        .region(Region.EU_CENTRAL_1)
                        .httpClient(AkkaHttpClient.builder().withActorSystem(system.classicSystem()).build())
                        .build();
        system.getWhenTerminated().thenAccept(notUsed -> sqsClient.close());

        // configure SQS
        SqsSourceSettings settings = SqsSourceSettings.create().withCloseOnEmptyReceive(true);
        SqsAckSettings ackSettings = SqsAckSettings.create();

        // create running stream
        CompletionStage<Done> streamCompletion = SqsSource.create(sourceQueueUrl, settings, sqsClient)
                .log("read from SQS")
                .mapAsync(8, (Message msg) -> {
                    return enrichAndPublish(sqsClient, msg)
                            // upon completion ignore the result and pass on the original message
                            .thenApply(result -> msg);
                })
                .map(msg -> MessageAction.delete(msg))
                .runWith(
                        SqsAckSink.create(sourceQueueUrl, ackSettings, sqsClient),
                        system
                );

        // terminate the actor system when the stream completes (see withCloseOnEmptyReceive)
        streamCompletion.thenAccept(done -> system.terminate());
    }