nlsCppSdk/transport/connectNode.cpp (3,857 lines of code) (raw):

/* * Copyright 2021 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdint.h> #include <stdio.h> #include <iostream> #include <vector> #if defined(__ANDROID__) || defined(__linux__) #include <errno.h> #include <signal.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #ifndef __ANDRIOD__ #include <iconv.h> #endif #endif #ifdef __GNUC__ #include <arpa/inet.h> #include <netdb.h> #include <netinet/in.h> #include <sys/ioctl.h> #include <sys/poll.h> #include <sys/time.h> #endif #include "Config.h" #include "connectNode.h" #include "iNlsRequest.h" #include "iNlsRequestParam.h" #include "nlog.h" #include "nlsClientImpl.h" #include "nlsEventNetWork.h" #include "nlsGlobal.h" #include "nodeManager.h" #include "text_utils.h" #include "utility.h" #include "workThread.h" #ifdef ENABLE_REQUEST_RECORDING #include "json/json.h" #include "text_utils.h" #endif namespace AlibabaNls { ConnectNode::ConnectNode(INlsRequest *request, HandleBaseOneParamWithReturnVoid<NlsEvent> *handler, bool isLongConnection) : _request(request), _handler(handler), _isLongConnection(isLongConnection), _usePreconnection(false), _isPreconnecting(false), _isPreNodeStartStepByStep(false), _dnsRequest(NULL), _dnsRequestCallbackStatus(0), _retryConnectCount(0), _socketFd(INVALID_SOCKET), _binaryEvBuffer(NULL), _readEvBuffer(NULL), _cmdEvBuffer(NULL), _wwvEvBuffer(NULL), _nlsEncoder(NULL), _encoderType(ENCODER_NONE), _audioFrame(NULL), _audioFrameSize(0), _maxFrameSize(0), _isFirstAudioFrame(true), _eventThread(NULL), _isDestroy(false), _isWakeStop(false), _isStop(false), _isFirstBinaryFrame(true), _isConnected(false), _workStatus(NodeCreated), _exitStatus(ExitInvalid), _instance(NULL), _syncCallTimeoutMs(0), _nodeErrCode(Success), _limitSize(Buffer16kMaxLimit), #ifdef __LINUX__ _nodename(NULL), _servname(NULL), _dnsThread(0), _dnsThreadExit(false), _dnsErrorCode(0), _addrinfo(NULL), _dnsThreadRunning(false), _dnsEvent(NULL), #endif _sslHandle(NULL), _nativeSslHandle(NULL), _enableRecvTv(false), _enableOnMessage(false), _launchEvent(NULL), #ifdef ENABLE_PRECONNECTED_POOL _startWithPoolEvent(NULL), _poolIndex(-1), #endif _singleRoundTextEvent(NULL), _isSendSingleRoundText(false), _connectEvent(NULL), _readEvent(NULL), _writeEvent(NULL), #ifdef ENABLE_CONTINUED _reconnectEvent(NULL), #endif _inEventCallbackNode(false), _releasingFlag(false), _waitEventCallbackAbnormally(false) { _binaryEvBuffer = evbuffer_new(); if (_binaryEvBuffer == NULL) { LOG_ERROR("_binaryEvBuffer is nullptr"); } evbuffer_enable_locking(_binaryEvBuffer, NULL); _readEvBuffer = evbuffer_new(); if (_readEvBuffer == NULL) { LOG_ERROR("_readEvBuffer is nullptr"); } _cmdEvBuffer = evbuffer_new(); if (_cmdEvBuffer == NULL) { LOG_ERROR("_cmdEvBuffer is nullptr"); } _wwvEvBuffer = evbuffer_new(); if (_wwvEvBuffer == NULL) { LOG_ERROR("_wwvEvBuffer is nullptr"); } evbuffer_enable_locking(_readEvBuffer, NULL); evbuffer_enable_locking(_cmdEvBuffer, NULL); evbuffer_enable_locking(_wwvEvBuffer, NULL); _sslHandle = new SSLconnect(); if (_sslHandle == NULL) { LOG_ERROR("Node(%p) _sslHandle is nullptr.", this); } else { _nativeSslHandle = _sslHandle; } LOG_INFO( "Request(%p) Node(%p) create ConnectNode include webSocketTcp:%p and " "SSL:%p, native " "SSL:%p", request, this, &_webSocket, _sslHandle, _nativeSslHandle); _webSocket.setConnectNode((void *)this); // will update parameters in updateParameters() _enableRecvTv = request->getRequestParam()->getEnableRecvTimeout(); utility::TextUtils::GetTimevalFromMs( &_recvTv, request->getRequestParam()->getRecvTimeout()); utility::TextUtils::GetTimevalFromMs( &_sendTv, request->getRequestParam()->getSendTimeout()); utility::TextUtils::GetTimevalFromMs( &_connectTv, request->getRequestParam()->getTimeout()); _enableOnMessage = request->getRequestParam()->getEnableOnMessage(); #ifdef ENABLE_HIGH_EFFICIENCY _connectTimerTv.tv_sec = 0; _connectTimerTv.tv_usec = ConnectTimerIntervalMs * 1000; _connectTimerFlag = true; _connectTimerEvent = NULL; #endif #ifdef __LINUX__ _gaicbRequest[0] = NULL; #endif #if defined(_MSC_VER) _mtxNode = CreateMutex(NULL, FALSE, NULL); _mtxCloseNode = CreateMutex(NULL, FALSE, NULL); _mtxEventCallbackNode = CreateEvent(NULL, FALSE, FALSE, NULL); _mtxInvokeSyncCallNode = CreateEvent(NULL, FALSE, FALSE, NULL); #else pthread_mutex_init(&_mtxNode, NULL); pthread_mutex_init(&_mtxCloseNode, NULL); pthread_mutex_init(&_mtxEventCallbackNode, NULL); pthread_mutex_init(&_mtxInvokeSyncCallNode, NULL); pthread_cond_init(&_cvEventCallbackNode, NULL); pthread_cond_init(&_cvInvokeSyncCallNode, NULL); #endif #ifdef ENABLE_REQUEST_RECORDING _nodeProcess.last_status = NodeCreated; _nodeProcess.create_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_op_timestamp_ms = _nodeProcess.create_timestamp_ms; #endif _nodeUUID = utility::TextUtils::getRandomUuid(); LOG_INFO( "Node(%p) create ConnectNode done with long connection flag:%s, the UUID " "is %s", this, _isLongConnection ? "True" : "False", _nodeUUID.c_str()); } ConnectNode::~ConnectNode() { LOG_DEBUG("Node(%p) destroy ConnectNode begin.", this); #ifdef __LINUX__ if (_url._enableSysGetAddr) { if (_dnsThread) { LOG_WARN("Node(%p) dnsThread(%lu) still exist, waiting exiting", this, _dnsThread); _dnsThreadExit = true; pthread_join(_dnsThread, NULL); LOG_WARN("Node(%p) dnsThread(%lu) exited.", this, _dnsThread); _dnsThread = 0; if (_gaicbRequest[0]) { free(_gaicbRequest[0]); _gaicbRequest[0] = NULL; } } else { LOG_DEBUG("Node(%p) dnsThread has exited.", this); } } #endif waitEventCallback(); closeConnectNode(); if (_eventThread) { _eventThread->freeListNode(_eventThread, _request); } _request = NULL; if (_sslHandle) { if (_sslHandle == _nativeSslHandle) { LOG_INFO("Node(%p) delete _sslHandle:%p.", this, _sslHandle); delete _sslHandle; _sslHandle = NULL; _nativeSslHandle = NULL; } else { LOG_INFO( "Node(%p) _sslHandle:%p does not belong to it with " "_nativeSslHandle:%p, should not delete.", this, _sslHandle, _nativeSslHandle); } } _handler = NULL; if (_cmdEvBuffer) { evbuffer_free(_cmdEvBuffer); _cmdEvBuffer = NULL; } if (_readEvBuffer) { evbuffer_free(_readEvBuffer); _readEvBuffer = NULL; } if (_binaryEvBuffer) { evbuffer_free(_binaryEvBuffer); _binaryEvBuffer = NULL; } if (_wwvEvBuffer) { evbuffer_free(_wwvEvBuffer); _wwvEvBuffer = NULL; } if (_launchEvent) { event_free(_launchEvent); _launchEvent = NULL; } #ifdef ENABLE_PRECONNECTED_POOL if (_startWithPoolEvent) { event_free(_startWithPoolEvent); _startWithPoolEvent = NULL; } _poolIndex = -1; #endif if (_singleRoundTextEvent) { event_free(_singleRoundTextEvent); _singleRoundTextEvent = NULL; } _isSendSingleRoundText = false; #ifdef ENABLE_CONTINUED if (_reconnectEvent) { event_free(_reconnectEvent); _reconnectEvent = NULL; } #endif if (_eventThread) { if (_dnsRequest && _dnsRequestCallbackStatus == 1) { LOG_DEBUG( "Node(%p) cancel _dnsRequest(%p), current " "event_count_active_added_virtual:%d", this, _dnsRequest, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ACTIVE | EVENT_BASE_COUNT_ADDED | EVENT_BASE_COUNT_VIRTUAL)); LOG_DEBUG( "Node(%p) cancel _dnsRequest(%p), current event_count_active:%d", this, _dnsRequest, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ACTIVE)); LOG_DEBUG( "Node(%p) cancel _dnsRequest(%p), current event_count_virtual:%d", this, _dnsRequest, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_VIRTUAL)); LOG_DEBUG("Node(%p) cancel _dnsRequest(%p), current event_count_added:%d", this, _dnsRequest, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ADDED)); if (_waitEventCallbackAbnormally && _workStatus == NodeConnecting) { LOG_WARN( "Node(%p) is in an exception and NodeConnecting, skipping " "evdns_getaddrinfo_cancel."); } else { evdns_getaddrinfo_cancel(_dnsRequest); } } LOG_DEBUG( "Node(%p) get event_count_active_added_virtual %d in deconstructing " "ConnectNode.", this, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ACTIVE | EVENT_BASE_COUNT_ADDED | EVENT_BASE_COUNT_VIRTUAL)); LOG_DEBUG( "Node(%p) get event_count_active %d in deconstructing ConnectNode.", this, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ACTIVE)); LOG_DEBUG( "Node(%p) get event_count_virtual %d in deconstructing ConnectNode.", this, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_VIRTUAL)); LOG_DEBUG( "Node(%p) get event_count_added %d in deconstructing ConnectNode.", this, event_base_get_num_events(_eventThread->_workBase, EVENT_BASE_COUNT_ADDED)); } _eventThread = NULL; if (_nlsEncoder) { _nlsEncoder->destroyNlsEncoder(); delete _nlsEncoder; _nlsEncoder = NULL; } if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; #if defined(_MSC_VER) CloseHandle(_mtxNode); CloseHandle(_mtxCloseNode); CloseHandle(_mtxEventCallbackNode); CloseHandle(_mtxInvokeSyncCallNode); #else pthread_mutex_destroy(&_mtxNode); pthread_mutex_destroy(&_mtxCloseNode); pthread_mutex_destroy(&_mtxEventCallbackNode); pthread_mutex_destroy(&_mtxInvokeSyncCallNode); pthread_cond_destroy(&_cvEventCallbackNode); pthread_cond_destroy(&_cvInvokeSyncCallNode); #endif _inEventCallbackNode = false; _instance = NULL; LOG_DEBUG("Node(%p) destroy ConnectNode done.", this); } /** * @brief: 长链接模式下完成一轮交互初始化参数而非释放 * @return: */ void ConnectNode::initAllStatus() { MUTEX_LOCK(_mtxNode); _isFirstBinaryFrame = true; _isStop = false; _isDestroy = false; _isWakeStop = false; _workStatus = NodeCreated; _exitStatus = ExitInvalid; MUTEX_UNLOCK(_mtxNode); } /** * @brief: 获得用于启动当前node的libevent, 用于启动当前请求 * @return: libevent的event指针 */ struct event *ConnectNode::getLaunchEvent(bool init) { if (_launchEvent == NULL) { _launchEvent = event_new(_eventThread->_workBase, -1, EV_READ, WorkThread::launchEventCallback, this); if (NULL == _launchEvent) { LOG_ERROR("Node(%p) new event(_launchEvent) failed.", this); } else { LOG_DEBUG("Node(%p) new event(_launchEvent).", this); } } else { if (init) { event_del(_launchEvent); int assign_ret = event_assign(_launchEvent, _eventThread->_workBase, -1, EV_READ, WorkThread::launchEventCallback, this); LOG_DEBUG("Node(%p) new event_assign(_launchEvent) with ret:%d.", this, assign_ret); } } return _launchEvent; } #ifdef ENABLE_PRECONNECTED_POOL struct event *ConnectNode::getStartWithPoolEvent(bool init) { if (_startWithPoolEvent == NULL) { _startWithPoolEvent = event_new(_eventThread->_workBase, -1, EV_READ, WorkThread::startWithPoolEventCallback, this); if (NULL == _startWithPoolEvent) { LOG_ERROR("Node(%p) new event(_startWithPoolEvent) failed.", this); } else { LOG_DEBUG("Node(%p) new event(_startWithPoolEvent).", this); } } else { if (init) { event_del(_startWithPoolEvent); int assign_ret = event_assign(_startWithPoolEvent, _eventThread->_workBase, -1, EV_READ, WorkThread::startWithPoolEventCallback, this); LOG_DEBUG("Node(%p) new event_assign(_startWithPoolEvent) with ret:%d.", this, assign_ret); } } return _startWithPoolEvent; } #endif struct event *ConnectNode::getSingleRoundTextEvent() { if (_singleRoundTextEvent == NULL) { _singleRoundTextEvent = event_new(_eventThread->_workBase, -1, EV_READ, WorkThread::singleRoundTextEventCallback, this); if (NULL == _singleRoundTextEvent) { LOG_ERROR("Node(%p) new event(_singleRoundTextEvent) failed.", this); } else { LOG_DEBUG("Node(%p) new event(_singleRoundTextEvent).", this); } } else { event_del(_singleRoundTextEvent); int assign_ret = event_assign(_singleRoundTextEvent, _eventThread->_workBase, -1, EV_READ, WorkThread::singleRoundTextEventCallback, this); LOG_DEBUG("Node(%p) new event_assign(_singleRoundTextEvent) with ret:%d.", this, assign_ret); } return _singleRoundTextEvent; } /** * @brief: 获得当前node的运行状态 * @return: 当前node的运行状态枚举值 */ ConnectStatus ConnectNode::getConnectNodeStatus() { MUTEX_LOCK(_mtxNode); ConnectStatus status = _workStatus; MUTEX_UNLOCK(_mtxNode); return status; } /** * @brief: 获得当前node的运行状态 * @return: 当前node的运行状态枚举值对应字符串 */ std::string ConnectNode::getConnectNodeStatusString() { MUTEX_LOCK(_mtxNode); std::string ret_str = getConnectNodeStatusStringLocked(); MUTEX_UNLOCK(_mtxNode); return ret_str; } std::string ConnectNode::getConnectNodeStatusStringLocked() { return getConnectNodeStatusString(_workStatus); } std::string ConnectNode::getConnectNodeStatusString(ConnectStatus status) { std::string ret_str("Unknown"); switch (status) { case NodeInvalid: ret_str.assign("NodeInvalid"); break; case NodeCreated: ret_str.assign("NodeCreated"); break; case NodeInvoking: ret_str.assign("NodeInvoking"); break; case NodeInvoked: ret_str.assign("NodeInvoked"); break; case NodeConnecting: ret_str.assign("NodeConnecting"); break; case NodeConnected: ret_str.assign("NodeConnected"); break; case NodeHandshaking: ret_str.assign("NodeHandshaking"); break; case NodeHandshaked: ret_str.assign("NodeHandshaked"); break; case NodeStarting: ret_str.assign("NodeStarting"); break; case NodeStarted: ret_str.assign("NodeStarted"); break; case NodeWakeWording: ret_str.assign("NodeWakeWording"); break; case NodeFailed: ret_str.assign("NodeFailed"); break; case NodeCompleted: ret_str.assign("NodeCompleted"); break; case NodeClosed: ret_str.assign("NodeClosed"); break; case NodeReleased: ret_str.assign("NodeReleased"); break; case NodeStop: ret_str.assign("NodeStop"); break; case NodeCancel: ret_str.assign("NodeCancel"); break; case NodeSendAudio: ret_str.assign("NodeSendAudio"); break; case NodeSendControl: ret_str.assign("NodeSendControl"); break; case NodePlayAudio: ret_str.assign("NodePlayAudio"); break; case NodeSendText: ret_str.assign("NodeSendText"); break; default: LOG_ERROR("Current invalid node status:%d.", status); } return ret_str; } /** * @brief: 设置当前node的运行状态 * @return: */ void ConnectNode::setConnectNodeStatus(ConnectStatus status) { MUTEX_LOCK(_mtxNode); _workStatus = status; #ifdef ENABLE_REQUEST_RECORDING _nodeProcess.last_status = _workStatus; #endif MUTEX_UNLOCK(_mtxNode); } /** * @brief: 获得当前node的退出状态 * @return: */ ExitStatus ConnectNode::getExitStatus() { MUTEX_LOCK(_mtxNode); ExitStatus ret = _exitStatus; MUTEX_UNLOCK(_mtxNode); return ret; } /** * @brief: 获得当前node的退出状态 * @return: 当前node的退出状态枚举值对应字符串 */ std::string ConnectNode::getExitStatusString() { MUTEX_LOCK(_mtxNode); std::string ret_str = "Unknown"; switch (_exitStatus) { case ExitInvalid: ret_str.assign("ExitInvalid"); break; case ExitStopping: ret_str.assign("ExitStopping"); break; case ExitCancel: ret_str.assign("ExitCancel"); break; default: LOG_ERROR("Current invalid exit status:%d.", _exitStatus); } MUTEX_UNLOCK(_mtxNode); return ret_str; } /** * @brief: node运行指令对应的字符串 * @return: */ std::string ConnectNode::getCmdTypeString(int type) { std::string ret_str = "Unknown"; switch (type) { case CmdStart: ret_str.assign("CmdStart"); break; case CmdStop: ret_str.assign("CmdStop"); break; case CmdStControl: ret_str.assign("CmdStControl"); break; case CmdTextDialog: ret_str.assign("CmdTextDialog"); break; case CmdExecuteDialog: ret_str.assign("CmdExecuteDialog"); break; case CmdWarkWord: ret_str.assign("CmdWarkWord"); break; case CmdCancel: ret_str.assign("CmdCancel"); break; case CmdSendText: ret_str.assign("CmdSendText"); break; case CmdSendPing: ret_str.assign("CmdSendPing"); break; case CmdSendFlush: ret_str.assign("CmdSendFlush"); break; } return ret_str; } bool ConnectNode::updateDestroyStatus() { MUTEX_LOCK(_mtxNode); bool ret = true; if (!_isDestroy) { #ifdef __LINUX__ if (_url._enableSysGetAddr) { if (_dnsThread) { LOG_WARN("Node(%p) dnsThread(%lu) still exist, waiting exiting", this, _dnsThread); _dnsThreadExit = true; pthread_join(_dnsThread, NULL); LOG_WARN("Node(%p) dnsThread(%lu) exited.", this, _dnsThread); _dnsThread = 0; } else { LOG_DEBUG("Node(%p) dnsThread has exited.", this); } } #endif _isDestroy = true; ret = false; } else { LOG_DEBUG("Node(%p) _isDestroy is true, do nothing ...", this); } MUTEX_UNLOCK(_mtxNode); return ret; } bool ConnectNode::getWakeStatus() { MUTEX_LOCK(_mtxNode); bool ret = _isWakeStop; MUTEX_UNLOCK(_mtxNode); return ret; } bool ConnectNode::checkConnectCount() { MUTEX_LOCK(_mtxNode); bool result = false; if (_retryConnectCount < RetryConnectCount) { _retryConnectCount++; result = true; } else { _retryConnectCount = 0; // return false : restart connect failed } LOG_INFO("Node(%p) check connection count: %d.", this, _retryConnectCount); MUTEX_UNLOCK(_mtxNode); return result; } bool ConnectNode::parseUrlInformation(char *directIp) { if (_request == NULL) { LOG_ERROR("Node(%p) this request is nullptr.", this); return false; } const char *address = _request->getRequestParam()->_url.c_str(); const char *token = _request->getRequestParam()->_token.c_str(); size_t tokenSize = _request->getRequestParam()->_token.size(); memset(&_url, 0x0, sizeof(struct urlAddress)); if (directIp && strnlen(directIp, 64) > 0) { LOG_DEBUG("Node(%p) direct ip address: %s.", this, directIp); if (sscanf(directIp, "%256[^:/]:%d", _url._address, &_url._port) == 2) { _url._directIp = true; } else if (sscanf(directIp, "%255s", _url._address) == 1) { _url._directIp = true; } else { LOG_ERROR("Node(%p) could not parse WebSocket direct ip:%s.", this, directIp); return false; } } LOG_INFO("Node(%p) address: %s.", this, address); if (WebSocketTcp::parseUrlAddress(_url, address) != Success) { LOG_ERROR("Node(%p) could not parse WebSocket url: %s.", this, address); return false; } memcpy(_url._token, token, tokenSize); LOG_INFO("Node(%p) type:%s, host:%s, port:%d, path:%s.", this, _url._type, _url._host, _url._port, _url._path); return true; } /** * @brief: 关闭ssl并释放event, 调用后会进行重链操作 * @return: */ void ConnectNode::disconnectProcess() { bool lock_ret = true; MUTEX_TRY_LOCK(_mtxCloseNode, 2000, lock_ret); if (!lock_ret) { LOG_ERROR("Node(%p) disconnectProcess, deadlock has occurred", this); if (_releasingFlag || _exitStatus == ExitCancel) { LOG_ERROR( "Node(%p) in the process of releasing/canceling, skip " "disconnectProcess.", this); _isConnected = false; LOG_DEBUG( "Node(%p) disconnectProcess done, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); return; } } LOG_DEBUG( "Node(%p) disconnectProcess begin, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (_socketFd != INVALID_SOCKET) { if (_sslHandle == _nativeSslHandle) { if (_url._isSsl) { _sslHandle->sslClose(); } evutil_closesocket(_socketFd); _socketFd = INVALID_SOCKET; } else { LOG_INFO( "Node(%p) _sslHandle:%p and _socketFd:%d does not belong to it with " "_nativeSslHandle:%p, should not close.", this, _sslHandle, _socketFd, _nativeSslHandle); } if (_url._enableSysGetAddr && _dnsEvent) { event_free(_dnsEvent); _dnsEvent = NULL; } } _isConnected = false; LOG_DEBUG( "Node(%p) disconnectProcess done, current node status:%s exit status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (lock_ret) { MUTEX_UNLOCK(_mtxCloseNode); } } /** * @brief: 当前Node切换到close状态, 而不进行断网, 用于长链接模式 * @return: */ void ConnectNode::closeStatusConnectNode() { MUTEX_LOCK(_mtxCloseNode); LOG_DEBUG( "Node(%p) closeStatusConnectNode begin, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (_nlsEncoder) { _nlsEncoder->nlsEncoderSoftRestart(); } if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; _connectTimerFlag = false; LOG_DEBUG( "Node(%p) closeStatusConnectNode done, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); MUTEX_UNLOCK(_mtxCloseNode); } /** * @brief: 当前Node切换到close状态, 而不进行断网, 用于预链接模式 * @return: */ void ConnectNode::closeStatusConnectNodeForConnectedPool() { bool lock_ret = true; MUTEX_TRY_LOCK(_mtxCloseNode, 2000, lock_ret); if (!lock_ret) { LOG_ERROR( "Node(%p) closeStatusConnectNodeForConnectedPool, deadlock has " "occurred", this); if (_releasingFlag || _exitStatus == ExitCancel) { LOG_ERROR( "Node(%p) in the process of releasing/canceling, skip " "closeStatusConnectNodeForConnectedPool.", this); _isConnected = false; if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; LOG_INFO( "Node(%p) closeStatusConnectNodeForConnectedPool done, current node " "status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); return; } } LOG_DEBUG( "Node(%p) closeStatusConnectNodeForConnectedPool begin, current node " "status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (_nlsEncoder) { _nlsEncoder->nlsEncoderSoftRestart(); } _isConnected = false; if (_url._enableSysGetAddr && _dnsEvent) { event_del(_dnsEvent); event_free(_dnsEvent); _dnsEvent = NULL; } if (_readEvent) { event_del(_readEvent); event_free(_readEvent); _readEvent = NULL; // LOG_INFO("Node(%p) remove _readEvent", this); } if (_writeEvent) { event_del(_writeEvent); event_free(_writeEvent); _writeEvent = NULL; } if (_connectEvent) { event_del(_connectEvent); event_free(_connectEvent); _connectEvent = NULL; } if (_launchEvent) { event_del(_launchEvent); event_free(_launchEvent); _launchEvent = NULL; } #ifdef ENABLE_PRECONNECTED_POOL if (_startWithPoolEvent) { event_del(_startWithPoolEvent); event_free(_startWithPoolEvent); _startWithPoolEvent = NULL; } #endif #ifdef ENABLE_HIGH_EFFICIENCY if (_connectTimerEvent != NULL) { if (_connectTimerFlag) { evtimer_del(_connectTimerEvent); _connectTimerFlag = false; } event_free(_connectTimerEvent); _connectTimerEvent = NULL; } #endif if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; _connectTimerFlag = false; LOG_DEBUG( "Node(%p) closeStatusConnectNodeForConnectedPool done, current node " "status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (lock_ret) { MUTEX_UNLOCK(_mtxCloseNode); } } /** * @brief: 关闭ssl并释放event, 并设置node状态, 调用后往往进行释放操作. * @return: */ void ConnectNode::closeConnectNode() { bool lock_ret = true; MUTEX_TRY_LOCK(_mtxCloseNode, 2000, lock_ret); if (!lock_ret) { LOG_ERROR("Node(%p) closeConnectNode, deadlock has occurred", this); if (_releasingFlag || _exitStatus == ExitCancel) { LOG_ERROR( "Node(%p) in the process of releasing/canceling, skip " "closeConnectNode.", this); _isConnected = false; if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; LOG_INFO( "Node(%p) closeConnectNode done, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); return; } } LOG_DEBUG( "Node(%p) closeConnectNode begin, current node status:%s exit status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (_socketFd != INVALID_SOCKET) { if (_sslHandle == _nativeSslHandle) { if (_url._isSsl) { _sslHandle->sslClose(); } evutil_closesocket(_socketFd); _socketFd = INVALID_SOCKET; } else { LOG_INFO( "Node(%p) _sslHandle:%p and _socketFd:%d does not belong to it with " "_nativeSslHandle:%p, should not close SSL, which will close and " "release by itself.", this, _sslHandle, _socketFd, _nativeSslHandle); } } _isConnected = false; if (_url._enableSysGetAddr && _dnsEvent) { event_del(_dnsEvent); event_free(_dnsEvent); _dnsEvent = NULL; } if (_readEvent) { event_del(_readEvent); event_free(_readEvent); _readEvent = NULL; } if (_writeEvent) { event_del(_writeEvent); event_free(_writeEvent); _writeEvent = NULL; } if (_connectEvent) { event_del(_connectEvent); event_free(_connectEvent); _connectEvent = NULL; } if (_launchEvent) { event_del(_launchEvent); event_free(_launchEvent); _launchEvent = NULL; } #ifdef ENABLE_PRECONNECTED_POOL if (_startWithPoolEvent) { event_del(_startWithPoolEvent); event_free(_startWithPoolEvent); _startWithPoolEvent = NULL; } #endif #ifdef ENABLE_HIGH_EFFICIENCY if (_connectTimerEvent != NULL) { if (_connectTimerFlag) { evtimer_del(_connectTimerEvent); _connectTimerFlag = false; } event_free(_connectTimerEvent); _connectTimerEvent = NULL; } #endif if (_audioFrame) { free(_audioFrame); _audioFrame = NULL; } _audioFrameSize = 0; _maxFrameSize = 0; _isFirstAudioFrame = true; LOG_INFO( "Node(%p) closeConnectNode done, current node status:%s exit status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); if (lock_ret) { MUTEX_UNLOCK(_mtxCloseNode); } } int ConnectNode::socketWrite(const uint8_t *buffer, size_t len) { #if defined(__ANDROID__) || defined(__linux__) int wLen = send(_socketFd, (const char *)buffer, len, MSG_NOSIGNAL); #else int wLen = send(_socketFd, (const char *)buffer, len, 0); #endif if (wLen < 0) { int errorCode = utility::getLastErrorCode(); if (NLS_ERR_RW_RETRIABLE(errorCode)) { // LOG_DEBUG("Node(%p) socketWrite continue.", this); return Success; } else { return -(SocketWriteFailed); } } else { return wLen; } } int ConnectNode::socketRead(uint8_t *buffer, size_t len) { int rLen = recv(_socketFd, (char *)buffer, len, 0); if (rLen <= 0) { // rLen == 0, close socket, need do int errorCode = utility::getLastErrorCode(); if (NLS_ERR_RW_RETRIABLE(errorCode)) { // LOG_DEBUG("Node(%p) socketRead continue.", this); return Success; } else { return -(SocketReadFailed); } } else { return rLen; } } int ConnectNode::gatewayRequest() { REQUEST_CHECK(_request, this); if (NULL == _readEvent) { LOG_ERROR("Node(%p) _readEvent is nullptr.", this); return -(EventEmpty); } if (_enableRecvTv) { utility::TextUtils::GetTimevalFromMs( &_recvTv, _request->getRequestParam()->getRecvTimeout()); event_add(_readEvent, &_recvTv); } else { event_add(_readEvent, NULL); } char tmp[NodeFrameSize] = {0}; int tmpLen = _webSocket.requestPackage( &_url, tmp, _request->getRequestParam()->GetHttpHeader()); if (tmpLen < 0) { LOG_DEBUG("Node(%p) WebSocket request string failed.", this); return -(GetHttpHeaderFailed); }; evbuffer_add(_cmdEvBuffer, (void *)tmp, tmpLen); return Success; } /** * @brief: 获取gateway的响应 * @return: 成功则返回收到的字节数, 失败则返回负值. */ int ConnectNode::gatewayResponse() { int ret = 0; int read_len = 0; uint8_t *frame = (uint8_t *)calloc(ReadBufferSize, sizeof(char)); if (frame == NULL) { LOG_ERROR("Node(%p) %s %d calloc failed.", this, __func__, __LINE__); return -(MallocFailed); } read_len = nlsReceive(frame, ReadBufferSize); if (read_len < 0) { LOG_ERROR("Node(%p) nlsReceive failed, read_len:%d", this, read_len); free(frame); return -(NlsReceiveFailed); } else if (read_len == 0) { LOG_WARN("Node(%p) nlsReceive empty, read_len:%d", this, read_len); free(frame); return -(NlsReceiveEmpty); } int frameSize = evbuffer_get_length(_readEvBuffer); if (frameSize > ReadBufferSize) { uint8_t *tmp = (uint8_t *)realloc(frame, frameSize + 1); if (NULL == tmp) { LOG_ERROR("Node(%p) %s %d realloc failed.", this, __func__, __LINE__); free(frame); return -(ReallocFailed); } else { frame = tmp; } } evbuffer_copyout(_readEvBuffer, frame, frameSize); // evbuffer_peek ret = _webSocket.responsePackage((const char *)frame, frameSize); if (ret == 0) { evbuffer_drain(_readEvBuffer, frameSize); } else if (ret > 0) { LOG_DEBUG("Node(%p) GateWay Middle response: %d\n %s", this, frameSize, frame); } else { _nodeErrMsg = _webSocket.getFailedMsg(); LOG_ERROR("Node(%p) webSocket.responsePackage: %s", this, _nodeErrMsg.c_str()); } if (frame) free(frame); frame = NULL; return ret; } /** * @brief: 将buffer中剩余音频数据ws封包并发送 * @return: 成功发送的字节数, 失败则返回负值. */ int ConnectNode::addRemainAudioData() { int ret = 0; if (_audioFrame != NULL && _audioFrameSize > 0) { ret = addAudioDataBuffer(_audioFrame, _audioFrameSize); memset(_audioFrame, 0, _maxFrameSize); _audioFrameSize = 0; } return ret; } /** * @brief: 将音频数据切片或填充后再ws封包并发送 * @param frame 用户传入的数据 * @param length 用户传入的数据字节数 * @return: 成功发送的字节数(可能为0, 留下一包数据发送), 失败则返回负值. */ int ConnectNode::addSlicedAudioDataBuffer(const uint8_t *frame, size_t length) { int filling_ret = 0; if (_nlsEncoder && _encoderType != ENCODER_NONE) { #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG("Node(%p) addSlicedAudioDataBuffer input data %d bytes.", this, // length); #endif _maxFrameSize = _nlsEncoder->getFrameSampleBytes(); if (_maxFrameSize <= 0) { return _maxFrameSize; } if (_audioFrame == NULL) { _audioFrame = (unsigned char *)calloc(_maxFrameSize, sizeof(unsigned char *)); if (_audioFrame == NULL) { LOG_ERROR("Node(%p) malloc audio_data_buffer failed.", this); return -(MallocFailed); #ifdef ENABLE_NLS_DEBUG } else { LOG_DEBUG("Node(%p) create audio frame data %d bytes.", this, _maxFrameSize); #endif } _audioFrameSize = 0; } int ret = 0; size_t frame_used_size = 0; /*frame已经传入buffer的字节数*/ size_t frame_remain_size = length; /*frame未传入buffer的字节数*/ do { size_t buffer_space_size = _maxFrameSize - _audioFrameSize; /*buffer中空闲空间, 最大为_maxFrameSize*/ if (frame_remain_size < buffer_space_size) { memcpy(_audioFrame + _audioFrameSize, frame + frame_used_size, frame_remain_size); _audioFrameSize += frame_remain_size; frame_used_size += frame_remain_size; frame_remain_size = 0; } else { memcpy(_audioFrame + _audioFrameSize, frame + frame_used_size, buffer_space_size); _audioFrameSize += buffer_space_size; frame_used_size += buffer_space_size; frame_remain_size -= buffer_space_size; } if (_audioFrameSize >= _maxFrameSize) { /*每次填充完整的一包数据*/ ret = addAudioDataBuffer(_audioFrame, _maxFrameSize); memset(_audioFrame, 0, _maxFrameSize); filling_ret += _maxFrameSize; _audioFrameSize = 0; #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG( // "Node(%p) ready to push audio data(%d) into nls_buffer, has // digest " // "%dbytes, ret:%d.", // this, _maxFrameSize, frame_used_size, ret); #endif if (ret < 0) { filling_ret = ret; break; } } else { if (_isFirstAudioFrame == false && _encoderType != ENCODER_OPU) { /*数据不足一包, 且非第一包数据. OPU第一包如果未满, 则会编码失败*/ ret = addAudioDataBuffer(_audioFrame, _audioFrameSize); filling_ret += _audioFrameSize; #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG( // "Node(%p) ready to push audio data(%d) into nls_buffer, has " // "digest %dbytes, ret:%d.", // this, _audioFrameSize, frame_used_size, ret); #endif memset(_audioFrame, 0, _maxFrameSize); _audioFrameSize = 0; if (ret < 0) { filling_ret = ret; break; } } else { #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG("Node(%p) leave audio data(%d) for the next round.", // this, // _audioFrameSize); #endif break; } } } while (frame_used_size < length); } else { filling_ret = addAudioDataBuffer(frame, length); } #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG("Node(%p) pushed audio data(%d:%d) into audio_data_tmp_buffer.", // this, length, filling_ret); #endif return filling_ret; } /** * @brief: 将音频数据进行ws封包并发送 * @param frame 用户传入的数据 * @param frameSize 用户传入的数据字节数 * @return: 成功发送的字节数(可能为0, 留下一包数据发送), 失败则返回负值. */ int ConnectNode::addAudioDataBuffer(const uint8_t *frame, size_t frameSize) { REQUEST_CHECK(_request, this); int ret = 0; uint8_t *tmp = NULL; size_t tmpSize = 0; size_t length = 0; struct evbuffer *buff = NULL; if (frame == NULL || frameSize == 0) { return -(NlsEncodingFailed); } if (_nlsEncoder && _encoderType != ENCODER_NONE) { uint8_t *outputBuffer = new uint8_t[frameSize]; if (outputBuffer == NULL) { LOG_ERROR("Node(%p) new outputBuffer failed.", this); return -(NewOutputBufferFailed); } else { memset(outputBuffer, 0, frameSize); int nSize = _nlsEncoder->nlsEncoding(frame, (int)frameSize, outputBuffer, (int)frameSize); #ifdef ENABLE_NLS_DEBUG // LOG_DEBUG( // "Node(%p) Opus encoder(%d) encoding %dbytes data, and return " // "nSize:%d.", // this, _encoderType, frameSize, nSize); #endif if (nSize < 0) { LOG_ERROR("Node(%p) Opus encoder failed:%d.", this, nSize); delete[] outputBuffer; return -(NlsEncodingFailed); } _webSocket.binaryFrame(outputBuffer, nSize, &tmp, &tmpSize); delete[] outputBuffer; } } else { // pack frame data _webSocket.binaryFrame(frame, frameSize, &tmp, &tmpSize); } if (_request && _request->getRequestParam()->_enableWakeWord == true && !getWakeStatus()) { buff = _wwvEvBuffer; } else { buff = _binaryEvBuffer; } evbuffer_lock(buff); length = evbuffer_get_length(buff); if (length >= _limitSize) { LOG_WARN("Too many audio data in evbuffer."); evbuffer_unlock(buff); return -(EvbufferTooMuch); } evbuffer_add(buff, (void *)tmp, tmpSize); if (tmp) free(tmp); tmp = NULL; evbuffer_unlock(buff); if (length == 0 && _workStatus == NodeStarted) { MUTEX_LOCK(_mtxNode); if (!_isStop) { ret = nlsSendFrame(buff); } MUTEX_UNLOCK(_mtxNode); } if (length == 0 && _workStatus == NodeWakeWording) { MUTEX_LOCK(_mtxNode); if (!_isStop) { ret = nlsSendFrame(buff); } MUTEX_UNLOCK(_mtxNode); } if (ret == 0) { ret = sendControlDirective(); } if (ret < 0) { disconnectProcess(); handlerTaskFailedEvent(getErrorMsg()); } else { ret = frameSize; _isFirstAudioFrame = false; } return ret; } /** * @brief: 发送控制命令 * @return: 成功发送的字节数, 失败则返回负值. */ int ConnectNode::sendControlDirective() { MUTEX_LOCK(_mtxNode); int ret = 0; if (_workStatus == NodeStarted && _exitStatus == ExitInvalid) { size_t length = evbuffer_get_length(getCmdEvBuffer()); if (length != 0) { // LOG_DEBUG("Node(%p) cmd buffer isn't empty.", this); ret = nlsSendFrame(getCmdEvBuffer()); } } else { if (_exitStatus == ExitStopping && _isStop == false) { // LOG_DEBUG("Node(%p) audio has send done. And invoke stop command.", // this); addCmdDataBuffer(CmdStop); ret = nlsSendFrame(getCmdEvBuffer()); _isStop = true; } } MUTEX_UNLOCK(_mtxNode); return ret; } /** * @brief: 将命令字符串加入buffer用于发送 * @param type CmdType * @param message 待发送命令 * @return: */ void ConnectNode::addCmdDataBuffer(CmdType type, const char *message) { char *cmd = NULL; LOG_DEBUG("Request(%p) Node(%p) get command type: %s with %s", _request, this, getCmdTypeString(type).c_str(), getConnectNodeStatusStringLocked().c_str()); if (_request == NULL) { LOG_ERROR("The rquest of node(%p) is nullptr.", this); return; } if (_request->getRequestParam() == NULL) { LOG_ERROR("The requestParam of request(%p) node(%p) is nullptr.", _request, this); return; } switch (type) { case CmdStart: if (_reconnection.state == NodeReconnection::TriggerReconnection) { // setting tw_time_offset and tw_index_offset Json::Value root; Json::FastWriter writer; root["tw_time_offset"] = Json::UInt64(_reconnection.interruption_timestamp_ms - _reconnection.first_audio_timestamp_ms); root["tw_index_offset"] = (Json::UInt64)_reconnection.tw_index_offset; std::string buf = writer.write(root); _request->getRequestParam()->setPayloadParam(buf.c_str()); } cmd = (char *)_request->getRequestParam()->getStartCommand(); if (_reconnection.state == NodeReconnection::TriggerReconnection) { // cleaning tw_time_offset and tw_index_offset _request->getRequestParam()->removePayloadParam("tw_time_offset"); _request->getRequestParam()->removePayloadParam("tw_index_offset"); } _reconnection.state = NodeReconnection::NewReconnectionStarting; break; case CmdStControl: cmd = (char *)_request->getRequestParam()->getControlCommand(message); break; case CmdStop: cmd = (char *)_request->getRequestParam()->getStopCommand(); break; case CmdTextDialog: cmd = (char *)_request->getRequestParam()->getExecuteDialog(); break; case CmdWarkWord: cmd = (char *)_request->getRequestParam()->getStopWakeWordCommand(); break; case CmdCancel: LOG_DEBUG("Node(%p) add cancel command, do nothing.", this); return; case CmdSendText: cmd = (char *)_request->getRequestParam()->getRunFlowingSynthesisCommand( message); break; case CmdSendPing: cmd = (char *)"{ping}"; break; case CmdSendFlush: cmd = (char *)_request->getRequestParam()->getFlushFlowingTextCommand( message); break; default: LOG_WARN("Node(%p) add unknown command, do nothing.", this); return; } if (cmd) { std::string buf_str; LOG_INFO("Node(%p) get command: %s, and add into evbuffer.", this, utility::TextUtils::securityDisposalForLog(cmd, &buf_str, "appkey\":\"", 4, 'Z')); uint8_t *frame = NULL; size_t frameSize = 0; if (type == CmdSendPing) { _webSocket.pingFrame(&frame, &frameSize); } else { _webSocket.textFrame((uint8_t *)cmd, strlen(cmd), &frame, &frameSize); } evbuffer_add(_cmdEvBuffer, (void *)frame, frameSize); if (frame) free(frame); frame = NULL; } } /** * @brief: 命令发送 * @param type CmdType * @param message 待发送命令 * @return: 成功则返回发送字节数, 失败则返回负值 */ int ConnectNode::cmdNotify(CmdType type, const char *message) { int ret = Success; LOG_DEBUG("Node(%p) invoke CmdNotify: %s with %s", this, getCmdTypeString(type).c_str(), getConnectNodeStatusString().c_str()); if (type == CmdStop) { #ifdef ENABLE_REQUEST_RECORDING updateNodeProcess("stop", NodeStop, true, 0); #endif addRemainAudioData(); _exitStatus = ExitStopping; if (_workStatus == NodeStarted) { size_t length = evbuffer_get_length(_binaryEvBuffer); if (length == 0) { ret = sendControlDirective(); } else { LOG_DEBUG( "Node(%p) invoke CmdNotify: %s, and continue send audio data " "%zubytes.", this, getCmdTypeString(type).c_str(), length); } } } else if (type == CmdCancel) { #ifdef ENABLE_REQUEST_RECORDING updateNodeProcess("cancel", NodeCancel, true, 0); #endif _exitStatus = ExitCancel; } else if (type == CmdStControl) { #ifdef ENABLE_REQUEST_RECORDING updateNodeProcess("ctrl", NodeSendControl, true, 0); #endif addCmdDataBuffer(CmdStControl, message); if (_workStatus == NodeStarted) { size_t length = evbuffer_get_length(_binaryEvBuffer); if (length == 0) { ret = nlsSendFrame(_cmdEvBuffer); } } } else if (type == CmdWarkWord) { _isWakeStop = true; size_t length = evbuffer_get_length(_wwvEvBuffer); if (length == 0) { addCmdDataBuffer(CmdWarkWord); ret = nlsSendFrame(_cmdEvBuffer); } } else if (type == CmdSendText) { #ifdef ENABLE_REQUEST_RECORDING updateNodeProcess("send_text", NodeSendText, true, 0); #endif addCmdDataBuffer(CmdSendText, message); if (_workStatus == NodeStarted) { ret = nlsSendFrame(_cmdEvBuffer); _isSendSingleRoundText = false; } } else if (type == CmdSendPing) { addCmdDataBuffer(CmdSendPing, NULL); ret = nlsSendFrame(_cmdEvBuffer); } else if (type == CmdSendFlush) { addCmdDataBuffer(CmdSendFlush, message); ret = nlsSendFrame(_cmdEvBuffer); } else { LOG_ERROR("Node(%p) invoke unknown command.", this); } if (ret < 0) { disconnectProcess(); handlerTaskFailedEvent(getErrorMsg()); } #ifdef ENABLE_REQUEST_RECORDING if (type == CmdStop) { updateNodeProcess("stop", NodeStop, false, 0); } else if (type == CmdCancel) { updateNodeProcess("cancel", NodeCancel, false, 0); } else if (type == CmdStControl) { updateNodeProcess("ctrl", NodeSendControl, false, 0); } else if (type == CmdSendText) { updateNodeProcess("send_text", NodeSendText, false, 0); } #endif return ret; } #ifdef ENABLE_PRECONNECTED_POOL int ConnectNode::syncPingCmd() { if (_sslHandle == NULL || _socketFd == INVALID_SOCKET) { LOG_ERROR("Request(%p) Node(%p) invalid sslHandle(%p) and socketFd(%d).", _request, this, _sslHandle, _socketFd); return -(SslCtxEmpty); } uint8_t *frame = NULL; size_t frameSize = 0; _webSocket.pingFrame(&frame, &frameSize); int ret = nlsSend(frame, frameSize); if (ret <= 0) { LOG_ERROR("Request(%p) Node(%p) send ping frame failed(%d).", _request, this, ret); return -(SslWriteFailed); } else { // LOG_DEBUG("Node(%p) send ping frame success(%d).", ret); } return Success; } #endif int ConnectNode::nlsSend(const uint8_t *frame, size_t length) { int sLen = 0; if ((frame == NULL) || (length == 0)) { return 0; } if (_url._isSsl) { sLen = _sslHandle->sslWrite(frame, length); } else { sLen = socketWrite(frame, length); } if (sLen < 0) { if (_url._isSsl) { _nodeErrMsg = _sslHandle->getFailedMsg(); } else { _nodeErrMsg = evutil_socket_error_to_string(evutil_socket_geterror(_socketFd)); } LOG_ERROR("Node(%p) with sslHandle(%p) send failed: %s.", this, _sslHandle, _nodeErrMsg.c_str()); } return sLen; } /** * @brief: 发送一帧数据 * @return: 成功发送的字节数, 失败则返回负值. */ int ConnectNode::nlsSendFrame(struct evbuffer *eventBuffer, bool audio_frame) { int sLen = 0; uint8_t buffer[NodeFrameSize] = {0}; size_t bufferSize = 0; evbuffer_lock(eventBuffer); size_t length = evbuffer_get_length(eventBuffer); if (length == 0) { // LOG_DEBUG("Node(%p) eventBuffer is NULL.", this); evbuffer_unlock(eventBuffer); return 0; } if (length > NodeFrameSize) { bufferSize = NodeFrameSize; } else { bufferSize = length; } evbuffer_copyout(eventBuffer, buffer, bufferSize); // evbuffer_peek if (bufferSize > 0) { sLen = nlsSend(buffer, bufferSize); } if (sLen < 0) { LOG_ERROR("Node(%p) nlsSend failed, nlsSend return:%d.", this, sLen); evbuffer_unlock(eventBuffer); return -(NlsSendFailed); } else { // send data success if (audio_frame && _isFirstBinaryFrame) { _isFirstBinaryFrame = false; #ifdef ENABLE_CONTINUED _reconnection.first_audio_timestamp_ms = utility::TextUtils::GetTimestampMs(); #endif } evbuffer_drain(eventBuffer, sLen); length = evbuffer_get_length(eventBuffer); if (length > 0) { if (NULL == _writeEvent) { LOG_ERROR("Node(%p) event is nullptr.", this); evbuffer_unlock(eventBuffer); return -(EventEmpty); } utility::TextUtils::GetTimevalFromMs( &_sendTv, _request->getRequestParam()->getSendTimeout()); event_add(_writeEvent, &_sendTv); } evbuffer_unlock(eventBuffer); return length; } } int ConnectNode::nlsReceive(uint8_t *buffer, int max_size) { int rLen = 0; int read_buffer_size = max_size; if (_url._isSsl) { rLen = _sslHandle->sslRead((uint8_t *)buffer, read_buffer_size); } else { rLen = socketRead((uint8_t *)buffer, read_buffer_size); } if (rLen < 0) { if (_url._isSsl) { _nodeErrMsg = _sslHandle->getFailedMsg(); } else { _nodeErrMsg = evutil_socket_error_to_string(evutil_socket_geterror(_socketFd)); } LOG_ERROR("Request(%p) Node(%p) _sslHandle(%p) recv failed: %s.", _request, this, _sslHandle, _nodeErrMsg.c_str()); return -(ReadFailed); } evbuffer_add(_readEvBuffer, (void *)buffer, rLen); return rLen; } /** * @brief: 接收一帧数据 * @return: 成功接收的字节数, 失败则返回负值. */ int ConnectNode::webSocketResponse() { int ret = 0; int read_len = 0; if (_releasingFlag) { LOG_WARN("Node(%p) is releasing!!! skipping ...", this); return -(InvalidStatusWhenReleasing); } uint8_t *frame = (uint8_t *)calloc(ReadBufferSize, sizeof(char)); if (frame == NULL) { LOG_ERROR("%s %d calloc failed.", __func__, __LINE__); return 0; } #ifdef ENABLE_NLS_DEBUG_2 struct timeval timewait_start, timewait_a, timewait_b, timewait_end; gettimeofday(&timewait_start, NULL); #endif // receive buffer from SSL into _readEvBuffer read_len = nlsReceive(frame, ReadBufferSize); if (read_len < 0) { LOG_ERROR("Request(%p Node(%p) nlsReceive failed, read_len:%d", _request, this, read_len); free(frame); return -(NlsReceiveFailed); } else if (read_len == 0) { #ifdef ENABLE_NLS_DEBUG_2 LOG_DEBUG("Request(%p) Node(%p) nlsReceive empty, read_len:%d", _request, this, read_len); #endif free(frame); return 0; #ifdef ENABLE_NLS_DEBUG_2 } else { LOG_DEBUG("Request(%p Node(%p) nlsReceive %dbytes", _request, this, read_len); #endif } const int maxTryAgain = 3; int tryAgain = maxTryAgain; bool eLoop = false; do { ret = 0; size_t frameSize = evbuffer_get_length(_readEvBuffer); if (frameSize == 0) { free(frame); frame = NULL; ret = 0; break; } else if (frameSize > ReadBufferSize) { uint8_t *tmp = (uint8_t *)realloc(frame, frameSize + 1); if (NULL == tmp) { LOG_ERROR("Node(%p) realloc failed.", this); free(frame); frame = NULL; ret = -(ReallocFailed); break; } else { frame = tmp; LOG_WARN("Node(%p) websocket frame realloc, new size:%d.", this, frameSize + 1); } #ifdef ENABLE_NLS_DEBUG_2 } else { LOG_DEBUG("Node(%p) nlsReceive %dbytes in readEvBuffer.", this, frameSize); #endif } size_t cur_data_size = frameSize; evbuffer_copyout(_readEvBuffer, frame, frameSize); WebSocketFrame wsFrame; memset(&wsFrame, 0x0, sizeof(struct WebSocketFrame)); int recv_ret = _webSocket.receiveFullWebSocketFrame(frame, frameSize, &_wsType, &wsFrame); if (recv_ret == Success) { // LOG_DEBUG("Request(%p) Node(%p) parse websocket frame, len:%zu, frame // size:%zu, _wsType.opCode:%d, wsFrame.type:%d.", // _request, this, wsFrame.length, frameSize, _wsType.opCode, // wsFrame.type); if (_releasingFlag) { LOG_WARN("Node(%p) is releasing!!! skipping ...", this); ret = -(InvalidStatusWhenReleasing); break; } if (_wsType.opCode == WebSocketHeaderType::PONG) { LOG_DEBUG("Node(%p) receive PONG.", this); // memset(&_wsType, 0x0, sizeof(struct WebSocketHeaderType)); wsFrame.type = _wsType.opCode; } #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_a, NULL); #endif /* * Will invoke callback in parseFrame. * If blocking in callback, will block in parseFrame */ int result = parseFrame(&wsFrame); #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_b, NULL); #endif if (result) { LOG_ERROR("Node(%p) parse WS frame failed:%d.", this, result); ret = result; break; } /* Should check node here */ if (_instance == NULL) { /* Maybe user has released instance */ ret = -(EventClientEmpty); break; } else { /* Maybe user has released request.*/ NlsNodeManager *node_manager = _instance->getNodeManger(); int status = NodeStatusInvalid; ret = node_manager->checkNodeExist(this, &status); if (ret != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", this, ret); break; } } evbuffer_drain(_readEvBuffer, wsFrame.length + _wsType.headerSize); cur_data_size = cur_data_size - (wsFrame.length + _wsType.headerSize); ret = wsFrame.length + _wsType.headerSize; tryAgain = maxTryAgain; } else if (recv_ret == -(InvalidWsFrameHeaderSize) || recv_ret == -(InvalidWsFrameHeaderBody)) { if (tryAgain-- > 0) { LOG_WARN( "Request(%p) Node(%p) the WS data is insufficient, and continues " "to be " "received", _request, this); usleep(5 * 1000); read_len = nlsReceive(frame, ReadBufferSize); if (read_len < 0) { LOG_ERROR("Request(%p) Node(%p) nlsReceive failed, read_len:%d", _request, this, read_len); if (frame) free(frame); return -(NlsReceiveFailed); } else { // LOG_WARN("Request(%p) Node(%p) nlsReceive again ...", _request, // this); } continue; } } else if (recv_ret == -(InvalidWsFrameBody)) { LOG_DEBUG( "Request(%p) Node(%p) the WS data is insufficient, and continues to " "be " "received later! Read frame size:%dbytes, wsType: " "headerSize:%dbytes, fin:0x%x opCode:%d mask:0x%x N0:%d N:%d.", _request, this, frameSize, _wsType.headerSize, _wsType.fin, _wsType.opCode, _wsType.mask, _wsType.N0, _wsType.N); ret = 0; } else { LOG_ERROR("Request(%p) Node(%p) receive full WebSocket Frame failed:%d", _request, this, recv_ret); } /* 解析成功并还有剩余数据, 则尝试再解析 */ if (ret > 0 && cur_data_size > 0) { LOG_DEBUG( "Request(%p) Node(%p) current data remainder size:%d, ret:%d, " "receive ws frame " "continue...", _request, this, cur_data_size, ret); eLoop = true; } else { eLoop = false; } } while (eLoop); if (frame) free(frame); frame = NULL; #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_end, NULL); uint64_t time_consuming_a = timewait_a.tv_sec * 1000 + timewait_a.tv_usec / 1000 - timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000; uint64_t time_consuming_parseFrame = timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 - timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000; uint64_t time_consuming = timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 - timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000; if (time_consuming > 50) { LOG_WARN( "Request(%p) Node(%p) webSocketResponse done with excessive time " "%llums, including recv:%llu parseFrame:%llums.", _request, this, time_consuming, time_consuming_a, time_consuming_parseFrame); } else { LOG_DEBUG("Request(%p) Node(%p) webSocketResponse done", _request, this); } #endif return ret; } NlsEvent *ConnectNode::convertResult(WebSocketFrame *wsFrame, int *ret) { NlsEvent *wsEvent = NULL; if (_request == NULL) { LOG_ERROR("Node(%p) this request is nullptr.", this); *ret = -(RequestEmpty); return NULL; } #ifdef ENABLE_NLS_DEBUG_2 struct timeval timewait_start, timewait_a, timewait_b, timewait_c, timewait_d, timewait_end; gettimeofday(&timewait_start, NULL); #endif if (wsFrame->type == WebSocketHeaderType::BINARY_FRAME) { #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_a, NULL); #endif if (wsFrame->length > 0) { wsEvent = new NlsEvent(wsFrame->data, wsFrame->length, Success, NlsEvent::Binary, _request->getRequestParam()->_task_id); if (wsEvent == NULL) { LOG_ERROR("Node(%p) new NlsEvent failed!", this); handlerEvent(TASKFAILED_NEW_NLSEVENT_FAILED, MemNotEnough, NlsEvent::TaskFailed, _enableOnMessage); *ret = -(NewNlsEventFailed); } } else { LOG_WARN("Node(%p) this ws frame length is invalid %d.", wsFrame->length); *ret = -(WsFrameBodyEmpty); return NULL; } #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_b, NULL); #endif } else if (wsFrame->type == WebSocketHeaderType::TEXT_FRAME) { #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_c, NULL); #endif /* 打印这个string,可能会因为太长而崩溃 */ std::string result((char *)wsFrame->data, wsFrame->length); if (wsFrame->length > 1024) { std::string part_result((char *)wsFrame->data, 1024); LOG_DEBUG( "Node(%p) ws frame len:%d is too long, part response(1024): %s.", this, wsFrame->length, part_result.c_str()); } else if (wsFrame->length == 0) { LOG_ERROR("Node(%p) ws frame len is zero!", this); } else { LOG_DEBUG( "Request(%p) Node(%p) _sslHandle(%p) socketFd(%d) response(ws frame " "len:%d): %s", _request, this, _sslHandle, _socketFd, wsFrame->length, result.c_str()); } if ("GBK" == _request->getRequestParam()->_outputFormat) { result = utility::TextUtils::utf8ToGbk(result); } if (result.empty()) { LOG_ERROR("Node(%p) response result is empty!", this); handlerEvent(TASKFAILED_UTF8_JSON_STRING, Utf8ConvertError, NlsEvent::TaskFailed, _enableOnMessage); *ret = -(WsResponsePackageEmpty); return NULL; } #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_d, NULL); #endif wsEvent = new NlsEvent(result); if (wsEvent == NULL) { LOG_ERROR("Node(%p) new NlsEvent failed!", this); handlerEvent(TASKFAILED_NEW_NLSEVENT_FAILED, MemNotEnough, NlsEvent::TaskFailed, _enableOnMessage); *ret = -(NewNlsEventFailed); } else { *ret = wsEvent->parseJsonMsg(_enableOnMessage); if (*ret < 0) { LOG_ERROR("Node(%p) parseJsonMsg(%s) failed! ret:%d.", this, wsEvent->getAllResponse(), ret); delete wsEvent; wsEvent = NULL; handlerEvent(TASKFAILED_PARSE_JSON_STRING, JsonStringParseFailed, NlsEvent::TaskFailed, _enableOnMessage); } else { // LOG_DEBUG("Node(%p) parseJsonMsg success.", this); } } } else { LOG_ERROR("Node(%p) unknow WebSocketHeaderType:%d", this, wsFrame->type); handlerEvent(TASKFAILED_WS_JSON_STRING, UnknownWsHeadType, NlsEvent::TaskFailed, _enableOnMessage); *ret = -(UnknownWsFrameHeadType); } #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_end, NULL); uint64_t time_consuming = timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 - timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000; uint64_t time_consuming_BINARY_FRAME = timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 - timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000; uint64_t time_consuming_TEXT_FRAME = timewait_d.tv_sec * 1000 + timewait_d.tv_usec / 1000 - timewait_c.tv_sec * 1000 - timewait_c.tv_usec / 1000; uint64_t time_consuming_parseJsonMsg = timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 - timewait_d.tv_sec * 1000 - timewait_d.tv_usec / 1000; if (time_consuming > 50) { LOG_WARN( "Request(%p) Node(%p) convertResult done with excessive time " "%llums, including BINARY_FRAME:%llums TEXT_FRAME:%llums " "parseJsonMsg:%llums.", _request, this, time_consuming, time_consuming_BINARY_FRAME, time_consuming_TEXT_FRAME, time_consuming_parseJsonMsg); } #endif return wsEvent; } /** * @brief: 解析websocket帧, 产出当前node的事件帧(frameEvent) * @return: */ int ConnectNode::parseFrame(WebSocketFrame *wsFrame) { REQUEST_CHECK(_request, this); int result = Success; NlsEvent *frameEvent = NULL; #ifdef ENABLE_NLS_DEBUG_2 struct timeval timewait_start, timewait_a, timewait_b, timewait_c, timewait_d, timewait_end; gettimeofday(&timewait_start, NULL); #endif if (wsFrame->type == WebSocketHeaderType::CLOSE) { LOG_INFO("Node(%p) get CLOSE wsFrame closeCode:%d.", this, wsFrame->closeCode); if (NodeClosed != _workStatus) { std::string msg((char *)wsFrame->data, wsFrame->length); if (!msg.empty()) { LOG_ERROR("Node(%p) get error message:%s", this, msg.c_str()); } char tmp_msg[2048] = {0}; snprintf(tmp_msg, 2048 - 1, "{\"TaskFailed\":\"%s\"}", msg.c_str()); std::string failedMsg = tmp_msg; LOG_ERROR("Node(%p) failed msg:%s.", this, failedMsg.c_str()); frameEvent = new NlsEvent(failedMsg.c_str(), wsFrame->closeCode, NlsEvent::TaskFailed, _request->getRequestParam()->_task_id); if (frameEvent == NULL) { LOG_ERROR("Node(%p) new NlsEvent failed!", this); handlerEvent(TASKFAILED_NEW_NLSEVENT_FAILED, MemNotEnough, NlsEvent::TaskFailed, _enableOnMessage); return -(NewNlsEventFailed); } } else { LOG_INFO("Node(%p) NlsEvent::Close has invoked, skip CLOSE_FRAME.", this); } } else if (wsFrame->type == WebSocketHeaderType::PONG) { LOG_DEBUG("Node(%p) get PONG.", this); return Success; } else { #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_a, NULL); #endif frameEvent = convertResult(wsFrame, &result); #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_b, NULL); uint64_t time_consuming_convertResult0 = timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 - timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000; if (time_consuming_convertResult0 > 50) { LOG_WARN("Request(%p) Node(%p) parseFrame after convertResult:%llu.", _request, this, time_consuming_convertResult0); } #endif } if (frameEvent == NULL) { if (result == -(WsFrameBodyEmpty)) { LOG_WARN( "Node(%p) convert result failed, result:%d. Maybe recv dirty data, " "skip here ...", this, result); return Success; } else { LOG_ERROR("Node(%p) convert result failed, result:%d.", this, result); closeConnectNode(); if (result != Success) { std::string tmp_buf; handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close, _enableOnMessage); } return -(NlsEventEmpty); } } LOG_DEBUG( "Node(%p) begin HandlerFrame, msg type:%s node status:%s exit status:%s.", this, frameEvent->getMsgTypeString().c_str(), getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); // invoked cancel() if (_exitStatus == ExitCancel) { LOG_WARN("Node(%p) has been canceled.", this); if (frameEvent) delete frameEvent; frameEvent = NULL; return -(InvalidExitStatus); } int msg_type = frameEvent->getMsgType(); switch (msg_type) { case NlsEvent::RecognitionStarted: case NlsEvent::TranscriptionStarted: case NlsEvent::SynthesisStarted: // reset task_id from server, which will use in channelClose callback. if (frameEvent->getTaskId()) { std::string taskId(frameEvent->getTaskId()); if (!taskId.empty()) { _request->getRequestParam()->setTaskId(taskId); } } break; default: break; } #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_c, NULL); #endif result = handlerFrame(frameEvent); #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_d, NULL); #endif if (result) { delete frameEvent; frameEvent = NULL; return result; } LOG_DEBUG( "Node(%p) HandlerFrame finish, current node status:%s, ready to set " "workStatus.", this, getConnectNodeStatusString().c_str()); // after callback bool closeFlag = false; switch (msg_type) { case NlsEvent::RecognitionStarted: case NlsEvent::TranscriptionStarted: case NlsEvent::SynthesisStarted: if (_request->getRequestParam()->_requestType == SpeechWakeWordDialog) { _workStatus = NodeWakeWording; } else { _workStatus = NodeStarted; if (_request->getRequestParam()->_mode == TypeStreamInputTts) { FlowingSynthesizerParam *param = (FlowingSynthesizerParam *)_request->getRequestParam(); if (param->getSingleRoundText().size() > 0) { _isSendSingleRoundText = true; if (event_add(getSingleRoundTextEvent(), NULL) == Success) { event_active(getSingleRoundTextEvent(), EV_READ, 0); } } } } #ifdef ENABLE_CONTINUED // reconnecting finished _reconnection.state = NodeReconnection::NoReconnection; #endif break; case NlsEvent::Close: case NlsEvent::RecognitionCompleted: _workStatus = NodeCompleted; if (_request->getRequestParam()->_mode == TypeDialog) { closeFlag = false; } else { closeFlag = true; } break; case NlsEvent::TaskFailed: _workStatus = NodeFailed; closeFlag = true; break; case NlsEvent::TranscriptionCompleted: _workStatus = NodeCompleted; closeFlag = true; break; case NlsEvent::SynthesisCompleted: _workStatus = NodeCompleted; closeFlag = true; break; case NlsEvent::DialogResultGenerated: closeFlag = true; break; case NlsEvent::WakeWordVerificationCompleted: _workStatus = NodeStarted; break; case NlsEvent::Binary: if (_isFirstBinaryFrame) { LOG_DEBUG( "Node(%p) get first binary frame, set work status to NodeStarted", this); _isFirstBinaryFrame = false; _workStatus = NodeStarted; } break; default: closeFlag = false; break; } if (frameEvent) delete frameEvent; frameEvent = NULL; if (closeFlag) { _isPreconnecting = false; if (!_isLongConnection || _workStatus == NodeFailed) { #ifdef ENABLE_PRECONNECTED_POOL if (_usePreconnection && _workStatus != NodeFailed) { // 启用preconnected功能更并且未发生错误, 则进行记录 closeStatusConnectNodeForConnectedPool(); _isPreconnecting = true; /* 用于标记此Node进行交互并存储到ConnectedPool */ } else #endif { closeConnectNode(); } } else { closeStatusConnectNode(); } #ifdef ENABLE_PRECONNECTED_POOL if (_isPreconnecting) { // 启用preconnected功能并且未发生错误, 则进行记录 // if (_request->getRequestParam()->_mode == TypeTts) { // LOG_DEBUG( // "Node(%p) reset NodeStatus to NodeHandshaked, will push " // "prestarted node into ConnectedPool.", // this); // // 把Node存入ConnectedPool // if (NlsEventNetWork::_eventClient && // NlsEventNetWork::_eventClient->getPreconnectedPool()) { // uint64_t push_prestarted_begin_ms = // utility::TextUtils::GetTimestampMs(); // NlsEventNetWork::_eventClient->getPreconnectedPool() // ->pushPrestartedNode(_request, // _request->getRequestParam()->_mode, // _sslHandle == _nativeSslHandle); // uint64_t push_prestarted_end_ms = // utility::TextUtils::GetTimestampMs(); // if (push_prestarted_end_ms - push_prestarted_begin_ms > 50) { // LOG_WARN("Node(%p) pushPrestartedNode excessive latency:%llums.", // this, push_prestarted_end_ms - // push_prestarted_begin_ms); // } // } // LOG_DEBUG("Node(%p) reset NodeStatus to NodeHandshaked done.", this); // } else { // LOG_DEBUG( // "Node(%p) reset NodeStatus to NodeHandshaked, will push // prestarted " "node into ConnectedPool when Started.", this); // // 将在Started时把Node存入ConnectedPool // } LOG_DEBUG( "Node(%p) reset NodeStatus to NodeHandshaked, will push " "prestarted node into ConnectedPool.", this); // 把Node存入ConnectedPool if (NlsEventNetWork::_eventClient && NlsEventNetWork::_eventClient->getPreconnectedPool()) { uint64_t push_prestarted_begin_ms = utility::TextUtils::GetTimestampMs(); if (_nodeProcess.connect_type == ConnectWithPreconnectedNodePool && _sslHandle != _nativeSslHandle) { NlsEventNetWork::_eventClient->getPreconnectedPool() ->pushPrestartedNodeFromPreconnected( _request, _request->getRequestParam()->_mode); } else { NlsEventNetWork::_eventClient->getPreconnectedPool() ->pushPrestartedNode(_request, _request->getRequestParam()->_mode, _sslHandle == _nativeSslHandle); } uint64_t push_prestarted_end_ms = utility::TextUtils::GetTimestampMs(); if (push_prestarted_end_ms - push_prestarted_begin_ms > 50) { LOG_WARN("Node(%p) pushPrestartedNode excessive latency:%llums.", this, push_prestarted_end_ms - push_prestarted_begin_ms); } } LOG_DEBUG("Node(%p) reset NodeStatus to NodeHandshaked done.", this); } #endif std::string tmp_buf; handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close, _enableOnMessage); } // closeFlag #ifdef ENABLE_NLS_DEBUG_2 gettimeofday(&timewait_end, NULL); uint64_t time_consuming = timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 - timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000; uint64_t time_consuming_convertResult = timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 - timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000; uint64_t time_consuming_handlerFrame = timewait_d.tv_sec * 1000 + timewait_d.tv_usec / 1000 - timewait_c.tv_sec * 1000 - timewait_c.tv_usec / 1000; if (time_consuming > 50) { LOG_WARN( "Request(%p) Node(%p) parseFrame done with excessive time " "%llums, including convertResult:%llums and handlerFrame:%llums.", _request, this, time_consuming, time_consuming_convertResult, time_consuming_handlerFrame); } #endif return Success; } /** * @brief: 触发回调,将事件送给用户 * @return: */ int ConnectNode::handlerFrame(NlsEvent *frameEvent) { #ifdef ENABLE_NLS_DEBUG_2 uint64_t timewait_start, timewait_b, timewait_c, timewait_d, timewait_end; timewait_start = utility::TextUtils::GetTimestampMs(); #endif if (_workStatus == NodeInvalid) { LOG_ERROR( "Request(%p) Node(%p) current node status:%s is invalid, skip " "callback.", _request, this, getConnectNodeStatusString().c_str()); return -(InvaildNodeStatus); } else { if (_handler == NULL) { LOG_ERROR("Request(%p) Node(%p) _handler is nullptr!", _request, this) return -(NlsEventEmpty); } #ifdef ENABLE_REQUEST_RECORDING #ifdef ENABLE_NLS_DEBUG_2 if (frameEvent->getMsgType() != NlsEvent::Binary) { LOG_INFO("Request(%p) Node(%p) %s", _request, this, frameEvent->getAllResponse()); } #endif updateNodeProcess("callback", frameEvent->getMsgType(), true, frameEvent->getMsgType() == NlsEvent::Binary ? frameEvent->getBinaryData().size() : 0); #ifdef ENABLE_NLS_DEBUG_2 timewait_b = utility::TextUtils::GetTimestampMs(); #endif #endif // LOG_DEBUG( // "Node:%p current node status:%s is valid, msg type:%s handle " // "message... ", // this, getConnectNodeStatusString().c_str(), // frameEvent->getMsgTypeString().c_str()); // LOG_DEBUG("Node:%p current response:%s.", this, // frameEvent->getAllResponse()); bool ignore_flag = false; #ifdef ENABLE_CONTINUED updateTwIndexOffset(frameEvent); ignore_flag = ignoreCallbackWhenReconnecting(frameEvent->getMsgType(), frameEvent->getStatusCode()); #endif #ifdef ENABLE_NLS_DEBUG_2 timewait_c = utility::TextUtils::GetTimestampMs(); #endif /* callback to user */ if (_enableOnMessage) { sendFinishCondSignal(NlsEvent::Message); if (!ignore_flag) { handlerMessage(frameEvent->getAllResponse(), NlsEvent::Message); } } else { #ifdef ENABLE_NLS_DEBUG_2 uint64_t timewait_c1, timewait_c2, timewait_c4; timewait_c1 = utility::TextUtils::GetTimestampMs(); #endif sendFinishCondSignal(frameEvent->getMsgType()); #ifdef ENABLE_NLS_DEBUG_2 timewait_c2 = utility::TextUtils::GetTimestampMs(); #endif if (!ignore_flag) { _handler->handlerFrame(*frameEvent); } #ifdef ENABLE_NLS_DEBUG_2 timewait_c4 = utility::TextUtils::GetTimestampMs(); if (timewait_c4 - timewait_c1 > 50) { LOG_WARN( "Request(%p) Node(%p) handlerFrame callback with excessive time " "%llums, including sendFinishCondSignal:%llu handlerFrame:%llums.", _request, this, timewait_c4 - timewait_c1, timewait_c2 - timewait_c1, timewait_c4 - timewait_c2); } #endif } #ifdef ENABLE_NLS_DEBUG_2 timewait_d = utility::TextUtils::GetTimestampMs(); #endif #ifdef ENABLE_REQUEST_RECORDING updateNodeProcess("callback", NodeInvalid, false, 0); #endif if (frameEvent->getMsgType() == NlsEvent::Close) { #ifdef ENABLE_CONTINUED nodeReconnecting(); if (!ignore_flag) { _reconnection.reconnected_count = 0; LOG_DEBUG("Node(%p) reconnected_count reset.", this); } #endif _retryConnectCount = 0; } else if (frameEvent->getMsgType() == NlsEvent::TaskFailed) { LOG_WARN("Request(%p) Node(%p) occur TaskFailed: %s", _request, this, frameEvent->getAllResponse()); #ifdef ENABLE_PRECONNECTED_POOL if (!ignore_flag && _request && NlsEventNetWork::_eventClient && NlsEventNetWork::_eventClient->getPreconnectedPool()) { NlsType type = _request->getRequestParam()->_mode; NlsEventNetWork::_eventClient->getPreconnectedPool() ->curRequestIsAbnormal(_request, type); } #endif } } #ifdef ENABLE_NLS_DEBUG_2 timewait_end = utility::TextUtils::GetTimestampMs(); if (timewait_end - timewait_start > 50) { LOG_WARN( "Request(%p) Node(%p) handlerFrame done with excessive time " "%llums, including updateNodeProcess:%llu ignore_flag:%llums " "callback:%llums close:%llums.", _request, this, timewait_end - timewait_start, timewait_b - timewait_start, timewait_c - timewait_b, timewait_d - timewait_c, timewait_end - timewait_d); } #endif return Success; } /** * @brief: 事件最终处理,并进行回调 * @return: */ void ConnectNode::handlerEvent(const char *errorMsg, int errorCode, NlsEvent::EventType eventType, bool ignore) { LOG_DEBUG("Node(%p) 's exit status:%s, eventType:%d.", this, getExitStatusString().c_str(), eventType); if (_exitStatus == ExitCancel) { LOG_WARN("Node(%p) invoke cancel command, callback won't be invoked.", this); return; } if (_request == NULL) { LOG_ERROR("The request of this node(%p) is nullptr.", this); return; } if (errorCode != CloseCode) { _nodeErrCode = errorCode; } std::string error_str(errorMsg); if (error_str.empty()) { LOG_WARN("Node(%p) errorMsg is empty!", this); } #ifdef ENABLE_REQUEST_RECORDING if (eventType == NlsEvent::Close || eventType == NlsEvent::TaskFailed) { if (eventType == NlsEvent::TaskFailed) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.failed_timestamp_ms = _nodeProcess.last_op_timestamp_ms; } else if (eventType == NlsEvent::Close) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.closed_timestamp_ms = _nodeProcess.last_op_timestamp_ms; } error_str.assign(replenishNodeProcess(errorMsg)); if (eventType == NlsEvent::TaskFailed) { LOG_ERROR("Node(%p) trigger message: %s", this, error_str.c_str()); } else if (eventType == NlsEvent::Close) { LOG_INFO("Node(%p) trigger message: %s", this, error_str.c_str()); } } #endif NlsEvent *useEvent = NULL; useEvent = new NlsEvent(error_str.c_str(), errorCode, eventType, _request->getRequestParam()->_task_id); if (useEvent == NULL) { LOG_ERROR("Node(%p) new NlsEvent failed.", this); return; } if (eventType == NlsEvent::Close) { LOG_INFO("Node(%p) will callback NlsEvent::Close frame.", this); } if (_handler == NULL) { LOG_ERROR("Node(%p) event type:%d 's _handler is nullptr!", this, eventType) } else { if (NodeClosed == _workStatus) { LOG_WARN("Node(%p) NlsEvent::Close has invoked, skip CloseCallback.", this); } else { handlerFrame(useEvent); if (eventType == NlsEvent::Close) { _workStatus = NodeClosed; LOG_INFO("Node(%p) callback NlsEvent::Close frame done.", this); } else { LOG_INFO("Node(%p) callback NlsEvent::%s frame done.", this, useEvent->getMsgTypeString().c_str()); } } } delete useEvent; useEvent = NULL; return; } void ConnectNode::handlerMessage(const char *response, NlsEvent::EventType eventType) { NlsEvent *useEvent = NULL; useEvent = new NlsEvent(response, Success, eventType, _request->getRequestParam()->_task_id); if (useEvent == NULL) { LOG_ERROR("Node(%p) new NlsEvent failed.", this); return; } if (_workStatus == NodeInvalid) { LOG_ERROR("Node(%p) node status:%s is invalid, skip callback.", this, getConnectNodeStatusString().c_str()); } else { _handler->handlerFrame(*useEvent); } delete useEvent; useEvent = NULL; return; } /** * @brief: 解析错误信息获得对应错误码 * @return: 错误码 */ int ConnectNode::getErrorCodeFromMsg(const char *msg) { int code = DefaultErrorCode; try { Json::Reader reader; Json::Value root(Json::objectValue); // parse json if existent if (reader.parse(msg, root)) { if (!root["header"].isNull() && root["header"].isObject()) { Json::Value head = root["header"]; if (!head["status"].isNull() && head["status"].isInt()) { code = head["status"].asInt(); } } } else { if (strstr(msg, "return of SSL_read:")) { if (strstr(msg, "error:00000000:lib(0):func(0):reason(0)") || strstr(msg, "shutdown while in init")) { code = DefaultErrorCode; } } else if (strstr(msg, "ACCESS_DENIED")) { if (strstr(msg, "The token")) { if (strstr(msg, "has expired")) { code = TokenHasExpired; } else if (strstr(msg, "is invalid")) { code = TokenIsInvalid; } } else if (strstr(msg, "No privilege to this voice")) { code = NoPrivilegeToVoice; } else if (strstr(msg, "Missing authorization header")) { code = MissAuthHeader; } } else if (strstr(msg, "Got bad status")) { if (strstr(msg, "403 Forbidden")) { code = HttpGotBadStatusWith403; } } else if (strstr(msg, "Operation now in progress")) { code = OpNowInProgress; } else if (strstr(msg, "Broken pipe")) { code = BrokenPipe; } else if (strstr(msg, "connect failed")) { code = SysConnectFailed; } } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return code; } return code; } void ConnectNode::handlerTaskFailedEvent(std::string failedInfo, int code) { char tmp_msg[1024] = {0}; if (failedInfo.empty()) { strcpy(tmp_msg, "{\"TaskFailed\":\"Unknown failed.\"}"); code = DefaultErrorCode; } else { snprintf(tmp_msg, 1024 - 1, "{\"TaskFailed\":\"%s\"}", failedInfo.c_str()); if (code == DefaultErrorCode) { code = getErrorCodeFromMsg(failedInfo.c_str()); } } #ifdef ENABLE_DNS_IP_CACHE // clear IP cache _eventThread->setIpCache(NULL, NULL); #endif handlerEvent(tmp_msg, code, NlsEvent::TaskFailed, _enableOnMessage); std::string tmp_buf; handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close, _enableOnMessage); return; } #ifdef __LINUX__ void *ConnectNode::async_dns_resolve_thread_fn(void *arg) { pthread_detach(pthread_self()); ConnectNode *node = static_cast<ConnectNode *>(arg); LOG_DEBUG("Node(%p) dnsThread(%lu) is working ...", node, pthread_self()); if (node->_gaicbRequest[0] == NULL) { node->_gaicbRequest[0] = (struct gaicb *)malloc(sizeof(*node->_gaicbRequest[0])); } if (node->_gaicbRequest[0] == NULL) { LOG_ERROR("Node(%p) malloc _gaicbRequest failed.", arg); } else { memset(node->_gaicbRequest[0], 0, sizeof(*node->_gaicbRequest[0])); node->_gaicbRequest[0]->ar_name = node->_nodename; int err = getaddrinfo_a(GAI_NOWAIT, node->_gaicbRequest, 1, NULL); if (err) { LOG_ERROR("Node(%p) getaddrinfo_a failed, err:%d(%s).", arg, err, gai_strerror(err)); } else { /* check this node is alive. */ if (node->_request == NULL) { LOG_ERROR("The request of this node(%p) is nullptr.", node); goto dnsExit; } NlsClientImpl *instance = node->getInstance(); NlsNodeManager *node_manager = instance->getNodeManger(); int status = NodeStatusInvalid; int result = node_manager->checkNodeExist(node, &status); if (result != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result); goto dnsExit; } time_t timeout_ms = node->getRequest()->getRequestParam()->getTimeout(); // ms struct timespec outtime; utility::TextUtils::GetTimespecFromMs(&outtime, timeout_ms); // LOG_DEBUG("Node(%p) ready to gai_suspend.", arg); err = gai_suspend(node->_gaicbRequest, 1, &outtime); // LOG_DEBUG("Node(%p) gai_suspend finish, err:%d(%s).", node, err, // gai_strerror(err)); if (node->_dnsThreadExit) { LOG_WARN("Node(%p) ConnectNode is exiting, skip gai.", node); goto dnsExit; } if (node->_dnsThread == 0) { LOG_WARN("Node(%p) ConnectNode has exited, skip gai.", node); goto dnsExit; } if (err) { LOG_ERROR("Node(%p) gai_suspend failed, err:%d(%s).", arg, err, gai_strerror(err)); if (err == EAI_SYSTEM || err == EAI_AGAIN) { LOG_ERROR( "Node(%p) gai_suspend err:%d is mean timeout. please try again " "...", arg, err); } } else { /* check this node is alive again after gai_suspend. */ result = node_manager->checkNodeExist(node, &status); if (result != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result); goto dnsExit; } err = gai_error(node->_gaicbRequest[0]); if (err) { LOG_WARN("Node(%p) cannot get addrinfo, err:%d(%s).", arg, err, gai_strerror(err)); } else { node->_addrinfo = node->_gaicbRequest[0]->ar_result; } } } node->_dnsErrorCode = err; if (node->_dnsEvent) { event_del(node->_dnsEvent); event_assign(node->_dnsEvent, node->_eventThread->_workBase, -1, EV_READ, WorkThread::sysDnsEventCallback, node); } else { node->_dnsEvent = event_new(node->_eventThread->_workBase, -1, EV_READ, WorkThread::sysDnsEventCallback, node); if (NULL == node->_dnsEvent) { LOG_ERROR("Node(%p) new event(_dnsEvent) failed.", node); goto dnsExit; } } event_add(node->_dnsEvent, NULL); event_active(node->_dnsEvent, EV_READ, 0); LOG_INFO("Node(%p) dnsThread(%lu) event_active done.", node, pthread_self()); } dnsExit: if (node->_gaicbRequest[0]) { free(node->_gaicbRequest[0]); node->_gaicbRequest[0] = NULL; } node->_dnsThread = 0; node->_dnsThreadRunning = false; LOG_INFO("Node(%p) dnsThread(%lu) is exited.", node, pthread_self()); pthread_exit(NULL); } /** * @brief: 异步方式使用系统的getaddrinfo()方法获得dns * @return: 成功则为0, 否则为失败 */ static int native_getaddrinfo(const char *nodename, const char *servname, evdns_getaddrinfo_cb cb, void *arg) { ConnectNode *node = static_cast<ConnectNode *>(arg); NlsNodeManager *node_manager = node->getInstance()->getNodeManger(); int status = NodeStatusInvalid; int result = node_manager->checkNodeExist(node, &status); if (result != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result); return result; } else { LOG_DEBUG( "Node(%p) use native_getaddrinfo, ready to create dnsThread(%lu).", node, node->_dnsThread); } if (node->getExitStatus() == ExitCancel) { LOG_WARN("Node(%p) is ExitCancel, skip here ...", node); return -(CancelledExitStatus); } MUTEX_LOCK(node->_mtxNode); node->_dnsThreadRunning = true; node->_nodename = (char *)nodename; node->_servname = (char *)servname; if (node->_dnsThread) { pthread_join(node->_dnsThread, NULL); } int err = pthread_create(&node->_dnsThread, NULL, &ConnectNode::async_dns_resolve_thread_fn, arg); if (err) { node->_dnsThreadRunning = false; LOG_ERROR("Node(%p) dnsThread(%lu) create failed.", node, node->_dnsThread); } else { LOG_DEBUG("Node(%p) dnsThread(%lu) create success.", node, node->_dnsThread); } MUTEX_UNLOCK(node->_mtxNode); return err; } #endif /** * @brief: 进行DNS解析 * @return: 成功则为非负, 失败则为负值 */ int ConnectNode::dnsProcess(int aiFamily, char *directIp, bool sysGetAddr) { EXIT_CANCEL_CHECK(_exitStatus, this); struct evutil_addrinfo hints; NlsNodeManager *node_manager = _instance->getNodeManger(); int status = NodeStatusInvalid; int result = node_manager->checkNodeExist(this, &status); if (result != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", this, result); return result; } /* 当node处于长链接模式且已经链接, 无需进入握手阶段, 直接进入starting阶段. */ if (_isLongConnection && _isConnected) { LOG_DEBUG( "Node(%p) has connected, current is longConnection and connected.", this); #ifdef ENABLE_REQUEST_RECORDING if (_isLongConnection) { _nodeProcess.connect_type = ConnectWithLongConnect; } #endif _workStatus = NodeStarting; node_manager->updateNodeStatus(this, NodeStatusRunning); if (_request->getRequestParam()->_requestType == SpeechTextDialog) { addCmdDataBuffer(CmdTextDialog); } else { addCmdDataBuffer(CmdStart); } result = nlsSendFrame(getCmdEvBuffer()); if (result < 0) { LOG_ERROR("Node(%p) response failed, result:%d.", this, result); handlerTaskFailedEvent(getErrorMsg()); closeConnectNode(); return result; } return Success; } /* 尝试链接校验 */ if (!checkConnectCount()) { LOG_ERROR("Node(%p) restart connect failed.", this); handlerTaskFailedEvent(TASKFAILED_CONNECT_JSON_STRING, SysConnectFailed); return -(ConnectFailed); } _workStatus = NodeConnecting; node_manager->updateNodeStatus(this, NodeStatusConnecting); if (!parseUrlInformation(directIp)) { return -(ParseUrlFailed); } _url._enableSysGetAddr = sysGetAddr; if (_url._isSsl) { LOG_INFO("Node(%p) _url._isSsl is True, _url._enableSysGetAddr is %s.", this, _url._enableSysGetAddr ? "True" : "False"); } else { LOG_INFO("Node(%p) _url._isSsl is False, _url._enableSysGetAddr is %s.", this, _url._enableSysGetAddr ? "True" : "False"); } if (_url._directIp) { LOG_INFO("Node(%p) _url._directIp is True.", this); #ifdef ENABLE_REQUEST_RECORDING _nodeProcess.connect_type = ConnectWithDirectIP; #endif WorkThread::directConnect(this, _url._address); } else { LOG_INFO("Node(%p) _url._directIp is False.", this); if (aiFamily != AF_UNSPEC && aiFamily != AF_INET && aiFamily != AF_INET6) { LOG_WARN("Node(%p) aiFamily is invalid, use default AF_INET.", this, aiFamily); aiFamily = AF_INET; } #ifdef ENABLE_DNS_IP_CACHE std::string tmp_ip = _eventThread->getIpFromCache( (char *)_request->getRequestParam()->_url.c_str()); if (tmp_ip.length() > 0) { LOG_INFO("Node(%p) find IP in cache, connect directly.", this); // 从dns cache中获得IP进行直连 #ifdef ENABLE_REQUEST_RECORDING _nodeProcess.connect_type = ConnectWithIpCache; #endif WorkThread::directConnect(this, (char *)tmp_ip.c_str()); } else #endif { memset(&hints, 0, sizeof(hints)); hints.ai_family = aiFamily; hints.ai_flags = EVUTIL_AI_CANONNAME; hints.ai_socktype = SOCK_STREAM; hints.ai_protocol = IPPROTO_TCP; LOG_INFO("Node(%p) dns url:%s, enableSysGetAddr:%s.", this, _request->getRequestParam()->_url.c_str(), _url._enableSysGetAddr ? "True" : "False"); if (_url._enableSysGetAddr) { #ifdef __LINUX__ /* * 在内部ws协议下或者主动使用系统getaddrinfo_a的情况下, * 使用系统的getaddrinfo_a() */ result = native_getaddrinfo(_url._host, NULL, WorkThread::dnsEventCallback, this); if (result != Success) { result = -(GetAddrinfoFailed); } #else if (NULL == _eventThread || NULL == _eventThread->_dnsBase) { LOG_ERROR("Node:%p dns source is invalid.", this); return -(InvalidDnsSource); } _dnsRequest = evdns_getaddrinfo(_eventThread->_dnsBase, _url._host, NULL, &hints, WorkThread::dnsEventCallback, this); #endif } else { if (NULL == _eventThread || NULL == _eventThread->_dnsBase) { LOG_ERROR("Node(%p) dns source is invalid.", this); return -(InvalidDnsSource); } _dnsRequest = evdns_getaddrinfo(_eventThread->_dnsBase, _url._host, NULL, &hints, WorkThread::dnsEventCallback, this); if (_dnsRequest == NULL) { LOG_ERROR("Node:%p dnsRequest evdns_getaddrinfo failed!", this); /* * No need to free user_data ordecrement n_pending_requests; that * happened in the callback. */ return -(InvalidDnsSource); } } } } return result; } /** * @brief: 开始进行链接处理, 创建socket, 设置libevent, 并开始socket链接. * @return: 成功则为0, 阻塞则为1, 失败则负值. */ int ConnectNode::connectProcess(const char *ip, int aiFamily) { EXIT_CANCEL_CHECK(_exitStatus, this); evutil_socket_t sockFd = socket(aiFamily, SOCK_STREAM, 0); if (sockFd < 0) { LOG_ERROR("Node(%p) socket failed. aiFamily:%d, sockFd:%d. error mesg:%s.", this, aiFamily, sockFd, evutil_socket_error_to_string(evutil_socket_geterror(sockFd))); return -(SocketFailed); } else { // LOG_DEBUG("Node(%p) socket success, sockFd:%d.", this, sockFd); } struct linger so_linger; so_linger.l_onoff = 1; so_linger.l_linger = 0; if (setsockopt(sockFd, SOL_SOCKET, SO_LINGER, (char *)&so_linger, sizeof(struct linger)) < 0) { LOG_ERROR("Node(%p) set SO_LINGER failed.", this); return -(SetSocketoptFailed); } if (evutil_make_socket_nonblocking(sockFd) < 0) { LOG_ERROR("Node(%p) evutil_make_socket_nonblocking failed.", this); return -(EvutilSocketFalied); } LOG_INFO("Node(%p) new socket ip:%s port:%d Fd:%d.", this, ip, _url._port, sockFd); short events = EV_READ | EV_WRITE | EV_TIMEOUT | EV_FINALIZE; // LOG_DEBUG("Node(%p) set events(%d) for connectEventCallback.", this, // events); if (NULL == _connectEvent) { _connectEvent = event_new(_eventThread->_workBase, sockFd, events, WorkThread::connectEventCallback, this); if (NULL == _connectEvent) { LOG_ERROR("Node(%p) new event(_connectEvent) failed.", this); return -(EventEmpty); } } else { event_del(_connectEvent); event_assign(_connectEvent, _eventThread->_workBase, sockFd, events, WorkThread::connectEventCallback, this); } #ifdef ENABLE_HIGH_EFFICIENCY if (NULL == _connectTimerEvent) { _connectTimerEvent = evtimer_new( _eventThread->_workBase, WorkThread::connectTimerEventCallback, this); if (NULL == _connectTimerEvent) { LOG_ERROR("Node(%p) new event(_connectTimerEvent) failed.", this); return -(EventEmpty); } } #endif if (_enableRecvTv) { events = EV_READ | EV_TIMEOUT | EV_PERSIST | EV_FINALIZE; } else { events = EV_READ | EV_PERSIST | EV_FINALIZE; } // LOG_DEBUG("Node(%p) set events(%d) for readEventCallback with sockFd(%d)", // this, events, sockFd); if (NULL == _readEvent) { _readEvent = event_new(_eventThread->_workBase, sockFd, events, WorkThread::readEventCallBack, this); if (NULL == _readEvent) { LOG_ERROR("Node(%p) new event(_readEvent) failed.", this); return -(EventEmpty); } } else { event_del(_readEvent); event_assign(_readEvent, _eventThread->_workBase, sockFd, events, WorkThread::readEventCallBack, this); } events = EV_WRITE | EV_TIMEOUT | EV_FINALIZE; // LOG_DEBUG("Node(%p) set events(%d) for writeEventCallback with sockFd(%d)", // this, events, sockFd); if (NULL == _writeEvent) { _writeEvent = event_new(_eventThread->_workBase, sockFd, events, WorkThread::writeEventCallBack, this); if (NULL == _writeEvent) { LOG_ERROR("Node(%p) new event(_writeEvent) failed.", this); return -(EventEmpty); } } else { event_del(_writeEvent); event_assign(_writeEvent, _eventThread->_workBase, sockFd, events, WorkThread::writeEventCallBack, this); } _aiFamily = aiFamily; if (aiFamily == AF_INET) { memset(&_addrV4, 0, sizeof(_addrV4)); _addrV4.sin_family = AF_INET; _addrV4.sin_port = htons(_url._port); if (inet_pton(AF_INET, ip, &_addrV4.sin_addr) <= 0) { LOG_ERROR("Node(%p) IpV4 inet_pton failed.", this); evutil_closesocket(sockFd); return -(SetSocketoptFailed); } } else if (aiFamily == AF_INET6) { memset(&_addrV6, 0, sizeof(_addrV6)); _addrV6.sin6_family = AF_INET6; _addrV6.sin6_port = htons(_url._port); if (inet_pton(AF_INET6, ip, &_addrV6.sin6_addr) <= 0) { LOG_ERROR("Node(%p) IpV6 inet_pton failed.", this); evutil_closesocket(sockFd); return -(SetSocketoptFailed); } } _socketFd = sockFd; return socketConnect(); } #ifdef ENABLE_PRECONNECTED_POOL int ConnectNode::syncConnectProcess(const char *ip, int aiFamily) { EXIT_CANCEL_CHECK(_exitStatus, this); evutil_socket_t sockFd = socket(aiFamily, SOCK_STREAM, 0); if (sockFd < 0) { LOG_ERROR("Node(%p) socket failed. aiFamily:%d, sockFd:%d. error mesg:%s.", this, aiFamily, sockFd, evutil_socket_error_to_string(evutil_socket_geterror(sockFd))); return -(SocketFailed); } else { // LOG_DEBUG("Node(%p) socket success, sockFd:%d.", this, sockFd); } struct linger so_linger; so_linger.l_onoff = 1; so_linger.l_linger = 0; if (setsockopt(sockFd, SOL_SOCKET, SO_LINGER, (char *)&so_linger, sizeof(struct linger)) < 0) { LOG_ERROR("Node(%p) set SO_LINGER failed.", this); return -(SetSocketoptFailed); } LOG_INFO("Node(%p) new socket ip:%s port:%d Fd:%d.", this, ip, _url._port, sockFd); _aiFamily = aiFamily; if (aiFamily == AF_INET) { memset(&_addrV4, 0, sizeof(_addrV4)); _addrV4.sin_family = AF_INET; _addrV4.sin_port = htons(_url._port); if (inet_pton(AF_INET, ip, &_addrV4.sin_addr) <= 0) { LOG_ERROR("Node(%p) IpV4 inet_pton failed.", this); evutil_closesocket(sockFd); return -(SetSocketoptFailed); } } else if (aiFamily == AF_INET6) { memset(&_addrV6, 0, sizeof(_addrV6)); _addrV6.sin6_family = AF_INET6; _addrV6.sin6_port = htons(_url._port); if (inet_pton(AF_INET6, ip, &_addrV6.sin6_addr) <= 0) { LOG_ERROR("Node(%p) IpV6 inet_pton failed.", this); evutil_closesocket(sockFd); return -(SetSocketoptFailed); } } _socketFd = sockFd; return syncSocketConnect(); } #endif int ConnectNode::prestartProcess() { short events = EV_READ | EV_WRITE | EV_TIMEOUT | EV_FINALIZE; if (_enableRecvTv) { events = EV_READ | EV_TIMEOUT | EV_PERSIST | EV_FINALIZE; } else { events = EV_READ | EV_PERSIST | EV_FINALIZE; } LOG_DEBUG("Node(%p) set events(%d) for readEventCallback with sockFd(%d)", this, events, _socketFd); if (NULL == _readEvent) { LOG_DEBUG("Node(%p) event_new _readEvent with sockFd(%d)", this, _socketFd); _readEvent = event_new(_eventThread->_workBase, _socketFd, events, WorkThread::readEventCallBack, this); if (NULL == _readEvent) { LOG_ERROR("Node(%p) new event(_readEvent) failed.", this); } } else { LOG_DEBUG("Node(%p) event_del _readEvent first with sockFd(%d)", this, _socketFd); event_del(_readEvent); event_assign(_readEvent, _eventThread->_workBase, _socketFd, events, WorkThread::readEventCallBack, this); } if (NULL == _readEvent) { LOG_ERROR("Node(%p) _readEvent is nullptr.", this); } else { LOG_DEBUG("Node(%p) event_add _readEvent with sockFd(%d)", this, _socketFd); if (_enableRecvTv) { utility::TextUtils::GetTimevalFromMs( &_recvTv, _request->getRequestParam()->getRecvTimeout()); event_add(_readEvent, &_recvTv); } else { event_add(_readEvent, NULL); } } events = EV_WRITE | EV_TIMEOUT | EV_FINALIZE; // LOG_DEBUG("Node(%p) set events(%d) for writeEventCallback with sockFd(%d)", // this, events, _socketFd); if (NULL == _writeEvent) { _writeEvent = event_new(_eventThread->_workBase, _socketFd, events, WorkThread::writeEventCallBack, this); if (NULL == _writeEvent) { LOG_ERROR("Node(%p) new event(_writeEvent) failed.", this); } } else { event_del(_writeEvent); event_assign(_writeEvent, _eventThread->_workBase, _socketFd, events, WorkThread::writeEventCallBack, this); } return Success; } int ConnectNode::prestartEventDelProcess() { if (NULL != _readEvent) { event_del(_readEvent); event_free(_readEvent); _readEvent = NULL; } if (NULL != _writeEvent) { event_del(_writeEvent); event_free(_writeEvent); _writeEvent = NULL; } return Success; } /** * @brief:进行socket链接 * @return: 成功则为0, 阻塞则为1, 失败则负值. */ int ConnectNode::socketConnect() { int retCode = 0; NlsClientImpl *client = _instance; NlsNodeManager *node_manager = client->getNodeManger(); int status = NodeStatusInvalid; int ret = node_manager->checkNodeExist(this, &status); if (ret != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, ret:%d.", this, ret); return ret; } if (ExitCancel == _exitStatus) { LOG_WARN("Node(%p) current ExitCancel, skip sslProcess.", this); return -(InvalidExitStatus); } // LOG_DEBUG("Node(%p) connecting with socketFd(%d) ...", this, _socketFd); if (_aiFamily == AF_INET) { retCode = connect(_socketFd, (const sockaddr *)&_addrV4, sizeof(_addrV4)); } else { retCode = connect(_socketFd, (const sockaddr *)&_addrV6, sizeof(_addrV6)); } if (retCode == -1) { int connectErrCode = utility::getLastErrorCode(); if (NLS_ERR_CONNECT_RETRIABLE(connectErrCode)) { /* * connectErrCode == 115(EINPROGRESS) * means connection is in progress, * normally the socket connecting timeout is 75s. * after the socket fd is ready to read. */ /* 开启用于链接的libevent */ if (NULL == _connectEvent) { LOG_ERROR("Node(%p) event is nullptr.", this); return -(EventEmpty); } time_t timeout_ms = _request->getRequestParam()->getTimeout(); utility::TextUtils::GetTimevalFromMs(&_connectTv, timeout_ms); event_add(_connectEvent, &_connectTv); LOG_DEBUG("Node(%p) will connect later, errno:%d. timeout:%ldms.", this, connectErrCode, timeout_ms); return 1; } else { LOG_ERROR( "Node(%p) connect failed:%s. retCode:%d connectErrCode:%d. retry ...", this, evutil_socket_error_to_string(evutil_socket_geterror(_socketFd)), retCode, connectErrCode); #ifdef ENABLE_DNS_IP_CACHE std::string tmp_ip = _eventThread->getIpFromCache( (char *)_request->getRequestParam()->_url.c_str()); if (tmp_ip.length() == 0) { LOG_ERROR("Node(%p) connect failed, clear IpCache, will dns later ...", this); _eventThread->setIpCache(NULL, NULL); } #endif return -(SocketConnectFailed); } } else { LOG_DEBUG("Node(%p) connected directly. retCode:%d.", this, retCode); _workStatus = NodeConnected; node_manager->updateNodeStatus(this, NodeStatusConnected); _isConnected = true; } return Success; } #ifdef ENABLE_PRECONNECTED_POOL int ConnectNode::syncSocketConnect() { int retCode = 0; NlsClientImpl *client = _instance; NlsNodeManager *node_manager = client->getNodeManger(); int status = NodeStatusInvalid; int ret = node_manager->checkNodeExist(this, &status); if (ret != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, ret:%d.", this, ret); return ret; } if (ExitCancel == _exitStatus) { LOG_WARN("Node(%p) current ExitCancel, skip sslProcess.", this); return -(InvalidExitStatus); } if (_aiFamily == AF_INET) { retCode = connect(_socketFd, (const sockaddr *)&_addrV4, sizeof(_addrV4)); } else { retCode = connect(_socketFd, (const sockaddr *)&_addrV6, sizeof(_addrV6)); } if (retCode == -1) { int connectErrCode = utility::getLastErrorCode(); if (NLS_ERR_CONNECT_RETRIABLE(connectErrCode)) { LOG_DEBUG("Node(%p) will connect later, errno:%d.", this, connectErrCode); return 1; } else { LOG_ERROR( "Node(%p) connect failed:%s. retCode:%d connectErrCode:%d. retry ...", this, evutil_socket_error_to_string(evutil_socket_geterror(_socketFd)), retCode, connectErrCode); #ifdef ENABLE_DNS_IP_CACHE std::string tmp_ip = _eventThread->getIpFromCache( (char *)_request->getRequestParam()->_url.c_str()); if (tmp_ip.length() == 0) { LOG_ERROR("Node(%p) connect failed, clear IpCache, will dns later ...", this); _eventThread->setIpCache(NULL, NULL); } #endif return -(SocketConnectFailed); } } else { LOG_DEBUG("Node(%p) connected directly. retCode:%d.", this, retCode); _workStatus = NodeConnected; node_manager->updateNodeStatus(this, NodeStatusConnected); _isConnected = true; } return Success; } #endif /** * @brief: 进行SSL握手 * @return: 成功 等待链接则返回1, 正在握手则返回0, 失败则返回负值 */ int ConnectNode::sslProcess() { EXIT_CANCEL_CHECK(_exitStatus, this); int ret = 0; NlsClientImpl *client = _instance; NlsNodeManager *node_manager = client->getNodeManger(); int status = NodeStatusInvalid; ret = node_manager->checkNodeExist(this, &status); if (ret != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, ret:%d", this, ret); return ret; } if (_url._isSsl) { ret = _sslHandle->sslHandshake(_socketFd, _url._host); if (ret == SSL_ERROR_WANT_READ || ret == SSL_ERROR_WANT_WRITE) { // current status:NodeConnected // LOG_DEBUG("Node(%p) status:%s, wait ssl process ...", // this, getConnectNodeStatusString().c_str()); // trigger connectEventCallback() #ifdef ENABLE_HIGH_EFFICIENCY // connect回调和定时connect回调交替调用, 降低事件量的同时保证稳定 if (!_connectTimerFlag) { if (NULL == _connectTimerEvent) { LOG_ERROR("Node(%p) event is nullptr.", this); return -(EventEmpty); } // LOG_DEBUG("Node(%p) add evtimer _connectTimerEvent.", this); evtimer_add(_connectTimerEvent, &_connectTimerTv); _connectTimerFlag = true; } else { if (NULL == _connectEvent) { LOG_ERROR("Node(%p) event is nullptr.", this); return -(EventEmpty); } // LOG_DEBUG("Node(%p) add events _connectEvent.", this); utility::TextUtils::GetTimevalFromMs( &_connectTv, _request->getRequestParam()->getTimeout()); // set _connectEvent to pending status. event_add(_connectEvent, &_connectTv); _connectTimerFlag = false; } #else if (NULL == _connectEvent) { LOG_ERROR("Node(%p) event is nullptr.", this); return -(EventEmpty); } utility::TextUtils::GetTimevalFromMs( &_connectTv, _request->getRequestParam()->getTimeout()); // LOG_DEBUG("Node(%p) add events _connectEvent.", this); event_add(_connectEvent, &_connectTv); #endif return 1; } else if (ret < 0) { _nodeErrMsg = _sslHandle->getFailedMsg(); LOG_ERROR("Node(%p) _sslHandle(%p) sslHandshake failed(%d), %s.", this, _sslHandle, ret, _nodeErrMsg.c_str()); return -(SslHandshakeFailed); } else { _workStatus = NodeHandshaking; node_manager->updateNodeStatus(this, NodeStatusHandshaking); LOG_DEBUG( "Node(%p) _sslHandle(%p) sslHandshake done, ret:%d, set " "node:NodeHandshaking.", this, _sslHandle, ret); return 0; } } else { _workStatus = NodeHandshaking; node_manager->updateNodeStatus(this, NodeStatusHandshaking); LOG_INFO("Node(%p) it's not ssl process, set node:NodeHandshaking.", this); } return 0; } #ifdef ENABLE_PRECONNECTED_POOL /** * @brief: 进行SSL握手 * @return: 成功则返回0, 失败则返回负值 */ int ConnectNode::syncSslProcess() { EXIT_CANCEL_CHECK(_exitStatus, this); int ret = 0; NlsClientImpl *client = _instance; NlsNodeManager *node_manager = client->getNodeManger(); int status = NodeStatusInvalid; ret = node_manager->checkNodeExist(this, &status); if (ret != Success) { LOG_ERROR("Node(%p) checkNodeExist failed, ret:%d", this, ret); return ret; } if (_url._isSsl) { int try_count = 50; ret = _sslHandle->sslHandshake(_socketFd, _url._host); while ((ret == SSL_ERROR_WANT_READ || ret == SSL_ERROR_WANT_WRITE) && try_count-- > 0) { usleep(10 * 1000); ret = _sslHandle->sslHandshake(_socketFd, _url._host); } if (ret == SSL_ERROR_WANT_READ || ret == SSL_ERROR_WANT_WRITE) { return 1; } else if (ret < 0) { _nodeErrMsg = _sslHandle->getFailedMsg(); LOG_ERROR("Node(%p) _sslHandle(%p) sslHandshake failed(%d), %s.", this, _sslHandle, ret, _nodeErrMsg.c_str()); return -(SslHandshakeFailed); } else { _workStatus = NodeHandshaking; node_manager->updateNodeStatus(this, NodeStatusHandshaking); LOG_DEBUG( "Node(%p) _sslHandle(%p) sslHandshake done, ret:%d, set " "node:NodeHandshaking.", this, _sslHandle, ret); return 0; } } else { _workStatus = NodeHandshaking; node_manager->updateNodeStatus(this, NodeStatusHandshaking); LOG_INFO("Node(%p) it's not ssl process, set node:NodeHandshaking.", this); } return 0; } /** * @brief: 从IpCache中获得IP并进行链接 * @return: 链接成功, 否则不支持此功能或链接失败 */ bool ConnectNode::directLinkIpFromCache() { bool result = false; NlsNodeManager *node_manager = _instance->getNodeManger(); WorkThread::insertListNode(_eventThread, _request); updateParameters(); _workStatus = NodeConnecting; node_manager->updateNodeStatus(this, NodeStatusConnecting); if (!parseUrlInformation(_eventThread->getDirectIp())) { return false; } _url._enableSysGetAddr = _eventThread->getEnableSysGetAddr(); if (_url._isSsl) { LOG_INFO( "Request(%p) Node(%p) _url._isSsl is True, _url._enableSysGetAddr is " "%s.", _request, this, _url._enableSysGetAddr ? "True" : "False"); } else { LOG_INFO( "Request(%p) Node(%p) _url._isSsl is False, _url._enableSysGetAddr is " "%s.", _request, this, _url._enableSysGetAddr ? "True" : "False"); } if (_url._directIp) { LOG_INFO("Node(%p) _url._directIp is True.", this); result = WorkThread::syncDirectConnect(this, _url._address); } else { std::string tmp_ip = _eventThread->getIpFromCache( (char *)_request->getRequestParam()->_url.c_str(), true); if (tmp_ip.length() > 0) { LOG_INFO("Request(%p) Node(%p) find IP in cache, connect directly.", _request, this); // 从dns cache中获得IP进行直连 result = WorkThread::syncDirectConnect(this, (char *)tmp_ip.c_str()); } else { LOG_INFO("Request(%p) Node(%p) cannot find IP in cache.", _request, this); return false; } } return result; } #endif /** * @brief: 初始化音频编码器 * @return: */ void ConnectNode::initNlsEncoder() { MUTEX_LOCK(_mtxNode); if (NULL == _nlsEncoder) { if (NULL == _request) { LOG_ERROR("The request of node(%p) is nullptr.", this); MUTEX_UNLOCK(_mtxNode); return; } if (_request->getRequestParam()->_format == "opu") { _encoderType = ENCODER_OPU; } else if (_request->getRequestParam()->_format == "opus") { _encoderType = ENCODER_OPUS; } if (_encoderType != ENCODER_NONE) { _nlsEncoder = new NlsEncoder(); if (NULL == _nlsEncoder) { LOG_ERROR("Node(%p) new _nlsEncoder failed.", this); MUTEX_UNLOCK(_mtxNode); return; } int errorCode = 0; int ret = _nlsEncoder->createNlsEncoder( _encoderType, 1, _request->getRequestParam()->_sampleRate, &errorCode); if (ret < 0) { LOG_ERROR("Node(%p) createNlsEncoder failed, errcode:%d.", this, errorCode); delete _nlsEncoder; _nlsEncoder = NULL; } } } MUTEX_UNLOCK(_mtxNode); } /** * @brief: 更新并生效所有ConnectNode中设置的参数 * @return: */ void ConnectNode::updateParameters() { MUTEX_LOCK(_mtxNode); if (_request->getRequestParam()->_sampleRate == SampleRate16K) { _limitSize = Buffer16kMaxLimit; } else { _limitSize = Buffer8kMaxLimit; } time_t timeout_ms = _request->getRequestParam()->getTimeout(); utility::TextUtils::GetTimevalFromMs(&_connectTv, timeout_ms); LOG_INFO("Node(%p) set connect timeout: %ldms.", this, timeout_ms); _enableRecvTv = _request->getRequestParam()->getEnableRecvTimeout(); if (_enableRecvTv) { LOG_INFO("Node(%p) enable recv timeout.", this); } else { LOG_INFO("Node(%p) disable recv timeout.", this); } timeout_ms = _request->getRequestParam()->getRecvTimeout(); utility::TextUtils::GetTimevalFromMs(&_recvTv, timeout_ms); LOG_INFO("Node(%p) set recv timeout: %ldms.", this, timeout_ms); timeout_ms = _request->getRequestParam()->getSendTimeout(); utility::TextUtils::GetTimevalFromMs(&_sendTv, timeout_ms); LOG_INFO("Node(%p) set send timeout: %ldms.", this, timeout_ms); _enableOnMessage = _request->getRequestParam()->getEnableOnMessage(); if (_enableOnMessage) { LOG_INFO("Node(%p) enable OnMessage Callback.", this); } else { LOG_INFO("Node(%p) disable OnMessage Callback.", this); } MUTEX_UNLOCK(_mtxNode); } /** * @brief: 等待所有EventCallback退出并删除和停止EventCallback的队列 * @return: */ void ConnectNode::delAllEvents() { LOG_DEBUG( "Node(%p) delete all events begin, current node status:%s exit " "status:%s.", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str()); /* 当Node处于Invoking状态, 即还未进入正式运行, * 需要等待其进入Invoked才可进行操作 */ int try_count = 2000; while (_workStatus == NodeInvoking && try_count-- > 0) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } if (try_count <= 0) { LOG_WARN("Node(%p) waiting exit NodeInvoking failed.", this); } else { LOG_DEBUG("Node(%p) waiting exit NodeInvoking success.", this); } /* 当Node处于异步dns线程状态, 通知线程退出并等待其退出再进行释放 */ if (_url._enableSysGetAddr && _dnsThreadRunning) { _dnsThreadExit = true; try_count = 5000; LOG_WARN("Node(%p) waiting exit dnsThread ...", this); while (_dnsThreadRunning && try_count-- > 0) { #ifdef _MSC_VER Sleep(1); #else usleep(1000); #endif } if (try_count <= 0) { LOG_WARN("Node(%p) waiting exit dnsThread failed.", this); } else { LOG_WARN("Node(%p) waiting exit dnsThread success.", this); } } waitEventCallback(); bool del_all_events_lock_ret = true; MUTEX_TRY_LOCK(_mtxCloseNode, 2000, del_all_events_lock_ret); if (!del_all_events_lock_ret) { LOG_ERROR("Node(%p) delAllEvents, deadlock has occurred", this); } if (_dnsEvent) { event_del(_dnsEvent); event_free(_dnsEvent); _dnsEvent = NULL; } if (_readEvent) { event_del(_readEvent); event_free(_readEvent); _readEvent = NULL; } if (_writeEvent) { event_del(_writeEvent); event_free(_writeEvent); _writeEvent = NULL; } if (_connectEvent) { event_del(_connectEvent); event_free(_connectEvent); _connectEvent = NULL; } #ifdef ENABLE_HIGH_EFFICIENCY if (_connectTimerEvent != NULL) { if (_connectTimerFlag) { evtimer_del(_connectTimerEvent); _connectTimerFlag = false; } event_free(_connectTimerEvent); _connectTimerEvent = NULL; } #endif if (del_all_events_lock_ret) { MUTEX_UNLOCK(_mtxCloseNode); } waitEventCallback(); LOG_DEBUG("Node(%p) delete all events done.", this); } /** * @brief: 等待所有EventCallback退出, 默认1s超时. * @return: */ void ConnectNode::waitEventCallback() { LOG_DEBUG( "Node(%p) waiting EventCallback, current node status:%s exit status:%s " "_inEventCallbackNode:%s", this, getConnectNodeStatusString().c_str(), getExitStatusString().c_str(), _inEventCallbackNode ? "true" : "false"); #if defined(_MSC_VER) ; #else struct timespec outtime; struct timeval now; gettimeofday(&now, NULL); uint64_t time_ms = now.tv_sec * 1000 + now.tv_usec / 1000 + 1000; // plus timeout 1000ms utility::TextUtils::GetTimespecFromMs(&outtime, time_ms); MUTEX_LOCK(_mtxEventCallbackNode); if (_inEventCallbackNode) { if (ETIMEDOUT == pthread_cond_timedwait(&_cvEventCallbackNode, &_mtxEventCallbackNode, &outtime)) { LOG_WARN("Node(%p) waiting EventCallback timeout.", this); _waitEventCallbackAbnormally = true; } } MUTEX_UNLOCK(_mtxEventCallbackNode); #endif LOG_DEBUG("Node(%p) waiting EventCallback done with abnormal flag(%s).", this, _waitEventCallbackAbnormally ? "true" : "false"); // LOG_DEBUG("Node(%p) wait all EventCallback exit done.", this); } /** * @brief: 等待调用完成, 默认10s超时. * @return: */ void ConnectNode::waitInvokeFinish() { if (_syncCallTimeoutMs > 0) { LOG_DEBUG("Node(%p) waiting invoke finish, timeout %dms...", this, _syncCallTimeoutMs); #if defined(_MSC_VER) MUTEX_LOCK(_mtxInvokeSyncCallNode); #else struct timespec outtime; struct timeval now; gettimeofday(&now, NULL); uint64_t time_ms = now.tv_sec * 1000 + now.tv_usec / 1000 + _syncCallTimeoutMs; utility::TextUtils::GetTimespecFromMs(&outtime, time_ms); MUTEX_LOCK(_mtxInvokeSyncCallNode); if (ETIMEDOUT == pthread_cond_timedwait(&_cvInvokeSyncCallNode, &_mtxInvokeSyncCallNode, &outtime)) { _nodeErrCode = InvokeTimeout; LOG_WARN("Node(%p) waiting invoke timeout.", this); } MUTEX_UNLOCK(_mtxInvokeSyncCallNode); #endif LOG_DEBUG("Node(%p) waiting invoke done.", this); } } /** * @brief: 发送调用完成的信号. * @return: */ void ConnectNode::sendFinishCondSignal(NlsEvent::EventType eventType) { if (_syncCallTimeoutMs > 0) { if (eventType == NlsEvent::Close || eventType == NlsEvent::RecognitionStarted || eventType == NlsEvent::TranscriptionStarted || eventType == NlsEvent::SynthesisStarted) { LOG_DEBUG("Node(%p) send finish cond signal, event type:%d.", this, eventType); bool useless_flag = false; #ifdef _MSC_VER SET_EVENT(useless_flag, _mtxInvokeSyncCallNode); #else SEND_COND_SIGNAL(_mtxInvokeSyncCallNode, _cvInvokeSyncCallNode, useless_flag); #endif } } } /** * @brief: 生成closed事件信息, 填入任务相关信息比如task_id * @return: */ const char *ConnectNode::genCloseMsg(std::string *buf_str) { if (_request == NULL) { return NULL; } Json::Value root; Json::FastWriter writer; std::string task_id = _request->getRequestParam()->_task_id; try { root["channelClosed"] = "nls request finished."; if (!task_id.empty()) { root["task_id"] = task_id; } *buf_str = writer.write(root); if (buf_str->empty()) { return CLOSE_JSON_STRING; } else { return buf_str->c_str(); } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return CLOSE_JSON_STRING; } } /** * @brief: 生成SynthesisStarted事件信息, 填入任务相关信息比如task_id * @return: */ const char *ConnectNode::genSynthesisStartedMsg() { if (_request == NULL) { return NULL; } /* fake: * { * "header":{ * "namespace":"SpeechSynthesizer", * "name":"SynthesisStarted", * "status":20000000, * "message_id":"94b682af3ee549349d25085e76d53610", * "task_id":"0d528c2bdba942689fd291b6b7760fd2", * "status_text":"Gateway:SUCCESS:Success." * } * } */ std::string fake_cmd = ""; Json::Value root, header; Json::FastWriter writer; std::string task_id = _request->getRequestParam()->getTaskId(); try { header["namespace"] = "SpeechSynthesizer"; header["name"] = "SynthesisStarted"; header["status"] = 20000000; header["task_id"] = task_id; header["status_text"] = "Gateway:SUCCESS:Success."; root["header"] = header; fake_cmd = writer.write(root); } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return NULL; } LOG_INFO("Node(%p) send %s to user", this, fake_cmd.c_str()); return fake_cmd.c_str(); } #ifdef ENABLE_REQUEST_RECORDING void ConnectNode::updateNodeProcess(std::string api, int status, bool enter, size_t size) { #ifdef ENABLE_NLS_DEBUG_2 uint64_t timewait_start, timewait_0, timewait_a, timewait_end; timewait_start = utility::TextUtils::GetTimestampMs(); #endif if (api == "start") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.start_timestamp_ms = _nodeProcess.last_op_timestamp_ms; _nodeProcess.last_status = NodeInvoking; _nodeProcess.api_start_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_start_run = false; } } else if (api == "stop") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.stop_timestamp_ms = _nodeProcess.last_op_timestamp_ms; _nodeProcess.last_status = NodeStop; _nodeProcess.api_stop_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_stop_run = false; } } else if (api == "cancel") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.cancel_timestamp_ms = _nodeProcess.last_op_timestamp_ms; _nodeProcess.last_status = NodeCancel; _nodeProcess.api_cancel_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_cancel_run = false; } } else if (api == "sendAudio") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_send_timestamp_ms = _nodeProcess.last_op_timestamp_ms; _nodeProcess.last_status = NodeSendAudio; _nodeProcess.recording_bytes += size; _nodeProcess.send_count++; _nodeProcess.api_send_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_send_run = false; } } else if (api == "ctrl") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_ctrl_timestamp_ms = _nodeProcess.last_op_timestamp_ms; _nodeProcess.last_status = NodeSendControl; _nodeProcess.api_ctrl_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_ctrl_run = false; } } else if (api == "send_text") { if (enter) { _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_status = NodeSendText; _nodeProcess.api_ctrl_run = true; _nodeProcess.last_api_timestamp_ms = utility::TextUtils::GetTimestampMs(); } else { _nodeProcess.api_ctrl_run = false; } } else if (api == "callback") { if (enter) { #ifdef ENABLE_NLS_DEBUG_2 timewait_0 = utility::TextUtils::GetTimestampMs(); #endif _nodeProcess.last_callback = (NlsEvent::EventType)status; switch (_nodeProcess.last_callback) { case NlsEvent::RecognitionStarted: case NlsEvent::TranscriptionStarted: _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.started_timestamp_ms = _nodeProcess.last_op_timestamp_ms; break; case NlsEvent::TranscriptionCompleted: case NlsEvent::RecognitionCompleted: case NlsEvent::SynthesisCompleted: _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.completed_timestamp_ms = _nodeProcess.last_op_timestamp_ms; break; case NlsEvent::Binary: _nodeProcess.last_op_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.play_bytes += size; _nodeProcess.play_count++; _nodeProcess.last_status = NodePlayAudio; if (_isFirstBinaryFrame) { _nodeProcess.first_binary_timestamp_ms = _nodeProcess.last_op_timestamp_ms; } break; } #ifdef ENABLE_NLS_DEBUG_2 timewait_a = utility::TextUtils::GetTimestampMs(); #endif _nodeProcess.last_cb_start_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_cb_end_timestamp_ms = 0; _nodeProcess.last_cb_run = true; } else { _nodeProcess.last_cb_end_timestamp_ms = utility::TextUtils::GetTimestampMs(); _nodeProcess.last_cb_run = false; } } #ifdef ENABLE_NLS_DEBUG_2 timewait_end = utility::TextUtils::GetTimestampMs(); if (timewait_end - timewait_start > 50) { LOG_WARN( "Request(%p) Node(%p) updateNodeProcess calback %d done with " "excessive time " "%llums, including 0:%llu a:%llu b:%llums.", _request, this, _nodeProcess.last_callback, timewait_end - timewait_start, timewait_0 - timewait_start, timewait_a - timewait_start, timewait_end - timewait_a); } #endif } const char *ConnectNode::dumpAllInfo() { try { Json::Value root(Json::objectValue); Json::Value request_process(Json::objectValue); Json::Value timestamp(Json::objectValue); Json::Value last(Json::objectValue); Json::Value data(Json::objectValue); Json::Value callback(Json::objectValue); Json::Value block(Json::objectValue); Json::StreamWriterBuilder writerBuilder; writerBuilder["indentation"] = ""; timestamp = updateNodeProcess4Timestamp(); last = updateNodeProcess4Last(); data = updateNodeProcess4Data(); callback = updateNodeProcess4Callback(); block = updateNodeProcess4Block(); if (!timestamp.isNull()) { root["timestamp"] = timestamp; } if (!last.isNull()) { root["last"] = last; } if (!data.isNull()) { root["data"] = data; } if (!callback.isNull()) { root["callback"] = callback; } if (!block.isNull() && block.isObject() && !block.empty()) { root["block"] = block; } root["sdkversion"] = NLS_SDK_VERSION_STR; #ifdef ENABLE_CONTINUED Json::Value reconnection(Json::objectValue); reconnection = updateNodeReconnection(); if (!reconnection.isNull() && reconnection.isObject() && !reconnection.empty()) { root["reconnection"] = reconnection; } #endif root["connect_type"] = getConnectTypeStr(); std::string info = Json::writeString(writerBuilder, root); LOG_INFO("Request(%p) Node(%p) current info: %s", _request, this, info.c_str()); return info.c_str(); } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return ""; } } std::string ConnectNode::getConnectTypeStr() { std::string result = "unknown"; ConnectType type = _nodeProcess.connect_type; switch (type) { case ConnectWithSSL: result = "connect_with_SSL"; break; case ConnectWithDirectIP: result = "connect_with_direct_IP"; break; case ConnectWithIpCache: result = "connect_with_IP_from_cache"; break; case ConnectWithLongConnect: result = "connect_with_long_connection"; break; case ConnectWithPrestartedNodePool: result = "connect_with_SSL_from_PrestartedPool"; break; case ConnectWithPreconnectedNodePool: result = "connect_with_SSL_from_PreconnectedPool"; break; } return result; } /** * @brief: 在closed事件或TaskFailed(由SDK引发)事件中填入此请求的记录 * @return: */ std::string ConnectNode::replenishNodeProcess(const char *error) { if (error == NULL) { return ""; } try { Json::Reader reader; Json::Value root(Json::objectValue); if (!reader.parse(error, root)) { return ""; } else { Json::Value request_process(Json::objectValue); Json::Value timestamp(Json::objectValue); Json::Value last(Json::objectValue); Json::Value data(Json::objectValue); Json::Value callback(Json::objectValue); Json::Value block(Json::objectValue); Json::StreamWriterBuilder writerBuilder; writerBuilder["indentation"] = ""; timestamp = updateNodeProcess4Timestamp(); last = updateNodeProcess4Last(); data = updateNodeProcess4Data(); callback = updateNodeProcess4Callback(); block = updateNodeProcess4Block(); if (!timestamp.isNull()) { root["timestamp"] = timestamp; } if (!last.isNull()) { root["last"] = last; } if (!data.isNull()) { root["data"] = data; } if (!callback.isNull()) { root["callback"] = callback; } if (!block.isNull() && block.isObject() && !block.empty()) { root["block"] = block; } root["sdkversion"] = NLS_SDK_VERSION_STR; #ifdef ENABLE_CONTINUED Json::Value reconnection(Json::objectValue); reconnection = updateNodeReconnection(); if (!reconnection.isNull() && reconnection.isObject() && !reconnection.empty()) { root["reconnection"] = reconnection; } #endif root["connect_type"] = getConnectTypeStr(); return Json::writeString(writerBuilder, root); } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return ""; } } Json::Value ConnectNode::updateNodeProcess4Data() { Json::Value data(Json::objectValue); try { if (_nodeProcess.recording_bytes > 0) { data["recording_bytes"] = (Json::UInt64)_nodeProcess.recording_bytes; } if (_nodeProcess.send_count > 0) { data["send_count"] = (Json::UInt64)_nodeProcess.send_count; } if (_nodeProcess.play_bytes > 0) { data["play_bytes"] = (Json::UInt64)_nodeProcess.play_bytes; } if (_nodeProcess.play_count > 0) { data["play_count"] = (Json::UInt64)_nodeProcess.play_count; } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return data; } return data; } Json::Value ConnectNode::updateNodeProcess4Last() { Json::Value last(Json::objectValue); try { last["status"] = getConnectNodeStatusString(_nodeProcess.last_status); if (_nodeProcess.last_send_timestamp_ms > 0) { last["send"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.last_send_timestamp_ms); } if (_nodeProcess.last_ctrl_timestamp_ms > 0) { last["control"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.last_ctrl_timestamp_ms); } if (_nodeProcess.last_op_timestamp_ms > 0) { last["action"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.last_op_timestamp_ms); } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return last; } return last; } Json::Value ConnectNode::updateNodeProcess4Timestamp() { Json::Value timestamp(Json::objectValue); try { timestamp["create"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.create_timestamp_ms); if (_nodeProcess.start_timestamp_ms > 0) { timestamp["start"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.start_timestamp_ms); } if (_nodeProcess.started_timestamp_ms > 0) { timestamp["started"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.started_timestamp_ms); } if (_nodeProcess.stop_timestamp_ms > 0) { timestamp["stop"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.stop_timestamp_ms); } if (_nodeProcess.cancel_timestamp_ms > 0) { timestamp["cancel"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.cancel_timestamp_ms); } if (_nodeProcess.failed_timestamp_ms > 0) { timestamp["failed"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.failed_timestamp_ms); } if (_nodeProcess.completed_timestamp_ms > 0) { timestamp["completed"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.completed_timestamp_ms); timestamp["completed_latency"] = (Json::UInt64)(_nodeProcess.completed_timestamp_ms - _nodeProcess.start_timestamp_ms); } if (_nodeProcess.closed_timestamp_ms > 0) { timestamp["closed"] = utility::TextUtils::GetTimeFromMs(_nodeProcess.closed_timestamp_ms); } if (_nodeProcess.first_binary_timestamp_ms > 0) { timestamp["first_binary"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.first_binary_timestamp_ms); uint64_t first_binary_ms = _nodeProcess.first_binary_timestamp_ms - _nodeProcess.start_timestamp_ms; timestamp["first_binary_latency"] = (Json::UInt64)(first_binary_ms); if (first_binary_ms > 1000) { LOG_WARN("Request(%p) Node(%p) include abnormal first_binary:%llums", _request, this, first_binary_ms); #ifdef ENABLE_NLS_DEBUG_2 } else if (first_binary_ms > 500) { LOG_DEBUG( "Request(%p) Node(%p) include poor health first_binary:%llums", _request, this, first_binary_ms); #endif } } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return timestamp; } return timestamp; } Json::Value ConnectNode::updateNodeProcess4Callback() { Json::Value callback(Json::objectValue); try { NlsEvent event; std::string cb_name = event.getMsgTypeString(_nodeProcess.last_callback); if (!cb_name.empty() && cb_name != "Unknown") { callback["name"] = cb_name; if (_nodeProcess.last_cb_start_timestamp_ms > 0) { callback["start"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.last_cb_start_timestamp_ms); } if (_nodeProcess.last_cb_end_timestamp_ms > 0) { callback["end"] = utility::TextUtils::GetTimeFromMs( _nodeProcess.last_cb_end_timestamp_ms); } if (_nodeProcess.last_cb_run) { callback["status"] = "running"; } } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return callback; } return callback; } Json::Value ConnectNode::updateNodeProcess4Block() { Json::Value block(Json::objectValue); try { bool running_flag = false; std::string timestamp_name = ""; std::string duration_name = ""; if (_nodeProcess.api_start_run) { block["start"] = "running"; timestamp_name = "start_timestamp"; duration_name = "start_duration_ms"; running_flag = true; } if (_nodeProcess.api_stop_run) { block["stop"] = "running"; timestamp_name = "stop_timestamp"; duration_name = "stop_duration_ms"; running_flag = true; } if (_nodeProcess.api_cancel_run) { block["cancel"] = "running"; timestamp_name = "cancel_timestamp"; duration_name = "cancel_duration_ms"; running_flag = true; } if (_nodeProcess.api_send_run) { block["send"] = "running"; timestamp_name = "send_timestamp"; duration_name = "send_duration_ms"; running_flag = true; } if (_nodeProcess.api_ctrl_run) { block["ctrl"] = "running"; timestamp_name = "ctrl_timestamp"; duration_name = "ctrl_duration_ms"; running_flag = true; } if (running_flag && _nodeProcess.last_api_timestamp_ms > 0) { block[timestamp_name] = utility::TextUtils::GetTimeFromMs(_nodeProcess.last_api_timestamp_ms); uint64_t current = utility::TextUtils::GetTimestampMs(); block[duration_name] = Json::UInt64(current - _nodeProcess.last_api_timestamp_ms); } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return block; } return block; } #endif #ifdef ENABLE_CONTINUED Json::Value ConnectNode::updateNodeReconnection() { Json::Value reconnection(Json::objectValue); try { if (_reconnection.first_audio_timestamp_ms > 0) { reconnection["first_audio_timestamp"] = utility::TextUtils::GetTimeFromMs( _reconnection.first_audio_timestamp_ms); } if (_reconnection.interruption_timestamp_ms > 0) { reconnection["interruption_timestamp"] = utility::TextUtils::GetTimeFromMs( _reconnection.interruption_timestamp_ms); } if (_reconnection.tw_index_offset > 0) { reconnection["tw_index_offset"] = (Json::UInt64)_reconnection.tw_index_offset; } if (_reconnection.state != NodeReconnection::NoReconnection) { reconnection["should_reconnecting"] = true; reconnection["reconnected_count"] = _reconnection.reconnected_count; } } catch (const std::exception &e) { LOG_ERROR("Json failed: %s", e.what()); return reconnection; } return reconnection; } void ConnectNode::updateTwIndexOffset(NlsEvent *frameEvent) { if (frameEvent) { if (frameEvent->getMsgType() == NlsEvent::SentenceBegin) { _reconnection.tw_index_offset = frameEvent->getSentenceIndex(); } } } bool ConnectNode::nodeReconnecting() { if (_reconnection.state == NodeReconnection::WillReconnect) { struct timeval _reconnectTv; utility::TextUtils::GetTimevalFromMs( &_reconnectTv, NodeReconnection::reconnect_interval_ms); LOG_INFO("reconnect node(%p) after %dms.", this, NodeReconnection::reconnect_interval_ms); int event_ret = event_add(getReconnectEvent(), &_reconnectTv); if (event_ret == Success) { LOG_INFO("reconnect node(%p) event_add success.", this); _reconnection.state = NodeReconnection::TriggerReconnection; } else { LOG_WARN("reconnect node(%p) event_add failed with %d.", this, event_ret); } return true; } else if (_reconnection.state == NodeReconnection::TriggerReconnection || _reconnection.state == NodeReconnection::NewReconnectionStarting) { LOG_DEBUG( "Node(%p) reconnection has launched[(%d)" "0:NoReconnection,1:WillReconnect,2:TriggerReconnection,3:" "NewReconnectionStarting]", this, _reconnection.state); } return false; } /** * @brief: 获得用于重启当前node的libevent * @return: libevent的event指针 */ struct event *ConnectNode::getReconnectEvent() { if (_reconnectEvent != NULL) { event_del(_reconnectEvent); event_free(_reconnectEvent); _reconnectEvent = NULL; } _reconnectEvent = event_new(_eventThread->_workBase, -1, EV_READ | EV_TIMEOUT, WorkThread::launchEventCallback, this); if (NULL == _reconnectEvent) { LOG_ERROR("Node(%p) new event(_reconnectEvent) failed.", this); } else { LOG_DEBUG("Node(%p) new event(_reconnectEvent).", this); } return _reconnectEvent; } #endif void ConnectNode::sendFakeSynthesisStarted() { // send SynthesisStarted if speechSynthesizer if (_request->getRequestParam()->getNlsRequestType() == SpeechSynthesizer) { NlsEvent *useEvent = new NlsEvent(genSynthesisStartedMsg(), Success, NlsEvent::SynthesisStarted, _request->getRequestParam()->_task_id); if (useEvent) { handlerFrame(useEvent); delete useEvent; } } } #ifdef ENABLE_PRECONNECTED_POOL int ConnectNode::tryToGetPreconnection() { ConnectedStatus result = PreNodeInvalid; if (NlsEventNetWork::_eventClient && NlsEventNetWork::_eventClient->getPreconnectedPool()) { bool popPreNode = NlsEventNetWork::_eventClient->getPreconnectedPool()->popPrestartedNode( _request, _request->getRequestParam()->_mode); if (popPreNode) { result = PreNodeStarted; } else { popPreNode = NlsEventNetWork::_eventClient->getPreconnectedPool() ->popPreconnectedNode( _request, _request->getRequestParam()->_mode); if (popPreNode) { result = PreNodeConnected; } } } return result; } int ConnectNode::getPoolIndex() { #ifdef ENABLE_NLS_DEBUG_2 LOG_DEBUG("Request(%p) Node(%p) getPoolIndex %d.", _request, this, _poolIndex); #endif return _poolIndex; } void ConnectNode::setPoolIndex(int index) { #ifdef ENABLE_NLS_DEBUG_2 LOG_DEBUG("Request(%p) Node(%p) setPoolIndex %d.", _request, this, index); #endif _poolIndex = index; } #endif bool ConnectNode::ignoreCallbackWhenReconnecting(NlsEvent::EventType eventType, int code) { #ifdef ENABLE_CONTINUED if (_request) { if (_request->getRequestParam()->_enableReconnect) { if (eventType == NlsEvent::TaskFailed) { if ((code >= 50000000 && code < 60000000) || code == DefaultErrorCode || code == SysErrorCode || code == UnknownWsHeadType || code == HttpConnectFailed || code == SysConnectFailed || code == ClientError || code == ConcurrencyExceed) { if (_reconnection.state == NodeReconnection::NoReconnection || _reconnection.state == NodeReconnection::NewReconnectionStarting) { _reconnection.interruption_timestamp_ms = utility::TextUtils::GetTimestampMs(); _reconnection.reconnected_count++; _reconnection.state = NodeReconnection::WillReconnect; } } } if (_reconnection.state == NodeReconnection::WillReconnect || _reconnection.state == NodeReconnection::TriggerReconnection) { if (eventType == NlsEvent::TaskFailed || eventType == NlsEvent::Close) { if (_reconnection.reconnected_count <= NodeReconnection::max_try_count) { NlsEvent tmp; LOG_INFO( "Node(%p) state(%d) get status code is %d with %s, try %d " "times, should reconnecting " "and " "ignore callback.", this, _reconnection.state, code, tmp.getMsgTypeString(eventType).c_str(), _reconnection.reconnected_count); return true; } else { _reconnection.state = NodeReconnection::NoReconnection; LOG_INFO("Node(%p) failed %d times, should boot normally.", this, _reconnection.reconnected_count); } } } } } #endif return false; } } // namespace AlibabaNls