in pekko-connectors-sample-mqtt-http-to-s3-java/src/main/java/alpakka/sample/triggereddownload/Main.java [52:80]
/**
 * Starts the MQTT-triggered download-and-upload stream.
 *
 * <p>Each message received on {@code topic} is decoded as a UTF-8 string and parsed into a
 * {@link DownloadCommand} by {@code downloadCommandReader}. For every command the URL it
 * carries is fetched with an HTTP GET and the response bytes are streamed into an S3
 * multipart upload in {@code s3Bucket}, under the key produced by
 * {@code createS3BucketKey}. Each upload result is printed to standard output.
 *
 * <p>The completion stage of the stream is not awaited or returned, so this method returns
 * as soon as the stream has been started. If the stream fails, the stack trace is printed
 * and the failure is replaced by {@code Done}.
 *
 * <p>NOTE(review): the HTTP response status is not inspected before its body is uploaded,
 * and the upload content type is always {@code text/html; charset=UTF-8} regardless of what
 * the server returned - confirm both are intended.
 *
 * @throws Exception declared by the signature; no checked exception is thrown by the
 *     statements visible in this method
 */
void run() throws Exception {
    // In-memory persistence: MQTT session state is not kept on disk.
    final MqttConnectionSettings connectionSettings =
        MqttConnectionSettings.create(mqttBroker, "upload-control", new MemoryPersistence());

    @SuppressWarnings("unchecked")
    final MqttSubscriptions subscriptions =
        MqttSubscriptions.create(topic, MqttQoS.atLeastOnce());

    MqttSource
        .atMostOnce(connectionSettings, subscriptions, 8)
        .map(message -> message.payload().utf8String())
        .<DownloadCommand>map(json -> downloadCommandReader.readValue(json))
        // Up to four download/upload pairs are in flight at once.
        .mapAsync(4, command -> {
            // Resolve the target key before the nested stream is built.
            String objectKey = createS3BucketKey(command);
            // One nested single-element stream per command: URL -> request -> response -> bytes -> S3.
            return Source.single(command.url)
                .map(url -> HttpRequest.GET(url))
                .mapAsync(1, request -> http.singleRequest(request))
                .flatMapConcat(response -> response.entity().getDataBytes())
                .runWith(
                    S3.multipartUpload(s3Bucket, objectKey, ContentTypes.TEXT_HTML_UTF8),
                    system);
        })
        .runForeach(System.out::println, system)
        .exceptionally(failure -> {
            failure.printStackTrace();
            return Done.done();
        });
}