in bundles/remote_services/remote_service_admin_shm_v2/rsa_shm/src/rsa_shm_server.c [169:256]
static void rsaShmServer_msgHandlingWork(void *data) {
assert(data != NULL);
int status = CELIX_SUCCESS;
struct rsa_shm_server_thpool_work_data *workData = data;
rsa_shm_server_t *server = workData->server;
assert(server != NULL);
rsa_shm_msg_control_t *msgCtrl = (rsa_shm_msg_control_t *)workData->msgCtrl;
char *msgBuffer = (char*)workData->msgBody;
const char *metaDataString = msgBuffer;
char *requestData = msgBuffer + workData->metadataSize;
celix_properties_t *metadataProps = NULL;
if (workData->metadataSize != 0) {
if (celix_properties_loadFromString(metaDataString, 0, &metadataProps) != CELIX_SUCCESS) {
celix_logHelper_warning(server->loghelper, "RsaShmServer: Parse metadata failed.");
celix_logHelper_logTssErrors(server->loghelper, CELIX_LOG_LEVEL_WARNING);
}
}
struct iovec reply = {NULL, 0};
struct iovec request = {requestData, workData->requestSize};
status = server->revCB(server->revCBHandle, server, metadataProps, &request, &reply);
if (status != CELIX_SUCCESS || reply.iov_base == NULL || reply.iov_len == 0) {
celix_logHelper_error(server->loghelper, "RsaShmServer: Call receive msg callback failed. Error data:%d, %p, %zu.",
status, reply.iov_base, reply.iov_len);
goto call_receive_cb_failed;
}
char *src = reply.iov_base;
size_t srcSize = reply.iov_len;
int waitRet = 0;
pthread_mutex_lock(&msgCtrl->lock);
while (true) {
if (msgCtrl->msgState == REQ_CANCELLED || waitRet != 0) {
pthread_mutex_unlock(&msgCtrl->lock);
celix_logHelper_error(server->loghelper, "RsaShmServer: Client cancelled the request, or timeout. %d.", waitRet);
goto reply_err;
}
size_t destSize = workData->msgBodyTotalSize;
char *dest = msgBuffer;
size_t bytes = MIN(srcSize, destSize);
memcpy(dest, src, bytes);
src += bytes;
srcSize -= bytes;
if (srcSize == 0) {
msgCtrl->msgState = REPLIED;
msgCtrl->actualReplyedSize = bytes;
//Signaling the condition variable first, and then unlocking the mutex, because client will free ctrl when msgState is REPLIED.
pthread_cond_signal(&msgCtrl->signal);
break;
} else {
msgCtrl->msgState = REPLYING;
msgCtrl->actualReplyedSize = bytes;
pthread_cond_signal(&msgCtrl->signal);
struct timespec timeout = celix_gettime(CLOCK_MONOTONIC);
timeout.tv_sec += server->msgTimeOutInSec;
while (msgCtrl->msgState == REPLYING && waitRet == 0) {
//pthread_cond_timedwait shall not return an error code of [EINTR]. @ref https://man7.org/linux/man-pages/man3/pthread_cond_timedwait.3p.html
waitRet = pthread_cond_timedwait(&msgCtrl->signal, &msgCtrl->lock, &timeout);
}
}
}
pthread_mutex_unlock(&msgCtrl->lock);
free(reply.iov_base);
if (metadataProps != NULL) {
celix_properties_destroy(metadataProps);
}
shmCache_releaseMemoryPtr(server->shmCache, msgBuffer);
shmCache_releaseMemoryPtr(server->shmCache, msgCtrl);
free(data);
return;
reply_err:
free(reply.iov_base);
call_receive_cb_failed:
if (metadataProps != NULL) {
celix_properties_destroy(metadataProps);
}
rsaShmServer_terminateMsgHandling(msgCtrl);
shmCache_releaseMemoryPtr(server->shmCache, msgBuffer);
shmCache_releaseMemoryPtr(server->shmCache, msgCtrl);
free(data);
return;
}