in bundles/remote_services/remote_service_admin_shm_v2/rsa_shm/src/rsa_shm_client.c [252:373]
celix_status_t rsaShmClientManager_sendMsgTo(rsa_shm_client_manager_t *clientManager,
const char *peerServerName, long serviceId, celix_properties_t *metadata,
const struct iovec *request, struct iovec *response) {
celix_status_t status = CELIX_SUCCESS;
if (clientManager == NULL || peerServerName == NULL || strlen(peerServerName) >= MAX_RSA_SHM_SERVER_NAME_SIZE
|| request == NULL || request->iov_base == NULL || request->iov_len == 0
|| response == NULL) {
return CELIX_ILLEGAL_ARGUMENT;
}
size_t metadataStringSize = 0;
FILE *fp = NULL;
rsa_shm_msg_control_t *msgCtrl = NULL;
celix_autoptr(rsa_shm_client_t) client = rsaShmClientManager_getClient(clientManager, peerServerName);
if (client == NULL) {
return CELIX_ILLEGAL_STATE;
}
if (rsaShmClient_shouldBreakInvocation(client, serviceId)) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Breaking current invocation for service id %ld.", serviceId);
return CELIX_ILLEGAL_STATE;
}
celix_autofree char* metadataString = NULL;
fp = open_memstream(&metadataString, &metadataStringSize);
if (fp == NULL) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error opening metadata memory. %d.", errno);
return CELIX_ERROR_MAKE(CELIX_FACILITY_CERRNO, errno);
}
if (metadata != NULL) {
status = celix_properties_saveToStream(metadata, fp, 0);
if (status != CELIX_SUCCESS) {
fclose(fp);
celix_logHelper_error(
clientManager->logHelper, "RsaShmClient: Error encoding metadata to memory stream. %d.", status);
celix_logHelper_logTssErrors(clientManager->logHelper, CELIX_LOG_LEVEL_ERROR);
return status;
}
}
fclose(fp);
// make the metadata include the terminating null byte ('\0')
size_t metadataSize = (metadataStringSize == 0) ? 0 : metadataStringSize +1;
size_t msgBodySize = MAX((metadataSize + request->iov_len), ESTIMATED_MSG_RESPONSE_SIZE_DEFAULT);
celix_auto(rsa_shm_msg_control_alloc_t) msgCtrlAlloc = {
.ctrl = NULL,
.clientManager = clientManager,
};
status = rsaShmClientManager_createMsgControl(clientManager, &msgCtrlAlloc);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error creating msg control. %d.", status);
return status;
}
msgCtrl = (rsa_shm_msg_control_t*)msgCtrlAlloc.ctrl;
celix_auto(celix_shm_pool_alloc_guard_t) msgBodyAlloc =
celix_shmPoolAllocGuard_init(shmPool_malloc(clientManager->shmPool, msgBodySize), clientManager->shmPool);
char *msgBody = (char *)msgBodyAlloc.ptr;
if (msgBody == NULL) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error allocing msg buffer.");
return CELIX_ENOMEM;
}
if (metadataSize != 0) {
memcpy(msgBody, metadataString, metadataSize);
}
memcpy(msgBody + metadataSize, request->iov_base,request->iov_len);
rsa_shm_msg_t msgInfo = {
.size = sizeof(rsa_shm_msg_t),
.shmId = shmPool_getShmId(clientManager->shmPool),
.ctrlDataOffset = shmPool_getMemoryOffset(clientManager->shmPool, msgCtrl),
.ctrlDataSize = sizeof(rsa_shm_msg_control_t),
.msgBodyOffset = shmPool_getMemoryOffset(clientManager->shmPool, msgBody),
.msgBodyTotalSize = msgBodySize,
.metadataSize = metadataSize,
.requestSize = request->iov_len,
};
//LCOV_EXCL_START
if (msgInfo.shmId < 0 || msgInfo.ctrlDataOffset < 0 || msgInfo.msgBodyOffset < 0) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Illegal message info.");
// assert(0);
return CELIX_ILLEGAL_ARGUMENT;
}
//LCOV_EXCL_STOP
while(1) {
if (sendto(client->cfd, &msgInfo, sizeof(msgInfo), 0, (struct sockaddr *) &client->serverAddr,
sizeof(struct sockaddr_un)) == sizeof(msgInfo)) {
break;
} else if (errno != EINTR) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error sending message to %s. %d",
peerServerName, errno);
return CELIX_ERROR_MAKE(CELIX_FACILITY_CERRNO, errno);
} else {
celix_logHelper_warning(clientManager->logHelper, "RsaShmClient: Interrupted while sending message to %s, try again. %d",
peerServerName, errno);
}
};
bool replied = false;
status = rsaShmClientManager_receiveResponse(clientManager, msgCtrl, msgBody,
msgBodySize, response, &replied);
if (status != CELIX_SUCCESS) {
celix_logHelper_error(clientManager->logHelper, "RsaShmClient: Error receiving response. %d.", status);
rsaShmClientManager_markSvcCallFailed(clientManager, peerServerName, serviceId);
}
if (replied) {
rsaShmClientManager_markSvcCallFinished(clientManager, peerServerName, serviceId);
} else {
rsa_shm_exception_msg_t *exceptionMsg = (rsa_shm_exception_msg_t *)malloc(sizeof(*exceptionMsg));
assert(exceptionMsg != NULL);
exceptionMsg->msgCtrl = (rsa_shm_msg_control_t*)celix_steal_ptr(msgCtrlAlloc.ctrl);
exceptionMsg->msgBuffer = celix_steal_ptr(msgBodyAlloc.ptr);
exceptionMsg->serviceId = serviceId;
exceptionMsg->peerServerName = strdup(peerServerName);
// Let rsaShmClientManager_exceptionMsgHandlerThread free exception message
celixThreadMutex_lock(&clientManager->exceptionMsgListMutex);
celix_arrayList_add(clientManager->exceptionMsgList, exceptionMsg);
celixThreadMutex_unlock(&clientManager->exceptionMsgListMutex);
celixThreadCondition_signal(&clientManager->exceptionMsgListNotEmpty);
}
return status;
}