docker_images/c/wrapper/glue/InternalGlue.cpp (347 lines of code) (raw):
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
// InternalGlue.cpp is inherited as a polymorphic class by DeviceGlue and ModuleGlue.
#include "InternalGlue.h"
#include "GlueUtils.h"
#include "json.h"
#ifndef MU_ENUM_TO_STRING
#define MU_ENUM_TO_STRING ENUM_TO_STRING
#endif
InternalGlue::InternalGlue()
{
IoTHub_Init();
}
InternalGlue::~InternalGlue()
{
}
static void connectionStatusCallback(IOTHUB_CLIENT_CONNECTION_STATUS result, IOTHUB_CLIENT_CONNECTION_STATUS_REASON reason, void *user_context)
{
(void)reason;
(void)user_context;
// DOES NOT TAKE INTO ACCOUNT NETWORK OUTAGES
::time_t timetime = ::time(nullptr);
if (result == IOTHUB_CLIENT_CONNECTION_AUTHENTICATED)
{
std::cout << ::asctime(::localtime(&timetime)) << "the client (" << user_context << ") is connected to edgehub / iothub" << std::endl;
}
else
{
std::cout << ::asctime(::localtime(&timetime)) << "the client (" << user_context << ") has been disconnected" << std::endl;
}
}
void setConnectionStatusCallback(IOTHUB_CLIENT_CORE_HANDLE client)
{
IOTHUB_CLIENT_RESULT ret;
if (!client)
{
throw new std::runtime_error("client is not opened");
}
// Setting connection status callback to get indication of connection to edgehub / iothub
ret = IoTHubClientCore_SetConnectionStatusCallback(client, connectionStatusCallback, client);
ThrowIfFailed(ret, "IoTHubClientCore_SetConnectionStatusCallback");
}
std::string InternalGlue::Connect(const char *transportType, std::string connectionString, std::string caCertificate)
{
// NOTE: Currently not using the caCertificate. The TLS Handshake between the module and edgeHub will fail
// unless this cert is in the trusted certificate store.
std::cout << "InternalGlue::Connect for " << transportType << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client;
IOTHUB_CLIENT_TRANSPORT_PROVIDER protocol = protocolFromTransportName(transportType);
if ((client = IoTHubClientCore_CreateFromConnectionString(connectionString.c_str(), protocol)) == NULL)
{
throw new std::runtime_error("failed to create client");
}
else
{
char address[32];
sprintf(address, "%p", client);
std::cout << "InternalGlue::Connect Client Pointer: " << address << std::endl;
bool traceOn = true;
bool rawTraceOn = true;
size_t sasTokenLifetime = 3600;
IoTHubClientCore_SetOption(client, "logtrace", &traceOn);
IoTHubClientCore_SetOption(client, "rawlogtrace", &rawTraceOn);
IoTHubClientCore_SetOption(client, "sas_token_lifetime", &sasTokenLifetime);
std::string clientId = getNextClientId();
this->clientMap[clientId] = (void *)client;
setConnectionStatusCallback(client);
std::string ret = "{ \"connectionId\" : \"" + clientId + "\"}";
std::cout << "returning " << ret << std::endl;
return ret;
}
}
void InternalGlue::Disconnect(std::string connectionId)
{
std::cout << "InternalGlue::Disconnect for " << connectionId << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (client)
{
this->clientMap.erase(connectionId);
std::cout << "Destroying client(" << (void*)client << ")" << std::endl;
IoTHubClientCore_Destroy(client);
}
twin_callback_struct *twin_cb = (twin_callback_struct *)this->twinMap[connectionId];
if (twin_cb)
{
this->twinMap.erase(connectionId);
delete twin_cb;
}
}
void InternalGlue::EnableMethods(std::string connectionId)
{
std::cout << "InternalGlue::EnableMethods for " << connectionId << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
}
std::string add_patch_to_twin(std::string prev_complete_twin, std::string patch)
{
// add twin patch to complete twin
JSON_Value *twin_root_value;
JSON_Value *patch_root_value;
JSON_Object *twin_root_object;
if ((twin_root_value = json_parse_string(prev_complete_twin.c_str())) == NULL)
{
throw new std::runtime_error("parson error");
}
else if ((patch_root_value = json_parse_string(patch.c_str())) == NULL)
{
throw new std::runtime_error("parson error");
}
else if ((twin_root_object = json_value_get_object(twin_root_value)) == NULL)
{
throw new std::runtime_error("parson error");
}
else if ((json_object_set_value(twin_root_object, "desired", patch_root_value)) != JSONSuccess)
{
throw new std::runtime_error("parson error");
}
std::string updated_twin_s = std::string(json_serialize_to_string(twin_root_value));
json_value_free(twin_root_value); //implicitly frees twin_root_object and patch_root_value as well
return updated_twin_s;
}
void twinCallback(DEVICE_TWIN_UPDATE_STATE update_state, const unsigned char *payLoad, const size_t size, void *userContextCallback)
{
std::cout << "twinCallback called with state " << update_state << std::endl;
twin_callback_struct *response = (twin_callback_struct *)userContextCallback;
response->latest_payload = std::string(reinterpret_cast<const char *>(payLoad), size);
if (update_state == DEVICE_TWIN_UPDATE_COMPLETE)
{
// the device twin update is a total twin update
response->current_complete = std::string(reinterpret_cast<const char *>(payLoad), size);
std::cout << "complete twin:" << response->current_complete << std::endl;
}
else if (update_state == DEVICE_TWIN_UPDATE_PARTIAL)
{
// the device twin update is a patch, so we should only patch
response->current_complete = add_patch_to_twin(response->current_complete, response->latest_payload);
std::cout << "latest payload:" << response->latest_payload << std::endl;
std::cout << "complete twin: " << response->current_complete << std::endl;
response->cvp.notify_one();
}
response->cv.notify_one();
}
void InternalGlue::EnableTwin(std::string connectionId)
{
IOTHUB_CLIENT_RESULT ret;
std::cout << "InternalGlue::EnableTwin for " << connectionId << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
twin_callback_struct *resp = new twin_callback_struct;
ret = IoTHubClientCore_SetDeviceTwinCallback(client, twinCallback, resp);
ThrowIfFailed(ret, "IoTHubClientCore_SetDeviceTwinCallback");
std::cout << "waiting for initial Twin response" << std::endl;
{
std::unique_lock<std::mutex> lk(resp->m);
resp->cv.wait(lk);
}
std::cout << "initial Twin response received" << std::endl;
if (resp->latest_payload.empty())
{
throw new std::runtime_error("twin not enabled");
}
this->twinMap[connectionId] = (void *)resp;
}
void InternalGlue::SendEvent(std::string connectionId, std::string eventBody)
{
std::cout << "InternalGlue::SendEvent for " << connectionId << std::endl;
std::cout << eventBody << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
std::mutex m;
std::condition_variable cv;
IOTHUB_MESSAGE_HANDLE message = stringToMessage(eventBody);
std::cout << "calling IoTHubClient_SendEventAsync" << std::endl;
IOTHUB_CLIENT_RESULT ret = IoTHubClientCore_SendEventAsync(client, message, sendEventCallback, &cv);
ThrowIfFailed(ret, "IoTHubClientCore_SendEventAsync");
std::cout << "waiting for send confirmation" << std::endl;
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk);
}
std::cout << "send confirmation received" << std::endl;
}
IOTHUBMESSAGE_DISPOSITION_RESULT receiveMessageCallback(IOTHUB_MESSAGE_HANDLE message, void *userContextCallback)
{
std::cout << "receiveMessageCallback called" << std::endl;
message_response_struct *response = (message_response_struct *)userContextCallback;
const char *str = IoTHubMessage_GetString(message);
if (str)
{
response->response_string = str;
}
else
{
const unsigned char *buffer;
size_t size;
IOTHUB_MESSAGE_RESULT ret = IoTHubMessage_GetByteArray(message, &buffer, &size);
if (ret == IOTHUB_MESSAGE_OK)
{
response->response_string.assign((const char *)buffer, size);
}
else
{
response->response_string = std::string("WARNING: IoTHubMessage_GetByteArray returned ") + MU_ENUM_TO_STRING(IOTHUB_MESSAGE_RESULT, ret);
std::cout << response->response_string << std::endl;
}
}
std::cout << "string == " << response->response_string << std::endl;
response->cv.notify_one();
return IOTHUBMESSAGE_ACCEPTED;
}
int methodCallback(const char *method_name, const unsigned char *payload, const size_t size, unsigned char **response, size_t *response_size, void *userContextCallback)
{
std::cout << "methodCallback called" << std::endl;
int result;
method_callback_struct *cb_data = (method_callback_struct *)userContextCallback;
cb_data->actual_method_name = std::string(reinterpret_cast<const char *>(method_name));
cb_data->actual_request_payload = std::string(reinterpret_cast<const char *>(payload), size);
if (cb_data->actual_method_name.compare(cb_data->expected_method_name) == 0)
{
if (cb_data->expected_request_payload.compare(cb_data->actual_request_payload) == 0)
{
std::cout << "method and payload matched. returning response" << std::endl;
*response_size = cb_data->response.length();
*response = (unsigned char *)malloc(*response_size);
if (response == NULL)
{
throw new std::runtime_error("failed to allocate memory for response");
};
(void)memcpy(*response, cb_data->response.c_str(), *response_size);
result = cb_data->status_code;
}
else
{
std::cout << "request payload doesn't match" << std::endl;
std::cout << "expected: " << cb_data->expected_request_payload << std::endl;
std::cout << "received: " << cb_data->actual_request_payload << std::endl;
result = 500;
}
}
else
{
std::cout << "method name doesn't match" << std::endl;
std::cout << "expected: " << cb_data->expected_method_name << std::endl;
std::cout << "received: " << cb_data->actual_method_name << std::endl;
result = 404;
}
cb_data->cv.notify_one();
return result;
}
void InternalGlue::WaitForMethodAndReturnResponse(std::string connectionId, std::string methodName, std::string requestAndResponse)
{
IOTHUB_CLIENT_RESULT ret;
std::cout << "InternalGlue::WaitForMethodAndReturnResponse for " << connectionId << " and " << methodName << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
std::string expectedRequest, response;
int statusCode;
parseMethodRequestAndResponse(requestAndResponse, &expectedRequest, &response, &statusCode);
method_callback_struct cb_data;
cb_data.expected_method_name = methodName;
cb_data.expected_request_payload = expectedRequest;
cb_data.response = response;
cb_data.status_code = statusCode;
ret = IoTHubClientCore_SetDeviceMethodCallback(client, methodCallback, &cb_data);
ThrowIfFailed(ret, "IoTHubClientCore_SetDeviceMethodCallback");
std::cout << "waiting for method call" << std::endl;
{
std::unique_lock<std::mutex> lk(cb_data.m);
cb_data.cv.wait(lk);
}
std::cout << "method call received" << std::endl;
}
std::string InternalGlue::WaitForDesiredPropertyPatch(std::string connectionId)
{
std::cout << "InternalGlue::WaitForDesiredPropertyPatch for " << connectionId << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
twin_callback_struct *resp = (twin_callback_struct *)(this->twinMap[connectionId]);
if (!resp)
{
throw new std::runtime_error("no twin callback struct");
}
std::cout << "waiting for Twin patch response" << std::endl;
{
std::unique_lock<std::mutex> lk(resp->m);
resp->cv.wait(lk);
}
std::cout << "Twin patch response received" << std::endl;
return addJsonWrapperObject(resp->latest_payload, "desired");
}
std::string InternalGlue::GetTwin(std::string connectionId)
{
std::cout << "InternalGlue::GetTwin for " << connectionId << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
twin_callback_struct *resp = (twin_callback_struct *)(this->twinMap[connectionId]);
if (!resp)
{
throw new std::runtime_error("no twin callback struct");
}
return resp->current_complete;
}
void reportedStateCallback(int status_code, void *userContextCallback)
{
std::cout << "reportedStateCallback called with " << status_code << std::endl;
std::condition_variable *cv = (std::condition_variable *)userContextCallback;
cv->notify_one();
}
void InternalGlue::SendTwinPatch(std::string connectionId, std::string props)
{
std::cout << "InternalGlue::SendTwinPatch for " << connectionId << std::endl;
std::cout << props << std::endl;
IOTHUB_CLIENT_CORE_HANDLE client = (IOTHUB_CLIENT_CORE_HANDLE)this->clientMap[connectionId];
if (!client)
{
throw new std::runtime_error("client is not opened");
}
std::mutex m;
std::condition_variable cv;
std::string reported = Json(props).getSubObject("reported");
IOTHUB_CLIENT_RESULT res = IoTHubClientCore_SendReportedState(client, (const unsigned char *)reported.c_str(), reported.length(), reportedStateCallback, &cv);
ThrowIfFailed(res, "IoTHubClientCore_SendReportedState");
std::cout << "waiting for send reported state confirmation" << std::endl;
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk);
}
std::cout << "send reported state confirmation received" << std::endl;
}
void InternalGlue::CleanupResources()
{
std::cout << "InternalGlue::CleanupResources called" << std::endl;
// copy the map since we're removing things from it while we're iterating over it.
std::map<std::string, void *> mapCopy = this->clientMap;
for (auto iter = mapCopy.begin(); iter != mapCopy.end(); ++iter)
{
std::cout << "missed cleanup of " << iter->first << std::endl;
this->Disconnect(iter->first);
}
}