in source/jobs/JobsFeature.cpp [356:488]
void JobsFeature::publishUpdateJobExecutionStatus(
JobExecutionData data,
JobExecutionStatusInfo statusInfo,
function<void(void)> onCompleteCallback)
{
LOG_DEBUG(TAG, "Attempting to update job execution status!");
Aws::Crt::Map<Aws::Crt::String, Aws::Crt::String> statusDetails;
if (!statusInfo.reason.empty())
{
statusDetails["reason"] = statusInfo.reason.substr(0, MAX_STATUS_DETAIL_LENGTH).c_str();
}
if (!statusInfo.stdoutput.empty())
{
// We want the most recent output since we can only include 1024 characters in the job execution update
size_t startPos = statusInfo.stdoutput.size() > MAX_STATUS_DETAIL_LENGTH
? statusInfo.stdoutput.size() - MAX_STATUS_DETAIL_LENGTH
: 0;
// TODO We need to add filtering of invalid characters for status details that may come from weird
// process output. A valid statusDetail value must match '[^\p{C}]+', which translates to
// "everything other than invisible control characters and unused code points" (see
// http://www.unicode.org/reports/tr18/#General_Category_Property)
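// An illustrative (not wired-in) ASCII-only approach would be to copy the output through
// std::copy_if with a predicate such as [](unsigned char c) { return !std::iscntrl(c); };
// note that std::iscntrl only covers ASCII control characters, not the full \p{C} category.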
statusDetails["stdout"] = statusInfo.stdoutput.substr(startPos, statusInfo.stdoutput.size()).c_str();
}
else
{
LOG_DEBUG(TAG, "Not including stdout with the status details");
}
if (!statusInfo.stderror.empty())
{
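// As with stdout, keep only the most recent MAX_STATUS_DETAIL_LENGTH characters of stderr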
size_t startPos = statusInfo.stderror.size() > MAX_STATUS_DETAIL_LENGTH
? statusInfo.stderror.size() - MAX_STATUS_DETAIL_LENGTH
: 0;
statusDetails["stderr"] = statusInfo.stderror.substr(startPos, statusInfo.stderror.size()).c_str();
}
else
{
LOG_DEBUG(TAG, "Not including stderr with the status details");
}
/** When we update the job execution status, we need to retry with exponential
* backoff in case our request gets throttled. Otherwise, if we never successfully
* update the job execution status, we'll never receive the next job
*/
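// Aggregate initialization order assumed here: {initial backoff (ms), maximum backoff (ms), maxRetries, needStopFlag};
// maxRetries of -1 means retry indefinitely, which is why it is overridden below when we need to stop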
Retry::ExponentialRetryConfig retryConfig = {10 * 1000, 640 * 1000, -1, &needStop};
if (needStop.load())
{
// If we need to stop the Jobs feature, make a best-effort attempt to update the job
// execution status before shutting down rather than backing off indefinitely
retryConfig.maxRetries = 3;
retryConfig.needStopFlag = nullptr;
}
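// publishLambda performs a single UpdateJobExecution publish and waits up to 10 seconds for the ack;
// it returns true only when a zero response code is received, and that result is fed to the retry logic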
auto publishLambda = [this, data, statusInfo, statusDetails]() -> bool {
// We first need to make sure that we haven't previously leaked any promises into our map
unique_lock<mutex> leakLock(updateJobExecutionPromisesLock);
for (auto keyPromise = updateJobExecutionPromises.cbegin(); keyPromise != updateJobExecutionPromises.cend();
/** no increment here **/)
{
if (keyPromise->second.isExpired())
{
LOGM_DEBUG(
TAG,
"Removing expired promise for ClientToken %s from the updateJobExecution promise map",
keyPromise->first.c_str());
keyPromise = updateJobExecutionPromises.erase(keyPromise);
}
else
{
++keyPromise;
}
}
leakLock.unlock();
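// Build the UpdateJobExecution request for this job and thing, including the (possibly truncated) status details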
UpdateJobExecutionRequest request;
request.JobId = data.JobId->c_str();
request.ThingName = this->thingName.c_str();
request.Status = statusInfo.status;
request.StatusDetails = statusDetails;
// Create a unique client token each time we attempt the request since the promise has to be fresh
string clientToken = UniqueString::GetRandomToken(10);
request.ClientToken = Aws::Crt::Optional<Aws::Crt::String>(clientToken.c_str());
unique_lock<mutex> writeLock(updateJobExecutionPromisesLock);
this->updateJobExecutionPromises.insert(std::pair<Aws::Crt::String, EphemeralPromise<int>>(
clientToken.c_str(), EphemeralPromise<int>(std::chrono::milliseconds(15 * 1000))));
writeLock.unlock();
LOGM_DEBUG(
TAG,
"Created EphermalPromise for ClientToken %s in the updateJobExecution promises map",
clientToken.c_str());
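// Publish at QoS 1; the ack handler (ackUpdateJobExecutionStatus) is expected to complete the
// EphemeralPromise stored above for this ClientToken when a response arrives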
this->jobsClient->PublishUpdateJobExecution(
request,
AWS_MQTT_QOS_AT_LEAST_ONCE,
std::bind(&JobsFeature::ackUpdateJobExecutionStatus, this, std::placeholders::_1));
unique_lock<mutex> futureLock(updateJobExecutionPromisesLock);
future<int> updateFuture = this->updateJobExecutionPromises.at(clientToken.c_str()).get_future();
futureLock.unlock();
bool success = false;
// Although this entire block will be retried based on the retryConfig, we're only waiting for a maximum of 10
// seconds for each individual response
if (std::future_status::timeout == updateFuture.wait_for(std::chrono::seconds(10)))
{
LOG_WARN(TAG, "Timeout waiting for ack from PublishUpdateJobExecution");
}
else
{
int responseCode = updateFuture.get();
if (responseCode != 0)
{
LOGM_WARN(
TAG,
"Received a non-zero response after publishing an UpdateJobExecution request: %d",
responseCode);
}
else
{
LOGM_DEBUG(TAG, "Success response after UpdateJobExecution for job %s", data.JobId->c_str());
success = true;
}
}
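// Clean up the promise for this attempt regardless of whether we saw a response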
unique_lock<mutex> eraseLock(updateJobExecutionPromisesLock);
this->updateJobExecutionPromises.erase(clientToken.c_str());
return success;
};
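// Run the retry loop on a detached background thread so this call does not block; onCompleteCallback
// is passed through to exponentialBackoff to be invoked on completion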
std::thread updateJobExecutionThread([retryConfig, publishLambda, onCompleteCallback] {
Retry::exponentialBackoff(retryConfig, publishLambda, onCompleteCallback);
});
updateJobExecutionThread.detach();
}