void JobsFeature::publishUpdateJobExecutionStatus()

in source/jobs/JobsFeature.cpp [356:488]

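Publishes an UpdateJobExecution status update for the given job, retrying with exponential backoff on a detached worker thread; onCompleteCallback is handed to the retry helper so the caller is notified once the attempt finishes.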

void JobsFeature::publishUpdateJobExecutionStatus(
    JobExecutionData data,
    JobExecutionStatusInfo statusInfo,
    function<void(void)> onCompleteCallback)
{
    LOG_DEBUG(TAG, "Attempting to update job execution status!");

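    // statusDetails is sent alongside the status update; it carries the reason string plus the tail end of
    // the captured stdout/stderr, each capped at MAX_STATUS_DETAIL_LENGTH characters.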
    Aws::Crt::Map<Aws::Crt::String, Aws::Crt::String> statusDetails;

    if (!statusInfo.reason.empty())
    {
        statusDetails["reason"] = statusInfo.reason.substr(0, MAX_STATUS_DETAIL_LENGTH).c_str();
    }

    if (!statusInfo.stdoutput.empty())
    {
        // We want the most recent output since we can only include 1024 characters in the job execution update
        int startPos = statusInfo.stdoutput.size() > MAX_STATUS_DETAIL_LENGTH
                           ? statusInfo.stdoutput.size() - MAX_STATUS_DETAIL_LENGTH
                           : 0;
        // TODO We need to add filtering of invalid characters for the status details that may come from weird
        // process output. The valid pattern for a statusDetail value is '[^\p{C}]+', which translates into
        // "everything other than invisible control characters and unused code points" (see
        // http://www.unicode.org/reports/tr18/#General_Category_Property).
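        // A possible shape for that filtering (sketch only, not wired in here) would be to drop ASCII control
        // characters before populating the map, e.g.
        //     auto stripControl = [](const std::string &s) {
        //         std::string out;
        //         std::copy_if(s.begin(), s.end(), std::back_inserter(out),
        //                      [](unsigned char c) { return std::iscntrl(c) == 0; });
        //         return out;
        //     };
        // (needs <algorithm>, <cctype>, and <iterator>). Covering the full \p{C} category would require a
        // Unicode-aware library such as ICU.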
        statusDetails["stdout"] = statusInfo.stdoutput.substr(startPos, statusInfo.stdoutput.size()).c_str();
    }
    else
    {
        LOG_DEBUG(TAG, "Not including stdout with the status details");
    }
    if (!statusInfo.stderror.empty())
    {
        int startPos = statusInfo.stderror.size() > MAX_STATUS_DETAIL_LENGTH
                           ? statusInfo.stderror.size() - MAX_STATUS_DETAIL_LENGTH
                           : 0;
        statusDetails["stderr"] = statusInfo.stderror.substr(startPos, statusInfo.stderror.size()).c_str();
    }
    else
    {
        LOG_DEBUG(TAG, "Not including stderr with the status details");
    }

    /** When we update the job execution status, we need to perform exponential
     * backoff in case our request gets throttled. Otherwise, if we never successfully
     * update the job execution status, we'll never receive the next job.
     */
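    // Field order assumed from the usage below: starting backoff (ms), maximum backoff (ms), maxRetries
    // (-1 = retry indefinitely), and the needStop flag used to abort the retry loop.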
    Retry::ExponentialRetryConfig retryConfig = {10 * 1000, 640 * 1000, -1, &needStop};
    if (needStop.load())
    {
        // If we need to stop the Jobs feature, then we make a best-effort attempt here to update
        // the job execution status prior to shutting down, rather than backing off indefinitely
        retryConfig.maxRetries = 3;
        retryConfig.needStopFlag = nullptr;
    }

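    // publishLambda performs a single UpdateJobExecution attempt and returns true only when a zero status
    // code comes back, so the exponential backoff helper can decide whether another attempt is needed.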
    auto publishLambda = [this, data, statusInfo, statusDetails]() -> bool {
        // We first need to make sure that we haven't previously leaked any promises into our map
        unique_lock<mutex> leakLock(updateJobExecutionPromisesLock);
        for (auto keyPromise = updateJobExecutionPromises.cbegin(); keyPromise != updateJobExecutionPromises.cend();
             /** no increment here **/)
        {
            if (keyPromise->second.isExpired())
            {
                LOGM_DEBUG(
                    TAG,
                    "Removing expired promise for ClientToken %s from the updateJobExecution promise map",
                    keyPromise->first.c_str());
                keyPromise = updateJobExecutionPromises.erase(keyPromise);
            }
            else
            {
                ++keyPromise;
            }
        }
        leakLock.unlock();

        UpdateJobExecutionRequest request;
        request.JobId = data.JobId->c_str();
        request.ThingName = this->thingName.c_str();
        request.Status = statusInfo.status;
        request.StatusDetails = statusDetails;

        // Create a unique client token each time we attempt the request since the promise has to be fresh
        string clientToken = UniqueString::GetRandomToken(10);
        request.ClientToken = Aws::Crt::Optional<Aws::Crt::String>(clientToken.c_str());
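        // The promise inserted below carries a 15 second TTL, presumably so the expiry sweep at the top of
        // this lambda can reclaim it if the corresponding ack never arrives.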
        unique_lock<mutex> writeLock(updateJobExecutionPromisesLock);
        this->updateJobExecutionPromises.insert(std::pair<Aws::Crt::String, EphemeralPromise<int>>(
            clientToken.c_str(), EphemeralPromise<int>(std::chrono::milliseconds(15 * 1000))));
        writeLock.unlock();
        LOGM_DEBUG(
            TAG,
            "Created EphemeralPromise for ClientToken %s in the updateJobExecution promises map",
            clientToken.c_str());

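        // ackUpdateJobExecutionStatus is expected to complete the promise keyed by this client token once the
        // accepted/rejected response arrives.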
        this->jobsClient->PublishUpdateJobExecution(
            request,
            AWS_MQTT_QOS_AT_LEAST_ONCE,
            std::bind(&JobsFeature::ackUpdateJobExecutionStatus, this, std::placeholders::_1));
        unique_lock<mutex> futureLock(updateJobExecutionPromisesLock);
        future<int> updateFuture = this->updateJobExecutionPromises.at(clientToken.c_str()).get_future();
        futureLock.unlock();
        bool success = false;
        // Although this entire block will be retried based on the retryConfig, we're only waiting for a maximum of 10
        // seconds for each individual response
        if (std::future_status::timeout == updateFuture.wait_for(std::chrono::seconds(10)))
        {
            LOG_WARN(TAG, "Timeout waiting for ack from PublishUpdateJobExecution");
        }
        else
        {
            int responseCode = updateFuture.get();
            if (responseCode != 0)
            {
                LOGM_WARN(
                    TAG,
                    "Received a non-zero response after publishing an UpdateJobExecution request: %d",
                    responseCode);
            }
            else
            {
                LOGM_DEBUG(TAG, "Success response after UpdateJobExecution for job %s", data.JobId->c_str());
                success = true;
            }
        }
        unique_lock<mutex> eraseLock(updateJobExecutionPromisesLock);
        this->updateJobExecutionPromises.erase(clientToken.c_str());
        return success;
    };
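    // Run the retry loop on a detached thread so this call returns immediately; completion is surfaced to the
    // caller only through onCompleteCallback.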
    std::thread updateJobExecutionThread([retryConfig, publishLambda, onCompleteCallback] {
        Retry::exponentialBackoff(retryConfig, publishLambda, onCompleteCallback);
    });
    updateJobExecutionThread.detach();
}