celix_status_t rsaShmClientManager_sendMsgTo()

in bundles/remote_services/remote_service_admin_shm_v2/rsa_shm/src/rsa_shm_client.c [252:373]


/**
 * @brief Send a request to a peer RSA SHM server and wait for its response.
 *
 * The optional metadata is serialized with celix_properties_saveToStream() and copied, followed by
 * the request payload, into a message body allocated from the client manager's shared memory pool.
 * A rsa_shm_msg_t descriptor (shm id, offsets and sizes) is then sent to the peer over the client's
 * socket, after which rsaShmClientManager_receiveResponse() is called to obtain the response.
 *
 * When no reply was received, the message control block and the message body are not released by
 * this function: their ownership is moved into an exception message that is appended to the client
 * manager's exception message list for rsaShmClientManager_exceptionMsgHandlerThread.
 *
 * @param[in] clientManager The client manager. Must not be NULL.
 * @param[in] peerServerName Name of the peer server. Must not be NULL and must be shorter than
 *                           MAX_RSA_SHM_SERVER_NAME_SIZE.
 * @param[in] serviceId Id of the invoked service; used for the invocation-break check and for the
 *                      call bookkeeping (finished/failed).
 * @param[in] metadata Optional metadata to send along with the request. May be NULL.
 * @param[in] request The request payload. Must not be NULL, with a non-NULL iov_base and a
 *                    non-zero iov_len.
 * @param[out] response Forwarded to rsaShmClientManager_receiveResponse(). Must not be NULL.
 * @return CELIX_ILLEGAL_ARGUMENT for invalid arguments or an illegal message descriptor,
 *         CELIX_ILLEGAL_STATE when no client is available or the invocation is broken off,
 *         CELIX_ENOMEM when a required allocation fails, an errno based status when the memory
 *         stream or the socket send fails, otherwise the status reported by
 *         rsaShmClientManager_receiveResponse().
 */
celix_status_t rsaShmClientManager_sendMsgTo(rsa_shm_client_manager_t *clientManager,
        const char *peerServerName, long serviceId, celix_properties_t *metadata,
        const struct iovec *request, struct iovec *response) {
    celix_status_t status = CELIX_SUCCESS;
    if (clientManager == NULL || peerServerName == NULL || strlen(peerServerName) >= MAX_RSA_SHM_SERVER_NAME_SIZE
            || request == NULL || request->iov_base == NULL || request->iov_len == 0
            || response == NULL) {
        return CELIX_ILLEGAL_ARGUMENT;
    }

    celix_autoptr(rsa_shm_client_t) client = rsaShmClientManager_getClient(clientManager, peerServerName);
    if (client == NULL) {
        return CELIX_ILLEGAL_STATE;
    }

    if (rsaShmClient_shouldBreakInvocation(client, serviceId)) {
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Breaking current invocation for service id %ld.", serviceId);
        return CELIX_ILLEGAL_STATE;
    }

    // Serialize the metadata into a heap buffer through a memory stream.
    celix_autofree char* metadataString = NULL;
    size_t metadataStringSize = 0;
    FILE *fp = open_memstream(&metadataString, &metadataStringSize);
    if (fp == NULL) {
        // Capture errno before logging, the log call may modify it.
        const int openErrno = errno;
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error opening metadata memory. %d.", openErrno);
        return CELIX_ERROR_MAKE(CELIX_FACILITY_CERRNO, openErrno);
    }
    if (metadata != NULL) {
        status = celix_properties_saveToStream(metadata, fp, 0);
        if (status != CELIX_SUCCESS) {
            fclose(fp);
            celix_logHelper_error(
                clientManager->logHelper, "RsaShmClient: Error encoding metadata to memory stream. %d.", status);
            celix_logHelper_logTssErrors(clientManager->logHelper, CELIX_LOG_LEVEL_ERROR);
            return status;
        }
    }
    // The buffer and its size are only final after a successful close (final flush).
    if (fclose(fp) != 0) {
        const int closeErrno = errno;
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error closing metadata memory stream. %d.", closeErrno);
        return CELIX_ERROR_MAKE(CELIX_FACILITY_CERRNO, closeErrno);
    }
    // make the metadata include the terminating null byte ('\0')
    size_t metadataSize = (metadataStringSize == 0) ? 0 : metadataStringSize +1;
    // The body is sized to at least ESTIMATED_MSG_RESPONSE_SIZE_DEFAULT; the same buffer is passed
    // to rsaShmClientManager_receiveResponse() below.
    size_t msgBodySize = MAX((metadataSize + request->iov_len), ESTIMATED_MSG_RESPONSE_SIZE_DEFAULT);

    celix_auto(rsa_shm_msg_control_alloc_t) msgCtrlAlloc = {
            .ctrl = NULL,
            .clientManager = clientManager,
    };
    status = rsaShmClientManager_createMsgControl(clientManager, &msgCtrlAlloc);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error creating msg control. %d.", status);
        return status;
    }
    rsa_shm_msg_control_t *msgCtrl = (rsa_shm_msg_control_t*)msgCtrlAlloc.ctrl;
    celix_auto(celix_shm_pool_alloc_guard_t) msgBodyAlloc =
        celix_shmPoolAllocGuard_init(shmPool_malloc(clientManager->shmPool, msgBodySize), clientManager->shmPool);
    char *msgBody = (char *)msgBodyAlloc.ptr;
    if (msgBody == NULL) {
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error allocing msg buffer.");
        return CELIX_ENOMEM;
    }
    // Body layout: [metadata incl. '\0' (optional)][request payload]
    if (metadataSize != 0) {
        memcpy(msgBody, metadataString, metadataSize);
    }
    memcpy(msgBody + metadataSize, request->iov_base,request->iov_len);

    rsa_shm_msg_t msgInfo = {
            .size = sizeof(rsa_shm_msg_t),
            .shmId = shmPool_getShmId(clientManager->shmPool),
            .ctrlDataOffset = shmPool_getMemoryOffset(clientManager->shmPool, msgCtrl),
            .ctrlDataSize = sizeof(rsa_shm_msg_control_t),
            .msgBodyOffset = shmPool_getMemoryOffset(clientManager->shmPool, msgBody),
            .msgBodyTotalSize = msgBodySize,
            .metadataSize = metadataSize,
            .requestSize = request->iov_len,
    };
    //LCOV_EXCL_START
    if (msgInfo.shmId < 0 || msgInfo.ctrlDataOffset < 0 || msgInfo.msgBodyOffset < 0) {
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Illegal message info.");
        return CELIX_ILLEGAL_ARGUMENT;
    }
    //LCOV_EXCL_STOP

    // Send the descriptor to the peer, retrying when the call is interrupted by a signal.
    for (;;) {
        ssize_t sent = sendto(client->cfd, &msgInfo, sizeof(msgInfo), 0, (struct sockaddr *) &client->serverAddr,
                              sizeof(struct sockaddr_un));
        if (sent == (ssize_t)sizeof(msgInfo)) {
            break;
        }
        // Capture errno before logging, the log call may modify it.
        const int sendErrno = errno;
        if (sendErrno != EINTR) {
            celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error sending message to %s. %d",
                                  peerServerName, sendErrno);
            return CELIX_ERROR_MAKE(CELIX_FACILITY_CERRNO, sendErrno);
        }
        celix_logHelper_warning(clientManager->logHelper, "RsaShmClient: Interrupted while sending message to %s, try again. %d",
                                peerServerName, sendErrno);
    }

    bool replied = false;
    status = rsaShmClientManager_receiveResponse(clientManager, msgCtrl, msgBody,
            msgBodySize, response, &replied);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error receiving response. %d.", status);
        rsaShmClientManager_markSvcCallFailed(clientManager, peerServerName, serviceId);
    }

    if (replied) {
        rsaShmClientManager_markSvcCallFinished(clientManager, peerServerName, serviceId);
    } else {
        rsa_shm_exception_msg_t *exceptionMsg = malloc(sizeof(*exceptionMsg));
        char *peerServerNameCopy = strdup(peerServerName);
        if (exceptionMsg == NULL || peerServerNameCopy == NULL) {
            free(exceptionMsg);
            free(peerServerNameCopy);
            celix_logHelper_error(clientManager->logHelper,
                                  "RsaShmClient: Error allocating exception message for service id %ld.", serviceId);
            // NOTE(review): without a reply the buffers are normally handed to the exception message
            // handler thread instead of being released here, presumably because the peer may still
            // access them. Keep that guarantee by abandoning them rather than letting the scope
            // guards release them - confirm this is the preferred trade-off.
            (void)celix_steal_ptr(msgCtrlAlloc.ctrl);
            (void)celix_steal_ptr(msgBodyAlloc.ptr);
            return (status != CELIX_SUCCESS) ? status : CELIX_ENOMEM;
        }
        // Move ownership of the shared memory out of the scope guards and into the exception message.
        exceptionMsg->msgCtrl = (rsa_shm_msg_control_t*)celix_steal_ptr(msgCtrlAlloc.ctrl);
        exceptionMsg->msgBuffer = celix_steal_ptr(msgBodyAlloc.ptr);
        exceptionMsg->serviceId = serviceId;
        exceptionMsg->peerServerName = peerServerNameCopy;
        // Let rsaShmClientManager_exceptionMsgHandlerThread free exception message
        celixThreadMutex_lock(&clientManager->exceptionMsgListMutex);
        celix_arrayList_add(clientManager->exceptionMsgList, exceptionMsg);
        celixThreadMutex_unlock(&clientManager->exceptionMsgListMutex);
        celixThreadCondition_signal(&clientManager->exceptionMsgListNotEmpty);
    }
    return status;
}