in sdk/src/main/java/software/amazon/awssdk/iot/iotjobs/IotJobsV2Client.java [430:460]
/**
 * Submits a request-response operation through {@code this.rrClient} and routes its outcome
 * to {@code finalFuture}.
 *
 * <p>Outcome routing:
 * <ul>
 *   <li>response received on {@code responseTopic}: the payload is decoded as UTF-8, deserialized
 *       with {@code this.gson} into {@code responseClass}, and completes {@code finalFuture};</li>
 *   <li>response received on {@code errorTopic}: the payload is deserialized into
 *       {@code errorClass} and {@code finalFuture} is completed exceptionally with the throwable
 *       produced by {@code exceptionFactory} for that modeled error;</li>
 *   <li>anything else (submission failure, failed request future, deserialization failure,
 *       response on an unrecognized or null topic, or any unexpected exception raised while
 *       handling the response): {@code finalFuture} is completed exceptionally with a throwable
 *       produced by {@code exceptionFactory} and a {@code null} modeled error.</li>
 * </ul>
 *
 * <p>The chained future is handed to {@code finalFuture} via {@code setTriggeringFuture}.
 *
 * @param finalFuture      future handed to the caller; completed by this method on every path
 * @param operation        request-response operation to submit
 * @param responseTopic    topic on which a successful response is expected
 * @param responseClass    type the successful response payload is deserialized into
 * @param errorTopic       topic on which a modeled error response is expected
 * @param errorClass       type the error response payload is deserialized into
 * @param exceptionFactory builds the throwable used to fail {@code finalFuture} from a message
 *                         and an optional (possibly {@code null}) modeled error
 * @param <T>              successful response type
 * @param <E>              modeled error response type
 */
private <T, E> void submitOperation(V2ClientFuture<T> finalFuture, RequestResponseOperation operation, String responseTopic, Class<T> responseClass, String errorTopic, Class<E> errorClass, BiFunction<String, E, Throwable> exceptionFactory) {
    try {
        CompletableFuture<MqttRequestResponse> responseFuture = this.rrClient.submitRequest(operation);
        CompletableFuture<MqttRequestResponse> compositeFuture = responseFuture.whenComplete((res, ex) -> {
            // A single catch-all guards the whole callback: an exception escaping from here would
            // only fail compositeFuture and leave finalFuture (the one the caller waits on) pending.
            try {
                if (ex != null) {
                    finalFuture.completeExceptionally(wrapOperationFailure(exceptionFactory, ex));
                    return;
                }

                String topic = res.getTopic();
                if (topic != null && topic.equals(responseTopic)) {
                    String payload = new String(res.getPayload(), StandardCharsets.UTF_8);
                    T response = this.gson.fromJson(payload, responseClass);
                    finalFuture.complete(response);
                } else if (topic != null && topic.equals(errorTopic)) {
                    String payload = new String(res.getPayload(), StandardCharsets.UTF_8);
                    E error = this.gson.fromJson(payload, errorClass);
                    finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation failure", error));
                } else {
                    // Covers both an unrecognized topic and a null topic.
                    finalFuture.completeExceptionally(exceptionFactory.apply("Request-response operation completed on unknown topic: " + topic, null));
                }
            } catch (Exception e) {
                finalFuture.completeExceptionally(wrapOperationFailure(exceptionFactory, e));
            }
        });
        finalFuture.setTriggeringFuture(compositeFuture);
    } catch (Exception ex) {
        finalFuture.completeExceptionally(wrapOperationFailure(exceptionFactory, ex));
    }
}

/**
 * Builds the throwable used to fail an operation future from an underlying failure, keeping the
 * original throwable reachable for diagnostics.
 *
 * <p>The message handed to {@code exceptionFactory} is the cause's message, or its
 * {@code toString()} when the message is {@code null}. If the produced throwable has no cause,
 * {@code cause} is linked with {@code initCause}; if the cause slot was already initialized,
 * {@code cause} is recorded as a suppressed exception instead.
 *
 * @param exceptionFactory builds the throwable from a message and a {@code null} modeled error
 * @param cause            the underlying failure
 * @param <E>              modeled error response type
 * @return the throwable produced by {@code exceptionFactory}
 */
private static <E> Throwable wrapOperationFailure(BiFunction<String, E, Throwable> exceptionFactory, Throwable cause) {
    String message = cause.getMessage() != null ? cause.getMessage() : cause.toString();
    Throwable failure = exceptionFactory.apply(message, null);
    if (failure != null && failure != cause && failure.getCause() == null) {
        try {
            failure.initCause(cause);
        } catch (IllegalStateException causeAlreadyInitialized) {
            // The factory constructed the throwable with an explicit (null) cause; initCause is
            // one-shot, so fall back to attaching the original failure as suppressed.
            failure.addSuppressed(cause);
        }
    }
    return failure;
}