nlsCppSdk/transport/nlsEventNetWork.cpp (771 lines of code) (raw):

/* * Copyright 2021 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdint.h> #include <stdio.h> #include <iostream> #ifndef _MSC_VER #include <unistd.h> #endif #include "connectNode.h" #include "event2/dns.h" #include "event2/thread.h" #include "iNlsRequest.h" #include "nlog.h" #include "nlsClientImpl.h" #include "nlsEventNetWork.h" #include "nlsGlobal.h" #include "nodeManager.h" #include "utility.h" #include "workThread.h" #ifdef ENABLE_REQUEST_RECORDING #include "text_utils.h" #endif #ifdef ENABLE_PRECONNECTED_POOL #include "connectedPool.h" #endif namespace AlibabaNls { #ifdef _MSC_VER #pragma comment(lib, "ws2_32") HANDLE NlsEventNetWork::_mtxThread = NULL; #else pthread_mutex_t NlsEventNetWork::_mtxThread = PTHREAD_MUTEX_INITIALIZER; #endif NlsEventNetWork *NlsEventNetWork::_eventClient = NULL; NlsEventNetWork::NlsEventNetWork() : _workThreadArray(NULL), _workThreadsNumber(0), _currentCpuNumber(0), _addrInFamily(0), _directIp(), _enableSysGetAddr(false), _syncCallTimeoutMs(0), #ifdef ENABLE_PRECONNECTED_POOL _preconnectedPool(NULL), _maxPreconnectedNumber(0), _preconnectedTimeoutMs(15000), _prerequestedTimeoutMs(7000), #endif _instance(NULL) { } NlsEventNetWork::~NlsEventNetWork() {} void NlsEventNetWork::DnsLogCb(int w, const char *m) { LOG_DEBUG(m); } void NlsEventNetWork::EventLogCb(int w, const char *m) { LOG_DEBUG(m); } void NlsEventNetWork::initEventNetWork(NlsClientImpl *instance, int count, char *aiFamily, char *directIp, bool sysGetAddr, unsigned int syncCallTimeoutMs) { MUTEX_LOCK(_mtxThread); #if defined(_MSC_VER) #ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED LOG_DEBUG("evthread_use_windows_thread."); int ret = evthread_use_windows_threads(); #endif WORD wVersionRequested; WSADATA wsaData; wVersionRequested = MAKEWORD(2, 2); (void)WSAStartup(wVersionRequested, &wsaData); #else LOG_DEBUG("evthread_use_pthreads."); int ret = evthread_use_pthreads(); #endif if (ret != Success) { LOG_ERROR("Invoke evthread_use_pthreads failed, ret:%d.", ret); MUTEX_UNLOCK(_mtxThread); exit(1); } #ifdef ENABLE_NLS_DEBUG evthread_enable_lock_debugging(); #endif _instance = instance; _addrInFamily = AF_INET; if (aiFamily != NULL) { if (strncmp(aiFamily, "AF_INET", 16) == 0) { _addrInFamily = AF_INET; } else if (strncmp(aiFamily, "AF_INET6", 16) == 0) { _addrInFamily = AF_INET6; } else if (strncmp(aiFamily, "AF_UNSPEC", 16) == 0) { _addrInFamily = AF_UNSPEC; } LOG_INFO("Set sockaddr_in type: %s.", aiFamily); } memset(_directIp, 0, 64); if (directIp) { strncpy(_directIp, directIp, 64); } _enableSysGetAddr = sysGetAddr; _syncCallTimeoutMs = syncCallTimeoutMs; #if defined(_MSC_VER) SYSTEM_INFO sysInfo; GetSystemInfo(&sysInfo); int cpuNumber = (int)sysInfo.dwNumberOfProcessors; #elif defined(__linux__) || defined(__ANDROID__) int cpuNumber = (int)sysconf(_SC_NPROCESSORS_ONLN); #endif #if defined(_MSC_VER) WorkThread::_mtxCpu = CreateMutex(NULL, FALSE, NULL); #else pthread_mutex_init(&WorkThread::_mtxCpu, NULL); #endif if (count <= 0) { _workThreadsNumber = cpuNumber; } else { _workThreadsNumber = count; } LOG_INFO("Work threads number: %d.", _workThreadsNumber); _workThreadArray = new WorkThread[_workThreadsNumber]; evdns_set_log_fn(DnsLogCb); event_set_log_callback(EventLogCb); MUTEX_UNLOCK(_mtxThread); LOG_INFO("Init NlsEventNetWork done."); return; } void NlsEventNetWork::destroyEventNetWork() { LOG_INFO("Destroy NlsEventNetWork(%p) begin ...", _eventClient); MUTEX_LOCK(_mtxThread); delete[] _workThreadArray; _workThreadArray = NULL; #if defined(_MSC_VER) CloseHandle(WorkThread::_mtxCpu); #else pthread_mutex_destroy(&WorkThread::_mtxCpu); #endif _workThreadsNumber = 0; _currentCpuNumber = 0; MUTEX_UNLOCK(_mtxThread); LOG_INFO("Destroy NlsEventNetWork(%p) done.", _eventClient); return; } /** * @brief: 选择工作进程 * @return: 成功则返回工程进程号, 失败则返回负值. */ int NlsEventNetWork::selectThreadNumber() { int number = 0; if (_workThreadArray != NULL) { number = _currentCpuNumber; if (++_currentCpuNumber == _workThreadsNumber) { _currentCpuNumber = 0; } LOG_INFO("Select Thread NO:%d, Next NO:%d, Total:%d.", number, _currentCpuNumber, _workThreadsNumber); } else { LOG_ERROR( "WorkThread isn't startup. Please invoke startWorkThread() first."); number = -1; } return number; } int NlsEventNetWork::start(INlsRequest *request) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } #ifdef ENABLE_PRECONNECTED_POOL if (_preconnectedPool) { node->usePreconnection(true); node->useLongConnection(false); } else { node->usePreconnection(false); } #endif /* 长链接模式下, 若为Completed状态, 则等待其进入Closed后, 再重置状态. */ if (node->isLongConnection()) { int try_count = 500; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeCompleted) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } if (node->getConnectNodeStatus() == NodeClosed) { LOG_DEBUG( "Node:%p current is NodeClosed and longConnection mode, reset " "status.", node); node->initAllStatus(); } } /* * invoke start * Node处于刚创建完状态, 且处于非退出状态, 则可进行start操作. */ if (node->getConnectNodeStatus() == NodeCreated && node->getExitStatus() == ExitInvalid) { node->setConnectNodeStatus(NodeInvoking); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeInvoking, true, 0); #endif int num = request->getThreadNumber(); if (num < 0) { num = selectThreadNumber(); } if (num < 0) { node->setConnectNodeStatus(NodeCreated); MUTEX_UNLOCK(_mtxThread); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeCreated, false, 0); #endif return -(SelectThreadFailed); } else { request->setThreadNumber(num); } LOG_DEBUG("Request(%p) Node(%p) select NO:%d Total:%d thread.", request, node, num, _workThreadsNumber); WorkThread *work_thread = &_workThreadArray[num]; node->setEventThread(work_thread); node->getEventThread()->setInstance(_instance); node->setInstance(_instance); node->getEventThread()->setAddrInFamily(_addrInFamily); if (strnlen(_directIp, 64) > 0) { node->getEventThread()->setDirectHost(_directIp); } node->getEventThread()->setUseSysGetAddrInfo(_enableSysGetAddr); node->setSyncCallTimeout(_syncCallTimeoutMs); work_thread->updateParameters(node); #ifdef ENABLE_PRECONNECTED_POOL ConnectedStatus getPrestartedNode = PreNodeInvalid; uint64_t try_begin_ms = utility::TextUtils::GetTimestampMs(); if (_preconnectedPool) { getPrestartedNode = (ConnectedStatus)node->tryToGetPreconnection(); } uint64_t try_end_ms = utility::TextUtils::GetTimestampMs(); if (getPrestartedNode == PreNodeConnected) { // 获得了preconnected节点, 需要发起start LOG_DEBUG("Request(%p) node(%p) get a preconnected node ...", request, node); #ifdef ENABLE_REQUEST_RECORDING node->getNodeProcess()->connect_type = ConnectWithPreconnectedNodePool; #endif int event_ret = event_add(node->getStartWithPoolEvent(true), NULL); event_active(node->getStartWithPoolEvent(), EV_READ, 0); } else if (getPrestartedNode == PreNodeStarted) { // 获得了prestarted节点, 直接开始工作 LOG_DEBUG("Request(%p) node(%p) get a prestarted node ...", request, node); #ifdef ENABLE_REQUEST_RECORDING node->getNodeProcess()->connect_type = ConnectWithPrestartedNodePool; #endif int event_ret = event_add(node->getStartWithPoolEvent(true), NULL); event_active(node->getStartWithPoolEvent(), EV_READ, 0); LOG_DEBUG( "Request(%p) node(%p) get a prestarted node " "tryToGetPreconnection latency %llums", request, node, try_end_ms - try_begin_ms); } else #endif { LOG_DEBUG( "Request(%p) node(%p) ready to invoke event_add LaunchEvent ...", request, node); int event_ret = event_add(node->getLaunchEvent(true), NULL); if (event_ret != Success) { LOG_ERROR("Request(%p) node(%p) invoking event_add failed(%d).", request, node, event_ret); MUTEX_UNLOCK(_mtxThread); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeCreated, false, 0); #endif return -(InvokeStartFailed); } else { LOG_DEBUG( "Request(%p) node(%p) invoking event_add success, ready to launch " "request.", request, node); } event_active(node->getLaunchEvent(), EV_READ, 0); } node->initNlsEncoder(); if (node->getSyncCallTimeout() > 0) { node->waitInvokeFinish(); int error_code = node->getErrorCode(); if (error_code != Success) { MUTEX_UNLOCK(_mtxThread); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeCreated, false, 0); #endif return -(error_code); } } #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeCreated, false, 0); #endif } else if (node->getExitStatus() == ExitInvalid && node->getConnectNodeStatus() > NodeCreated && node->getConnectNodeStatus() < NodeFailed) { LOG_WARN( "Request(%p) node(%p) has invoked start, node status:%s, exit " "status:%s. skip ...", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return Success; } else { LOG_ERROR( "Request(%p) node(%p) invoke start failed, current status is invalid. " "node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); node->setConnectNodeStatus(NodeCreated); MUTEX_UNLOCK(_mtxThread); return -(InvokeStartFailed); } MUTEX_UNLOCK(_mtxThread); LOG_DEBUG("Request(%p) node(%p) invoke start success.", request, node); return Success; } #ifdef ENABLE_PRECONNECTED_POOL int NlsEventNetWork::startInner(INlsRequest *request) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } if (_preconnectedPool) { node->usePreconnection(true); node->useLongConnection(false); } else { node->usePreconnection(false); } /* * invoke start * Node处于刚创建完状态, 且处于非退出状态, 则可进行start操作. */ if (node->getConnectNodeStatus() == NodeCreated && node->getExitStatus() == ExitInvalid) { node->setConnectNodeStatus(NodeInvoking); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeInvoking, true, 0); #endif int num = request->getThreadNumber(); if (num < 0) { num = selectThreadNumber(); } if (num < 0) { node->setConnectNodeStatus(NodeCreated); MUTEX_UNLOCK(_mtxThread); #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("start", NodeCreated, false, 0); #endif return -(SelectThreadFailed); } else { request->setThreadNumber(num); } LOG_DEBUG("Request(%p) Node(%p) select NO:%d Total:%d thread.", request, node, num, _workThreadsNumber); WorkThread *work_thread = &_workThreadArray[num]; node->setEventThread(work_thread); node->getEventThread()->setInstance(_instance); node->setInstance(_instance); node->getEventThread()->setAddrInFamily(_addrInFamily); if (strnlen(_directIp, 64) > 0) { node->getEventThread()->setDirectHost(_directIp); } node->getEventThread()->setUseSysGetAddrInfo(_enableSysGetAddr); node->setSyncCallTimeout(_syncCallTimeoutMs); work_thread->updateParameters(node); node->initNlsEncoder(); } else if (node->getExitStatus() == ExitInvalid && node->getConnectNodeStatus() > NodeCreated && node->getConnectNodeStatus() < NodeFailed) { LOG_WARN( "Request(%p) node(%p) has invoked start, node status:%s, exit " "status:%s. skip ...", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return Success; } else { LOG_ERROR( "Request(%p) node(%p) invoke start failed, current status is invalid. " "node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); node->setConnectNodeStatus(NodeCreated); MUTEX_UNLOCK(_mtxThread); return -(InvokeStartFailed); } MUTEX_UNLOCK(_mtxThread); LOG_DEBUG("Request(%p) node(%p) invoke start success.", request, node); return Success; } #endif int NlsEventNetWork::sendAudio(INlsRequest *request, const uint8_t *data, size_t dataSize, ENCODER_TYPE type) { EVENT_CLIENT_CHECK(_eventClient); ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request:%p is nullptr, you have destroyed request!", request); return -(NodeEmpty); } /* Node也许处于Starting状态还未到Started状态, 可等待一会. */ int try_count = 500; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeStarting) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } if (node->getConnectNodeStatus() != NodeStarted || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke sendAudio command failed, current status " "is invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); return -(InvokeSendAudioFailed); } #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("sendAudio", NodeSendAudio, true, dataSize); #endif int ret = 0; if (type != ENCODER_NONE) { ret = node->addSlicedAudioDataBuffer(data, dataSize); } else { ret = node->addAudioDataBuffer(data, dataSize); } #ifdef ENABLE_REQUEST_RECORDING node->updateNodeProcess("sendAudio", NodeSendAudio, false, 0); #endif return ret; } int NlsEventNetWork::stop(INlsRequest *request) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* FlowingSynthesizer Node也许处于发送单轮文本状态, 可等待一会. */ int try_count = 500; while (request->getRequestParam()->_requestType == FlowingSynthesizer && node->_isSendSingleRoundText && try_count-- > 0) { LOG_WARN( "Request(%p) Node(%p) is sending a single round of synthetic text.", request, node); #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } /* invoke stop * Node未处于运行状态, 或正处于退出状态, 则当前不可调用stop. */ if (node->getExitStatus() == ExitStopping) { LOG_WARN( "Request(%p) node(%p) has invoked stop, node status:%s, exit " "status:%s. skip ...", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return Success; } else if (node->getExitStatus() == ExitCancel) { LOG_WARN( "Request(%p) node(%p) has invoked cancel, node status:%s, exit " "status:%s. skip ...", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return Success; } if (node->getConnectNodeStatus() < NodeInvoking || node->getConnectNodeStatus() >= NodeFailed || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke stop command failed, current status is " "invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeStopFailed); } int ret = node->cmdNotify(CmdStop, NULL); if (ret == Success && node->getSyncCallTimeout() > 0) { node->waitInvokeFinish(); int error_code = node->getErrorCode(); if (error_code != Success) { MUTEX_UNLOCK(_mtxThread); return -(error_code); } } MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::cancel(INlsRequest *request) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* invoke cancel * Node未处于运行状态, 或正处于退出状态, 则当前不可调用stop. */ if (node->getExitStatus() == ExitCancel) { LOG_WARN( "Request(%p) node(%p) has invoked cancel, node status:%s, exit " "status:%s. skip ...", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return Success; } if (node->getConnectNodeStatus() < NodeInvoking || node->getConnectNodeStatus() >= NodeClosed) { LOG_ERROR( "Request(%p) node(%p) invoke cancel command failed, current status is " "invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeCancelFailed); } int ret = node->cmdNotify(CmdCancel, NULL); // NodeConnecting状态尽量不做操作, 500ms int try_count = 100; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeConnecting) { #if defined(_MSC_VER) ReleaseMutex(_mtxThread); Sleep(5); WaitForSingleObject(_mtxThread, INFINITE); #else pthread_mutex_unlock(&_mtxThread); usleep(5 * 1000); pthread_mutex_lock(&_mtxThread); #endif } // LOG_DUMP_EVENTS(node->getEventThread()->_workBase); MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::stControl(INlsRequest *request, const char *message) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* invoke stControl * Node未处于started状态, 或处于退出状态, 则当前不可调用stControl. */ if (node->getConnectNodeStatus() != NodeStarted || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke stControl command failed, current status " "is invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeStControlFailed); } int ret = node->cmdNotify(CmdStControl, message); MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::sendText(INlsRequest *request, const char *text) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* Node也许处于Starting状态还未到Started状态, 可等待一会. */ int try_count = 500; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeStarting) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } /* invoke sendText * Node未处于started状态, 或处于退出状态, 则当前不可调用sendText. */ if (node->getConnectNodeStatus() != NodeStarted || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke sendText command failed, current status " "is invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeSendTextFailed); } int ret = node->cmdNotify(CmdSendText, text); MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::sendPing(INlsRequest *request) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* Node也许处于Starting状态还未到Started状态, 可等待一会. */ int try_count = 500; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeStarting) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } /* invoke sendPing * Node未处于started状态, 或处于退出状态, 则当前不可调用sendPing. */ if (node->getConnectNodeStatus() != NodeStarted || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke sendPing command failed, current status " "is invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeSendTextFailed); } int ret = node->cmdNotify(CmdSendPing, NULL); MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::sendFlush(INlsRequest *request, const char *parameters) { MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return -(EventClientEmpty); } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return -(NodeEmpty); } /* Node也许处于Starting状态还未到Started状态, 可等待一会. */ int try_count = 500; while (try_count-- > 0 && node->getConnectNodeStatus() == NodeStarting) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } /* invoke sendFlush * Node未处于started状态, 或处于退出状态, 则当前不可调用sendFlush. */ if (node->getConnectNodeStatus() != NodeStarted || node->getExitStatus() != ExitInvalid) { LOG_ERROR( "Request(%p) node(%p) invoke sendFlush command failed, current status " "is invalid. node status:%s, exit status:%s.", request, node, node->getConnectNodeStatusString().c_str(), node->getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxThread); return -(InvokeSendTextFailed); } int ret = node->cmdNotify(CmdSendFlush, parameters); MUTEX_UNLOCK(_mtxThread); return ret; } const char *NlsEventNetWork::dumpAllInfo(INlsRequest *request) { #ifdef ENABLE_REQUEST_RECORDING MUTEX_LOCK(_mtxThread); if (_eventClient == NULL) { LOG_ERROR( "NlsEventNetWork has destroyed, please invoke startWorkThread() " "first."); MUTEX_UNLOCK(_mtxThread); return NULL; } ConnectNode *node = request->getConnectNode(); if (node == NULL) { LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!", request); MUTEX_UNLOCK(_mtxThread); return NULL; } std::string info(node->dumpAllInfo()); MUTEX_UNLOCK(_mtxThread); return info.c_str(); #else return NULL; #endif } #ifdef ENABLE_PRECONNECTED_POOL int NlsEventNetWork::initPreconnectedPool(unsigned int maxNumber, unsigned int connectedTimeoutMs, unsigned int requestedTimeoutMs) { int ret = Success; MUTEX_LOCK(_mtxThread); if (_preconnectedPool) { LOG_WARN("ConnectedPool has existed, destroy first."); delete _preconnectedPool; _preconnectedPool = NULL; } _maxPreconnectedNumber = maxNumber; _preconnectedTimeoutMs = connectedTimeoutMs; _prerequestedTimeoutMs = requestedTimeoutMs; if (_maxPreconnectedNumber > 0) { _preconnectedPool = new ConnectedPool( _maxPreconnectedNumber, _preconnectedTimeoutMs, _prerequestedTimeoutMs); if (_preconnectedPool == NULL) { LOG_ERROR("New ConnectedPool failed."); ret = -(ConnectedPoolEmpty); } else { LOG_INFO( "New a ConnectedPool(%p) with max connected number:%d, connected " "timeout:%dms and requested timeout:%dms", _preconnectedPool, _maxPreconnectedNumber, _preconnectedTimeoutMs, _prerequestedTimeoutMs); _preconnectedPool->startConnectedPool(); } } MUTEX_UNLOCK(_mtxThread); return ret; } int NlsEventNetWork::destroyPreconnectedPool() { MUTEX_LOCK(_mtxThread); if (_preconnectedPool) { _preconnectedPool->stopConnectedPool(); LOG_INFO("Destroy ConnectedPool(%p).", _preconnectedPool); delete _preconnectedPool; _preconnectedPool = NULL; } MUTEX_UNLOCK(_mtxThread); return Success; } ConnectedPool *NlsEventNetWork::getPreconnectedPool() { return _preconnectedPool; } #endif // ENABLE_PRECONNECTED_POOL } // namespace AlibabaNls