nlsCppSdk/transport/nodeManager.cpp (381 lines of code) (raw):

/* * Copyright 2021 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef _MSC_VER #include <unistd.h> #endif #include <time.h> #include "connectNode.h" #include "iNlsRequest.h" #include "nlog.h" #include "nlsGlobal.h" #include "nodeManager.h" #include "utility.h" namespace AlibabaNls { NlsNodeManager::NlsNodeManager() : _timeout_ms(DefaultRemoveTimeout) { #if defined(_MSC_VER) _mtxNodeManager = CreateMutex(NULL, FALSE, NULL); #else pthread_mutex_init(&_mtxNodeManager, NULL); #endif } NlsNodeManager::~NlsNodeManager() { #if defined(_MSC_VER) CloseHandle(_mtxNodeManager); #else pthread_mutex_destroy(&_mtxNodeManager); #endif } /** * @brief: 新创建request加入到NodeManager中 * @return: */ int NlsNodeManager::addRequestIntoInfoWithInstance(void* request, void* instance) { MUTEX_LOCK(_mtxNodeManager); if (instance == NULL) { LOG_ERROR("instance is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(EventClientEmpty); } if (request == NULL) { LOG_ERROR("request is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } INlsRequest* nls_request = static_cast<INlsRequest*>(request); ConnectNode* node = static_cast<ConnectNode*>(nls_request->getConnectNode()); if (node == NULL) { LOG_ERROR("node is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(NodeEmpty); } std::map<void*, NodeInfo>::iterator iter; iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; LOG_WARN("request:%p has added in NodeInfo, status:%s node:%p", request, this->getNodeStatusString(info.status).c_str(), info.node); if (info.status > NodeStatusCreated && info.status < NodeStatusReleased) { LOG_ERROR("request:%p is conflicted in NodeInfo, status:%s node:%p", info.request, this->getNodeStatusString(info.status).c_str(), info.node); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { if (info.instance != instance) { LOG_ERROR("the request:%p of instance(%p) isnot in instance(%p)", info.request, instance, info.instance); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } LOG_WARN("request:%p cover old request in NodeInfo, status:%s node:%p", info.request, this->getNodeStatusString(info.status).c_str(), info.node); info.request = request; info.node = node; info.instance = instance; info.uuid = node->getNodeUUID(); info.status = NodeStatusCreated; this->_requestListByNode[node] = request; } } else { NodeInfo info; info.request = request; info.node = node; info.instance = instance; info.uuid = node->getNodeUUID(); info.status = NodeStatusCreated; this->_infoByRequest.insert(std::make_pair(request, info)); this->_requestListByNode.insert(std::make_pair(node, request)); LOG_DEBUG("add request(%p) node(%p) into NodeInfo", request, node); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } /** * @brief: 检查request是否属于此instance * @return: Error Code */ int NlsNodeManager::checkRequestWithInstance(void* request, void* instance) { MUTEX_LOCK(_mtxNodeManager); if (instance == NULL) { LOG_ERROR("instance is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(EventClientEmpty); } if (request == NULL) { LOG_ERROR("request is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } std::map<void*, NodeInfo>::iterator iter; iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; if (info.instance != instance) { LOG_ERROR("the request:%p of instance(%p) isnot in instance(%p)", info.request, instance, info.instance); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { INlsRequest* nls_request = static_cast<INlsRequest*>(request); ConnectNode* node = static_cast<ConnectNode*>(nls_request->getConnectNode()); std::map<void*, void*>::iterator node_iter; node_iter = this->_requestListByNode.find(node); if (node_iter == this->_requestListByNode.end()) { LOG_ERROR("node(%p) isn't in NodeInfo, request(%p) is invalid.", node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { std::string uuid = node->getNodeUUID(); if (info.uuid != uuid) { LOG_ERROR("the uuid(%s) isnot in node(%p), request(%p) is invalid.", uuid.c_str(), node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } } } } else { LOG_ERROR("request:%p isnot in NodeInfo", request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } /** * @brief: request释放后从NodeManager中删除此request * @return: */ int NlsNodeManager::removeRequestFromInfo(void* request, bool wait) { MUTEX_LOCK(_mtxNodeManager); if (request == NULL) { LOG_ERROR("request is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } int timeout = 0; std::map<void*, NodeInfo>::iterator iter; while (wait) { iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; if (info.status != NodeStatusCreated && info.status < NodeStatusClosed) { if (timeout >= this->_timeout_ms) { LOG_WARN( "request(%p) node(%p) status(%s) is invalid, wait timeout(%dms), " "remove it by force.", request, info.node, this->getNodeStatusString(info.status).c_str(), timeout); break; } LOG_DEBUG("request(%p) node(%p) status(%s) is waiting timeout(%dms)...", request, info.node, this->getNodeStatusString(info.status).c_str(), timeout); MUTEX_UNLOCK(_mtxNodeManager); #if defined(_MSC_VER) Sleep(StepSleepMs); #else usleep(StepSleepMs * 1000); #endif timeout += StepSleepMs; MUTEX_LOCK(_mtxNodeManager); } else { break; } } else { break; } } // while iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; void* node = info.node; std::map<void*, void*>::iterator iter2; iter2 = this->_requestListByNode.find(node); if (iter2 != this->_requestListByNode.end()) { this->_requestListByNode.erase(iter2); } LOG_INFO("request(%p) node(%p) status(%s) removed.", request, info.node, this->getNodeStatusString(info.status).c_str()); _infoByRequest.erase(iter); } else { LOG_ERROR("request:%p isnot in NodeInfo", request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } int NlsNodeManager::removeInstanceFromInfo(void* instance) { return Success; } int NlsNodeManager::checkRequestExist(void* request, int* status) { MUTEX_LOCK(_mtxNodeManager); if (request == NULL) { LOG_ERROR("request is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } std::map<void*, NodeInfo>::iterator iter; iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; if (info.request != request) { LOG_ERROR("the request:%p mismatch the request:%p in NodeInfo", request, info.request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { INlsRequest* nls_request = static_cast<INlsRequest*>(request); ConnectNode* node = static_cast<ConnectNode*>(nls_request->getConnectNode()); std::map<void*, void*>::iterator node_iter; node_iter = this->_requestListByNode.find(node); if (node_iter == this->_requestListByNode.end()) { LOG_ERROR("node(%p) isn't in NodeInfo, request(%p) is invalid.", node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { std::string uuid = node->getNodeUUID(); if (info.uuid != uuid) { LOG_ERROR("the uuid(%s) isnot in node(%p), request(%p) is invalid.", uuid.c_str(), node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } } } *status = info.status; } else { LOG_ERROR("Request:%p isn't in NodeInfo", request); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } int NlsNodeManager::checkNodeExist(void* node, int* status) { MUTEX_LOCK(_mtxNodeManager); if (node == NULL) { LOG_ERROR("node is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(NodeEmpty); } std::map<void*, void*>::iterator iter0; iter0 = this->_requestListByNode.find(node); if (iter0 == this->_requestListByNode.end()) { LOG_ERROR("node(%p) isn't in NodeInfo.", node); MUTEX_UNLOCK(_mtxNodeManager); return -(NodeEmpty); } ConnectNode* connect_node = static_cast<ConnectNode*>(node); INlsRequest* request = static_cast<INlsRequest*>(connect_node->getRequest()); if (request == NULL) { LOG_ERROR("request is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } std::map<void*, NodeInfo>::iterator iter; iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; if (info.request != request) { LOG_ERROR("the request:%p mismatch the request:%p in NodeInfo", request, info.request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } else { std::string uuid = connect_node->getNodeUUID(); if (info.uuid != uuid) { LOG_ERROR("the uuid(%s) isnot in node(%p), request(%p) is invalid.", uuid.c_str(), connect_node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } } *status = info.status; } else { LOG_ERROR("Request:%p isnot in NodeInfo", request); MUTEX_UNLOCK(_mtxNodeManager); return -(RequestEmpty); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } int NlsNodeManager::updateNodeStatus(void* node, int status) { MUTEX_LOCK(_mtxNodeManager); if (node == NULL) { LOG_ERROR("node is nullptr."); MUTEX_UNLOCK(_mtxNodeManager); return -(NodeEmpty); } std::map<void*, void*>::iterator iter0; iter0 = this->_requestListByNode.find(node); if (iter0 == this->_requestListByNode.end()) { LOG_ERROR("node(%p) isn't in NodeInfo.", node); MUTEX_UNLOCK(_mtxNodeManager); return -(NodeEmpty); } ConnectNode* connect_node = static_cast<ConnectNode*>(node); INlsRequest* request = static_cast<INlsRequest*>(connect_node->getRequest()); std::map<void*, NodeInfo>::iterator iter; iter = this->_infoByRequest.find(request); if (iter != this->_infoByRequest.end()) { NodeInfo& info = iter->second; std::string uuid = connect_node->getNodeUUID(); if (info.uuid != uuid) { LOG_ERROR("the uuid(%s) isnot in node(%p), request(%p) is invalid.", uuid.c_str(), connect_node, request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvalidRequest); } LOG_DEBUG("Node(%p) set node status from %s to %s.", info.node, this->getNodeStatusString(info.status).c_str(), this->getNodeStatusString(status).c_str()); info.status = status; } else { LOG_ERROR("Request(%p) isn't in NodeInfo", request); MUTEX_UNLOCK(_mtxNodeManager); return -(InvaildNodeStatus); } MUTEX_UNLOCK(_mtxNodeManager); return Success; } std::string NlsNodeManager::getNodeStatusString(int status) { std::string ret_str("Unknown"); switch (status) { case NodeStatusInvalid: ret_str.assign("NodeStatusInvalid"); break; case NodeStatusCreated: ret_str.assign("NodeStatusCreated"); break; case NodeStatusInvoking: ret_str.assign("NodeStatusInvoking"); break; case NodeStatusInvoked: ret_str.assign("NodeStatusInvoked"); break; case NodeStatusConnecting: ret_str.assign("NodeStatusConnecting"); break; case NodeStatusConnected: ret_str.assign("NodeStatusConnected"); break; case NodeStatusHandshaking: ret_str.assign("NodeStatusHandshaking"); break; case NodeStatusHandshaked: ret_str.assign("NodeStatusHandshaked"); break; case NodeStatusRunning: ret_str.assign("NodeStatusRunning"); break; case NodeStatusCancelling: ret_str.assign("NodeStatusCancelling"); break; case NodeStatusClosing: ret_str.assign("NodeStatusClosing"); break; case NodeStatusClosed: ret_str.assign("NodeStatusClosed"); break; case NodeStatusReleased: ret_str.assign("NodeStatusReleased"); break; default: ret_str.assign("Unknown"); break; } return ret_str; } } // namespace AlibabaNls