void run()

in pekko-connectors-sample-mqtt-http-to-s3-java/src/main/java/alpakka/sample/triggereddownload/Main.java [52:80]


    /**
     * Wires up and starts the MQTT-triggered download pipeline.
     *
     * <p>Each MQTT message received on {@code topic} is decoded as a UTF-8 string and parsed
     * into a {@code DownloadCommand}. For every command, the command's URL is fetched with an
     * HTTP GET and the response body is streamed into an S3 multipart upload in
     * {@code s3Bucket}, under the key produced by {@code createS3BucketKey}. Each upload
     * result is printed to standard output.
     *
     * <p>The stream is only started here: the completion stage returned by the stream is not
     * awaited, so this method returns while the pipeline keeps running. If the stream fails,
     * the stack trace is printed and the failure is replaced by {@code Done}.
     *
     * @throws Exception declared by the signature; nothing in this body throws a checked
     *                   exception directly
     */
    void run() throws Exception {
        final MqttConnectionSettings connection =
                MqttConnectionSettings.create(mqttBroker, "upload-control", new MemoryPersistence());

        @SuppressWarnings("unchecked")
        MqttSubscriptions subscriptions = MqttSubscriptions.create(topic, MqttQoS.atLeastOnce());

        MqttSource
                .atMostOnce(connection, subscriptions, 8)
                .map(message -> message.payload().utf8String())
                .<DownloadCommand>map(downloadCommandReader::readValue)
                // Handle up to 4 download commands concurrently; each one runs its own
                // nested stream that downloads the URL and uploads the bytes to S3.
                .mapAsync(4, command -> {
                    final String objectKey = createS3BucketKey(command);
                    return Source.single(command.url)
                            .map(HttpRequest::GET)
                            .mapAsync(1, http::singleRequest)
                            // Flatten the response into its raw body bytes.
                            .flatMapConcat(response -> response.entity().getDataBytes())
                            // The upload is always tagged as text/html (UTF-8), regardless
                            // of what the HTTP response declared.
                            .runWith(S3.multipartUpload(s3Bucket, objectKey, ContentTypes.TEXT_HTML_UTF8), system);
                })
                .runForeach(uploadResult -> System.out.println(uploadResult), system)
                .exceptionally(failure -> {
                    failure.printStackTrace();
                    return Done.done();
                });
    }