sdk/src/main/java/software/amazon/awssdk/iot/iotshadow/IotShadowV2Client.java (418 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*
* This file is generated.
*/
package software.amazon.awssdk.iot.iotshadow;
import java.lang.AutoCloseable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.UUID;
import java.util.function.BiFunction;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import software.amazon.awssdk.crt.CrtRuntimeException;
import software.amazon.awssdk.crt.iot.*;
import software.amazon.awssdk.crt.mqtt.MqttClientConnection;
import software.amazon.awssdk.crt.mqtt5.Mqtt5Client;
import software.amazon.awssdk.iot.*;
import software.amazon.awssdk.iot.iotshadow.model.*;
/**
* The AWS IoT Device Shadow service adds shadows to AWS IoT thing objects. Shadows are a simple data store for device properties and state. Shadows can make a device’s state available to apps and other services whether the device is connected to AWS IoT or not.
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html
*
*/
public class IotShadowV2Client implements AutoCloseable {
private MqttRequestResponseClient rrClient;
private final Gson gson;
private Gson createGson() {
GsonBuilder gson = new GsonBuilder();
gson.disableHtmlEscaping();
gson.registerTypeAdapter(Timestamp.class, new Timestamp.Serializer());
gson.registerTypeAdapter(Timestamp.class, new Timestamp.Deserializer());
addTypeAdapters(gson);
return gson.create();
}
private void addTypeAdapters(GsonBuilder gson) {
ShadowStateFactory shadowStateFactory = new ShadowStateFactory();
gson.registerTypeAdapterFactory(shadowStateFactory);
}
private IotShadowV2Client(MqttRequestResponseClient rrClient) {
this.rrClient = rrClient;
this.gson = createGson();
}
/**
* Constructs a new IotShadowV2Client, using an MQTT5 client as transport
*
* @param protocolClient the MQTT5 client to use
* @param options configuration options to use
*/
static public IotShadowV2Client newFromMqtt5(Mqtt5Client protocolClient, MqttRequestResponseClientOptions options) {
MqttRequestResponseClient rrClient = new MqttRequestResponseClient(protocolClient, options);
return new IotShadowV2Client(rrClient);
}
/**
* Constructs a new IotShadowV2Client, using an MQTT311 client as transport
*
* @param protocolClient the MQTT311 client to use
* @param options configuration options to use
*/
static public IotShadowV2Client newFromMqtt311(MqttClientConnection protocolClient, MqttRequestResponseClientOptions options) {
MqttRequestResponseClient rrClient = new MqttRequestResponseClient(protocolClient, options);
return new IotShadowV2Client(rrClient);
}
/**
* Releases all resources used by the client. It is not valid to invoke operations
* on the client after it has been closed.
*/
public void close() {
this.rrClient.decRef();
this.rrClient = null;
}
/**
* Deletes a named shadow for an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#delete-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<DeleteShadowResponse> deleteNamedShadow(DeleteNamedShadowRequest request) {
V2ClientFuture<DeleteShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("DeleteNamedShadowRequest.thingName cannot be null");
}
if (request.shadowName == null) {
throw new CrtRuntimeException("DeleteNamedShadowRequest.shadowName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/name/{shadowName}/delete";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
publishTopic = publishTopic.replace("{shadowName}", request.shadowName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/name/{shadowName}/delete/+";
subscription0 = subscription0.replace("{thingName}", request.thingName);
subscription0 = subscription0.replace("{shadowName}", request.shadowName);
builder.withSubscription(subscription0);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, DeleteShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Deletes the (classic) shadow for an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#delete-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<DeleteShadowResponse> deleteShadow(DeleteShadowRequest request) {
V2ClientFuture<DeleteShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("DeleteShadowRequest.thingName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/delete";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/delete/+";
subscription0 = subscription0.replace("{thingName}", request.thingName);
builder.withSubscription(subscription0);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, DeleteShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Gets a named shadow for an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#get-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<GetShadowResponse> getNamedShadow(GetNamedShadowRequest request) {
V2ClientFuture<GetShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("GetNamedShadowRequest.thingName cannot be null");
}
if (request.shadowName == null) {
throw new CrtRuntimeException("GetNamedShadowRequest.shadowName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/name/{shadowName}/get";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
publishTopic = publishTopic.replace("{shadowName}", request.shadowName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/name/{shadowName}/get/+";
subscription0 = subscription0.replace("{thingName}", request.thingName);
subscription0 = subscription0.replace("{shadowName}", request.shadowName);
builder.withSubscription(subscription0);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, GetShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Gets the (classic) shadow for an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#get-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<GetShadowResponse> getShadow(GetShadowRequest request) {
V2ClientFuture<GetShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("GetShadowRequest.thingName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/get";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/get/+";
subscription0 = subscription0.replace("{thingName}", request.thingName);
builder.withSubscription(subscription0);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, GetShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Update a named shadow for a device.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<UpdateShadowResponse> updateNamedShadow(UpdateNamedShadowRequest request) {
V2ClientFuture<UpdateShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("UpdateNamedShadowRequest.thingName cannot be null");
}
if (request.shadowName == null) {
throw new CrtRuntimeException("UpdateNamedShadowRequest.shadowName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/name/{shadowName}/update";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
publishTopic = publishTopic.replace("{shadowName}", request.shadowName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/name/{shadowName}/update/accepted";
subscription0 = subscription0.replace("{thingName}", request.thingName);
subscription0 = subscription0.replace("{shadowName}", request.shadowName);
builder.withSubscription(subscription0);
String subscription1 = "$aws/things/{thingName}/shadow/name/{shadowName}/update/rejected";
subscription1 = subscription1.replace("{thingName}", request.thingName);
subscription1 = subscription1.replace("{shadowName}", request.shadowName);
builder.withSubscription(subscription1);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, UpdateShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Update a device's (classic) shadow.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-pub-sub-topic
*
* @param request modeled request to perform
*
* @return a future that will complete with the corresponding response
*/
public CompletableFuture<UpdateShadowResponse> updateShadow(UpdateShadowRequest request) {
V2ClientFuture<UpdateShadowResponse> responseFuture = new V2ClientFuture<>();
try {
if (request.thingName == null) {
throw new CrtRuntimeException("UpdateShadowRequest.thingName cannot be null");
}
RequestResponseOperation.RequestResponseOperationBuilder builder = RequestResponseOperation.builder();
// Correlation Token
String correlationToken = UUID.randomUUID().toString();
request.clientToken = correlationToken;
builder.withCorrelationToken(correlationToken);
// Publish Topic
String publishTopic = "$aws/things/{thingName}/shadow/update";
publishTopic = publishTopic.replace("{thingName}", request.thingName);
builder.withPublishTopic(publishTopic);
// Payload
String payloadJson = gson.toJson(request);
builder.withPayload(payloadJson.getBytes(StandardCharsets.UTF_8));
// Subscriptions
String subscription0 = "$aws/things/{thingName}/shadow/update/accepted";
subscription0 = subscription0.replace("{thingName}", request.thingName);
builder.withSubscription(subscription0);
String subscription1 = "$aws/things/{thingName}/shadow/update/rejected";
subscription1 = subscription1.replace("{thingName}", request.thingName);
builder.withSubscription(subscription1);
// Response paths
ResponsePath.ResponsePathBuilder pathBuilder1 = ResponsePath.builder();
String responseTopic1 = publishTopic + "/accepted";
pathBuilder1.withResponseTopic(publishTopic + "/accepted");
pathBuilder1.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder1.build());
ResponsePath.ResponsePathBuilder pathBuilder2 = ResponsePath.builder();
String responseTopic2 = publishTopic + "/rejected";
pathBuilder2.withResponseTopic(publishTopic + "/rejected");
pathBuilder2.withCorrelationTokenJsonPath("clientToken");
builder.withResponsePath(pathBuilder2.build());
// Submit
submitOperation(responseFuture, builder.build(), responseTopic1, UpdateShadowResponse.class, responseTopic2, V2ErrorResponse.class, IotShadowV2Client::createV2ErrorResponseException);
} catch (Exception e) {
responseFuture.completeExceptionally(createV2ErrorResponseException(e.getMessage(), null));
}
return responseFuture;
}
/**
* Create a stream for NamedShadowDelta events for a named shadow of an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-delta-pub-sub-topic
*
* @param request modeled streaming operation subscription configuration
* @param options set of callbacks that the operation should invoke in response to related events
*
* @return a streaming operation which will invoke a callback every time a message is received on the
* associated MQTT topic
*/
public StreamingOperation createNamedShadowDeltaUpdatedStream(NamedShadowDeltaUpdatedSubscriptionRequest request, V2ClientStreamOptions<ShadowDeltaUpdatedEvent> options) {
String topic = "$aws/things/{thingName}/shadow/name/{shadowName}/update/delta";
if (request.thingName == null) {
throw new CrtRuntimeException("NamedShadowDeltaUpdatedSubscriptionRequest.thingName cannot be null");
}
topic = topic.replace("{thingName}", request.thingName);
if (request.shadowName == null) {
throw new CrtRuntimeException("NamedShadowDeltaUpdatedSubscriptionRequest.shadowName cannot be null");
}
topic = topic.replace("{shadowName}", request.shadowName);
StreamingOperationOptions innerOptions = StreamingOperationOptions.builder()
.withTopic(topic)
.withSubscriptionStatusEventCallback(options.subscriptionEventHandler())
.withIncomingPublishEventCallback((event) -> {
try {
String payload = new String(event.getPayload(), StandardCharsets.UTF_8);
ShadowDeltaUpdatedEvent response = this.gson.fromJson(payload, ShadowDeltaUpdatedEvent.class);
options.streamEventHandler().accept(response);
} catch (Exception e) {
V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder()
.withCause(e)
.withPayload(event.getPayload())
.withTopic(event.getTopic())
.build();
options.deserializationFailureHandler().accept(failureEvent);
}
})
.build();
return this.rrClient.createStream(innerOptions);
}
/**
* Create a stream for ShadowUpdated events for a named shadow of an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-documents-pub-sub-topic
*
* @param request modeled streaming operation subscription configuration
* @param options set of callbacks that the operation should invoke in response to related events
*
* @return a streaming operation which will invoke a callback every time a message is received on the
* associated MQTT topic
*/
public StreamingOperation createNamedShadowUpdatedStream(NamedShadowUpdatedSubscriptionRequest request, V2ClientStreamOptions<ShadowUpdatedEvent> options) {
String topic = "$aws/things/{thingName}/shadow/name/{shadowName}/update/documents";
if (request.thingName == null) {
throw new CrtRuntimeException("NamedShadowUpdatedSubscriptionRequest.thingName cannot be null");
}
topic = topic.replace("{thingName}", request.thingName);
if (request.shadowName == null) {
throw new CrtRuntimeException("NamedShadowUpdatedSubscriptionRequest.shadowName cannot be null");
}
topic = topic.replace("{shadowName}", request.shadowName);
StreamingOperationOptions innerOptions = StreamingOperationOptions.builder()
.withTopic(topic)
.withSubscriptionStatusEventCallback(options.subscriptionEventHandler())
.withIncomingPublishEventCallback((event) -> {
try {
String payload = new String(event.getPayload(), StandardCharsets.UTF_8);
ShadowUpdatedEvent response = this.gson.fromJson(payload, ShadowUpdatedEvent.class);
options.streamEventHandler().accept(response);
} catch (Exception e) {
V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder()
.withCause(e)
.withPayload(event.getPayload())
.withTopic(event.getTopic())
.build();
options.deserializationFailureHandler().accept(failureEvent);
}
})
.build();
return this.rrClient.createStream(innerOptions);
}
/**
* Create a stream for ShadowDelta events for the (classic) shadow of an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-delta-pub-sub-topic
*
* @param request modeled streaming operation subscription configuration
* @param options set of callbacks that the operation should invoke in response to related events
*
* @return a streaming operation which will invoke a callback every time a message is received on the
* associated MQTT topic
*/
public StreamingOperation createShadowDeltaUpdatedStream(ShadowDeltaUpdatedSubscriptionRequest request, V2ClientStreamOptions<ShadowDeltaUpdatedEvent> options) {
String topic = "$aws/things/{thingName}/shadow/update/delta";
if (request.thingName == null) {
throw new CrtRuntimeException("ShadowDeltaUpdatedSubscriptionRequest.thingName cannot be null");
}
topic = topic.replace("{thingName}", request.thingName);
StreamingOperationOptions innerOptions = StreamingOperationOptions.builder()
.withTopic(topic)
.withSubscriptionStatusEventCallback(options.subscriptionEventHandler())
.withIncomingPublishEventCallback((event) -> {
try {
String payload = new String(event.getPayload(), StandardCharsets.UTF_8);
ShadowDeltaUpdatedEvent response = this.gson.fromJson(payload, ShadowDeltaUpdatedEvent.class);
options.streamEventHandler().accept(response);
} catch (Exception e) {
V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder()
.withCause(e)
.withPayload(event.getPayload())
.withTopic(event.getTopic())
.build();
options.deserializationFailureHandler().accept(failureEvent);
}
})
.build();
return this.rrClient.createStream(innerOptions);
}
/**
* Create a stream for ShadowUpdated events for the (classic) shadow of an AWS IoT thing.
*
*
* AWS documentation: https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html#update-documents-pub-sub-topic
*
* @param request modeled streaming operation subscription configuration
* @param options set of callbacks that the operation should invoke in response to related events
*
* @return a streaming operation which will invoke a callback every time a message is received on the
* associated MQTT topic
*/
public StreamingOperation createShadowUpdatedStream(ShadowUpdatedSubscriptionRequest request, V2ClientStreamOptions<ShadowUpdatedEvent> options) {
String topic = "$aws/things/{thingName}/shadow/update/documents";
if (request.thingName == null) {
throw new CrtRuntimeException("ShadowUpdatedSubscriptionRequest.thingName cannot be null");
}
topic = topic.replace("{thingName}", request.thingName);
StreamingOperationOptions innerOptions = StreamingOperationOptions.builder()
.withTopic(topic)
.withSubscriptionStatusEventCallback(options.subscriptionEventHandler())
.withIncomingPublishEventCallback((event) -> {
try {
String payload = new String(event.getPayload(), StandardCharsets.UTF_8);
ShadowUpdatedEvent response = this.gson.fromJson(payload, ShadowUpdatedEvent.class);
options.streamEventHandler().accept(response);
} catch (Exception e) {
V2DeserializationFailureEvent failureEvent = V2DeserializationFailureEvent.builder()
.withCause(e)
.withPayload(event.getPayload())
.withTopic(event.getTopic())
.build();
options.deserializationFailureHandler().accept(failureEvent);
}
})
.build();
return this.rrClient.createStream(innerOptions);
}
static private Throwable createV2ErrorResponseException(String message, V2ErrorResponse errorResponse) {
return new V2ErrorResponseException(message, errorResponse);
}
private <T, E> void submitOperation(V2ClientFuture<T> finalFuture, RequestResponseOperation operation, String responseTopic, Class<T> responseClass, String errorTopic, Class<E> errorClass, BiFunction<String, E, Throwable> exceptionFactory) {
try {
CompletableFuture<MqttRequestResponse> responseFuture = this.rrClient.submitRequest(operation);
CompletableFuture<MqttRequestResponse> compositeFuture = responseFuture.whenComplete((res, ex) -> {
if (ex != null) {
finalFuture.completeExceptionally(exceptionFactory.apply(ex.getMessage(), null));
} else if (res.getTopic().equals(responseTopic)) {
try {
String payload = new String(res.getPayload(), StandardCharsets.UTF_8);
T response = this.gson.fromJson(payload, responseClass);
finalFuture.complete(response);
} catch (Exception e) {
finalFuture.completeExceptionally(exceptionFactory.apply(e.getMessage(), null));
}
} else if (res.getTopic().equals(errorTopic)) {
try {
String payload = new String(res.getPayload(), StandardCharsets.UTF_8);
E error = this.gson.fromJson(payload, errorClass);
finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation failure", error));
} catch (Exception e) {
finalFuture.completeExceptionally(exceptionFactory.apply(e.getMessage(), null));
}
} else {
finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation completed on unknown topic: " + res.getTopic(), null));
}
});
finalFuture.setTriggeringFuture(compositeFuture);
} catch (Exception ex) {
finalFuture.completeExceptionally(exceptionFactory.apply(ex.getMessage(), null));
}
}
}