commands/source/IotCommandsClientV2.cpp (290 lines of code) (raw):
/* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*
* This file is generated
*/
#include <aws/iotcommands/IotCommandsClientV2.h>
#include <aws/crt/UUID.h>
#include <aws/iotcommands/CommandExecutionEvent.h>
#include <aws/iotcommands/CommandExecutionsSubscriptionRequest.h>
#include <aws/iotcommands/UpdateCommandExecutionRequest.h>
#include <aws/iotcommands/UpdateCommandExecutionResponse.h>
#include <aws/iotcommands/V2ErrorResponse.h>
namespace Aws
{
namespace Iotcommands
{
class ClientV2 : public IClientV2
{
public:
ClientV2(
Aws::Crt::Allocator *allocator,
std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> bindingClient);
virtual ~ClientV2() = default;
bool UpdateCommandExecution(
const UpdateCommandExecutionRequest &request,
const UpdateCommandExecutionResultHandler &handler) override;
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> CreateCommandExecutionsCborPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options) override;
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> CreateCommandExecutionsGenericPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options) override;
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> CreateCommandExecutionsJsonPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options) override;
private:
Aws::Crt::Allocator *m_allocator;
std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> m_bindingClient;
};
ClientV2::ClientV2(
Aws::Crt::Allocator *allocator,
std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> bindingClient)
: m_allocator(allocator), m_bindingClient(std::move(bindingClient))
{
// It's simpler to do this than branch the codegen based on the presence of streaming operations
(void)m_allocator;
}
template <typename R, typename E>
static void s_applyUnmodeledErrorToHandler(const std::function<void(R &&)> &handler, int errorCode)
{
ServiceErrorV2<E> error(errorCode);
R finalResult(std::move(error));
handler(std::move(finalResult));
}
template <typename R, typename E>
static void s_applyModeledErrorToHandler(const std::function<void(R &&)> &handler, E &&modeledError)
{
ServiceErrorV2<E> error(AWS_ERROR_MQTT_REQUEST_RESPONSE_MODELED_SERVICE_ERROR, std::move(modeledError));
R finalResult(std::move(error));
handler(std::move(finalResult));
}
static void s_UpdateCommandExecutionResponseHandler(
Aws::Iot::RequestResponse::UnmodeledResult &&result,
const UpdateCommandExecutionResultHandler &handler,
const Aws::Crt::String &successPathTopic,
const Aws::Crt::String &failurePathTopic)
{
using E = V2ErrorResponse;
using R = Aws::Iot::RequestResponse::Result<UpdateCommandExecutionResponse, ServiceErrorV2<E>>;
if (!result.IsSuccess())
{
s_applyUnmodeledErrorToHandler<R, E>(handler, result.GetError());
return;
}
auto response = result.GetResponse();
const auto &payload = response.GetPayload();
Aws::Crt::String objectStr(reinterpret_cast<char *>(payload.ptr), payload.len);
Aws::Crt::JsonObject jsonObject(objectStr);
if (!jsonObject.WasParseSuccessful())
{
s_applyUnmodeledErrorToHandler<R, E>(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_PAYLOAD_PARSE_ERROR);
return;
}
const auto &topic = response.GetTopic();
auto responseTopic = Aws::Crt::String((const char *)topic.ptr, topic.len);
if (responseTopic == successPathTopic)
{
UpdateCommandExecutionResponse modeledResponse(jsonObject);
Aws::Iot::RequestResponse::Result<UpdateCommandExecutionResponse, ServiceErrorV2<E>> finalResult(
std::move(modeledResponse));
handler(std::move(finalResult));
}
else if (responseTopic == failurePathTopic)
{
V2ErrorResponse modeledError(jsonObject);
s_applyModeledErrorToHandler<R, E>(handler, std::move(modeledError));
}
else
{
s_applyUnmodeledErrorToHandler<R, E>(handler, AWS_ERROR_MQTT_REQUEST_RESPONSE_INVALID_RESPONSE_PATH);
}
}
bool ClientV2::UpdateCommandExecution(
const UpdateCommandExecutionRequest &request,
const UpdateCommandExecutionResultHandler &handler)
{
Aws::Crt::StringStream publishTopicStream;
publishTopicStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/" << *request.ExecutionId << "/response/json";
Aws::Crt::String publishTopic = publishTopicStream.str();
Aws::Crt::StringStream subscriptionTopicStream0;
subscriptionTopicStream0 << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/" << *request.ExecutionId
<< "/response/accepted/json";
Aws::Crt::String subscriptionTopic0 = subscriptionTopicStream0.str();
Aws::Crt::StringStream subscriptionTopicStream1;
subscriptionTopicStream1 << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/" << *request.ExecutionId
<< "/response/rejected/json";
Aws::Crt::String subscriptionTopic1 = subscriptionTopicStream1.str();
struct aws_byte_cursor subscriptionTopicFilters[2] = {
Aws::Crt::ByteCursorFromString(subscriptionTopic0),
Aws::Crt::ByteCursorFromString(subscriptionTopic1),
};
Aws::Crt::StringStream responsePathTopicAcceptedStream;
responsePathTopicAcceptedStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType)
<< "/" << *request.DeviceId << "/executions/" << *request.ExecutionId
<< "/response/accepted/json";
Aws::Crt::String responsePathTopicAccepted = responsePathTopicAcceptedStream.str();
Aws::Crt::StringStream responsePathTopicRejectedStream;
responsePathTopicRejectedStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType)
<< "/" << *request.DeviceId << "/executions/" << *request.ExecutionId
<< "/response/rejected/json";
Aws::Crt::String responsePathTopicRejected = responsePathTopicRejectedStream.str();
struct aws_mqtt_request_operation_response_path responsePaths[2];
responsePaths[0].topic = Aws::Crt::ByteCursorFromString(responsePathTopicAccepted);
responsePaths[1].topic = Aws::Crt::ByteCursorFromString(responsePathTopicRejected);
AWS_ZERO_STRUCT(responsePaths[0].correlation_token_json_path);
AWS_ZERO_STRUCT(responsePaths[1].correlation_token_json_path);
Aws::Crt::JsonObject jsonObject;
request.SerializeToObject(jsonObject);
Aws::Crt::String outgoingJson = jsonObject.View().WriteCompact(true);
struct aws_mqtt_request_operation_options options;
AWS_ZERO_STRUCT(options);
options.subscription_topic_filters = subscriptionTopicFilters;
options.subscription_topic_filter_count = 2;
options.response_paths = responsePaths;
options.response_path_count = 2;
options.publish_topic = Aws::Crt::ByteCursorFromString(publishTopic);
options.serialized_request =
Aws::Crt::ByteCursorFromArray((uint8_t *)outgoingJson.data(), outgoingJson.length());
auto resultHandler = [handler, responsePathTopicAccepted, responsePathTopicRejected](
Aws::Iot::RequestResponse::UnmodeledResult &&result)
{
s_UpdateCommandExecutionResponseHandler(
std::move(result), handler, responsePathTopicAccepted, responsePathTopicRejected);
};
int submitResult = m_bindingClient->SubmitRequest(options, std::move(resultHandler));
return submitResult == AWS_OP_SUCCESS;
}
static struct aws_byte_cursor s_getSegmentFromTopic(const Aws::Crt::ByteCursor &topic, int segment)
{
struct aws_byte_cursor topicSegment;
AWS_ZERO_STRUCT(topicSegment);
int segmentPosition = 0;
while (aws_byte_cursor_next_split(&topic, '/', &topicSegment))
{
if (segmentPosition == segment)
{
return topicSegment;
}
++segmentPosition;
}
return {};
}
static bool s_initModeledEvent(
const Aws::Iot::RequestResponse::IncomingPublishEvent &publishEvent,
CommandExecutionEvent &modeledEvent)
{
auto segmentExecutionId = s_getSegmentFromTopic(publishEvent.GetTopic(), 5);
modeledEvent.SetExecutionId(segmentExecutionId);
modeledEvent.SetPayload(publishEvent.GetPayload());
auto contentType = publishEvent.GetContentType();
if (contentType)
{
modeledEvent.SetContentType(*contentType);
}
auto messageExpiryIntervalSeconds = publishEvent.GetMessageExpiryIntervalSeconds();
if (messageExpiryIntervalSeconds)
{
modeledEvent.SetTimeout(*messageExpiryIntervalSeconds);
}
return true;
}
template <typename T> class ServiceStreamingOperation : public Aws::Iot::RequestResponse::IStreamingOperation
{
public:
explicit ServiceStreamingOperation(std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> stream)
: m_stream(std::move(stream))
{
}
static std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> Create(
Aws::Crt::Allocator *allocator,
const std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> &bindingClient,
const Aws::Crt::String &subscriptionTopicFilter,
const Aws::Iot::RequestResponse::StreamingOperationOptions<T> &options)
{
std::function<void(Aws::Iot::RequestResponse::IncomingPublishEvent &&)> unmodeledHandler =
[options](Aws::Iot::RequestResponse::IncomingPublishEvent &&publishEvent)
{
T modeledEvent;
if (!s_initModeledEvent(publishEvent, modeledEvent))
{
return;
}
options.GetStreamHandler()(std::move(modeledEvent));
};
Aws::Iot::RequestResponse::StreamingOperationOptionsInternal internalOptions;
internalOptions.subscriptionTopicFilter = Aws::Crt::ByteCursorFromString(subscriptionTopicFilter);
internalOptions.subscriptionStatusEventHandler = options.GetSubscriptionStatusEventHandler();
internalOptions.incomingPublishEventHandler = unmodeledHandler;
auto unmodeledStream = bindingClient->CreateStream(internalOptions);
if (!unmodeledStream)
{
return nullptr;
}
return Aws::Crt::MakeShared<ServiceStreamingOperation<T>>(allocator, unmodeledStream);
}
void Open() override { m_stream->Open(); }
private:
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> m_stream;
};
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> ClientV2::
CreateCommandExecutionsCborPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options)
{
Aws::Crt::StringStream topicStream;
topicStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/+/request/cbor";
Aws::Crt::String topic = topicStream.str();
return ServiceStreamingOperation<CommandExecutionEvent>::Create(
m_allocator, m_bindingClient, topic, options);
}
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> ClientV2::
CreateCommandExecutionsGenericPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options)
{
Aws::Crt::StringStream topicStream;
topicStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/+/request";
Aws::Crt::String topic = topicStream.str();
return ServiceStreamingOperation<CommandExecutionEvent>::Create(
m_allocator, m_bindingClient, topic, options);
}
std::shared_ptr<Aws::Iot::RequestResponse::IStreamingOperation> ClientV2::
CreateCommandExecutionsJsonPayloadStream(
const CommandExecutionsSubscriptionRequest &request,
const Aws::Iot::RequestResponse::StreamingOperationOptions<CommandExecutionEvent> &options)
{
Aws::Crt::StringStream topicStream;
topicStream << "$aws/commands/" << DeviceTypeMarshaller::ToString(*request.DeviceType) << "/"
<< *request.DeviceId << "/executions/+/request/json";
Aws::Crt::String topic = topicStream.str();
return ServiceStreamingOperation<CommandExecutionEvent>::Create(
m_allocator, m_bindingClient, topic, options);
}
std::shared_ptr<IClientV2> NewClientFrom5(
const Aws::Crt::Mqtt5::Mqtt5Client &protocolClient,
const Aws::Iot::RequestResponse::RequestResponseClientOptions &options,
Aws::Crt::Allocator *allocator)
{
std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> bindingClient =
Aws::Iot::RequestResponse::NewClientFrom5(protocolClient, options, allocator);
if (nullptr == bindingClient)
{
return nullptr;
}
return Aws::Crt::MakeShared<ClientV2>(allocator, allocator, bindingClient);
}
std::shared_ptr<IClientV2> NewClientFrom311(
const Aws::Crt::Mqtt::MqttConnection &protocolClient,
const Aws::Iot::RequestResponse::RequestResponseClientOptions &options,
Aws::Crt::Allocator *allocator)
{
std::shared_ptr<Aws::Iot::RequestResponse::IMqttRequestResponseClient> bindingClient =
Aws::Iot::RequestResponse::NewClientFrom311(protocolClient, options, allocator);
if (nullptr == bindingClient)
{
return nullptr;
}
return Aws::Crt::MakeShared<ClientV2>(allocator, allocator, bindingClient);
}
} // namespace Iotcommands
} // namespace Aws