static void rsaShmServer_msgHandlingWork()

in bundles/remote_services/remote_service_admin_shm_v2/rsa_shm/src/rsa_shm_server.c [169:256]


static void rsaShmServer_msgHandlingWork(void *data) {
    assert(data != NULL);
    int status = CELIX_SUCCESS;
    struct rsa_shm_server_thpool_work_data *workData = data;
    rsa_shm_server_t *server = workData->server;
    assert(server != NULL);

    rsa_shm_msg_control_t *msgCtrl = (rsa_shm_msg_control_t *)workData->msgCtrl;
    char *msgBuffer = (char*)workData->msgBody;
    const char *metaDataString = msgBuffer;
    char *requestData = msgBuffer + workData->metadataSize;

    celix_properties_t *metadataProps = NULL;
    if (workData->metadataSize != 0) {
        if (celix_properties_loadFromString(metaDataString, 0, &metadataProps) != CELIX_SUCCESS) {
            celix_logHelper_warning(server->loghelper, "RsaShmServer: Parse metadata failed.");
            celix_logHelper_logTssErrors(server->loghelper, CELIX_LOG_LEVEL_WARNING);
        }
    }

    struct iovec reply = {NULL, 0};
    struct iovec request = {requestData, workData->requestSize};
    status = server->revCB(server->revCBHandle, server, metadataProps, &request, &reply);
    if (status != CELIX_SUCCESS || reply.iov_base == NULL || reply.iov_len == 0) {
        celix_logHelper_error(server->loghelper, "RsaShmServer: Call receive msg callback failed. Error data:%d, %p, %zu.",
                status, reply.iov_base, reply.iov_len);
        goto call_receive_cb_failed;
    }

    char *src = reply.iov_base;
    size_t srcSize = reply.iov_len;
    int waitRet = 0;
    pthread_mutex_lock(&msgCtrl->lock);
    while (true) {
        if (msgCtrl->msgState == REQ_CANCELLED || waitRet != 0) {
            pthread_mutex_unlock(&msgCtrl->lock);
            celix_logHelper_error(server->loghelper, "RsaShmServer: Client cancelled the request, or timeout. %d.", waitRet);
            goto reply_err;
        }
        size_t destSize = workData->msgBodyTotalSize;
        char *dest = msgBuffer;
        size_t bytes = MIN(srcSize, destSize);
        memcpy(dest, src, bytes);
        src += bytes;
        srcSize -= bytes;
        if (srcSize == 0) {
            msgCtrl->msgState = REPLIED;
            msgCtrl->actualReplyedSize = bytes;
            //Signaling the condition variable first, and then unlocking the mutex, because client will free ctrl when msgState is REPLIED.
            pthread_cond_signal(&msgCtrl->signal);
            break;
        } else {
            msgCtrl->msgState = REPLYING;
            msgCtrl->actualReplyedSize = bytes;
            pthread_cond_signal(&msgCtrl->signal);

            struct timespec timeout = celix_gettime(CLOCK_MONOTONIC);
            timeout.tv_sec += server->msgTimeOutInSec;
            while (msgCtrl->msgState == REPLYING && waitRet == 0) {
                //pthread_cond_timedwait shall not return an error code of [EINTR]. @ref https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
                waitRet = pthread_cond_timedwait(&msgCtrl->signal, &msgCtrl->lock, &timeout);
            }
        }
    }
    pthread_mutex_unlock(&msgCtrl->lock);


    free(reply.iov_base);
    if (metadataProps != NULL) {
        celix_properties_destroy(metadataProps);
    }
    shmCache_releaseMemoryPtr(server->shmCache, msgBuffer);
    shmCache_releaseMemoryPtr(server->shmCache, msgCtrl);
    free(data);
    return;

reply_err:
    free(reply.iov_base);
call_receive_cb_failed:
    if (metadataProps != NULL) {
        celix_properties_destroy(metadataProps);
    }
    rsaShmServer_terminateMsgHandling(msgCtrl);
    shmCache_releaseMemoryPtr(server->shmCache, msgBuffer);
    shmCache_releaseMemoryPtr(server->shmCache, msgCtrl);
    free(data);
    return;
}