in nlsCppSdk/event/workThread.cpp [321:1657]
unsigned __stdcall WorkThread::loopEventCallback(LPVOID arg) {
#else
void *WorkThread::loopEventCallback(void *arg) {
#endif
WorkThread *eventParam = static_cast<WorkThread *>(arg);
#if defined(__ANDROID__) || defined(__linux__)
sigset_t signal_mask;
if (-1 == sigemptyset(&signal_mask)) {
LOG_ERROR("sigemptyset failed.");
exit(1);
}
if (-1 == sigaddset(&signal_mask, SIGPIPE)) {
LOG_ERROR("sigaddset failed.");
exit(1);
}
if (pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) != 0) {
LOG_ERROR("pthread_sigmask failed.");
exit(1);
}
prctl(PR_SET_NAME, "eventThread");
#endif
LOG_DEBUG("workThread(%p) create loopEventCallback.", arg);
if (eventParam->_workBase) {
LOG_DEBUG("workThread(%p) event_base_dispatch ...", arg);
event_base_dispatch(eventParam->_workBase);
}
if (eventParam->_dnsBase) {
evdns_base_free(eventParam->_dnsBase, 0);
eventParam->_dnsBase = NULL;
}
if (eventParam->_workBase) {
event_base_free(eventParam->_workBase);
eventParam->_workBase = NULL;
}
eventParam->_workThreadId = 0;
LOG_DEBUG("workThread(%p) loopEventCallback exit.", arg);
#if defined(_MSC_VER)
return Success;
#else
return NULL;
#endif
}
#ifdef ENABLE_HIGH_EFFICIENCY
/**
* @brief: 定时进行connect()后检查链接状态并开启ssl握手.
* @return:
*/
void WorkThread::connectTimerEventCallback(evutil_socket_t socketFd,
short event, void *arg) {
int errorCode = 0;
ConnectNode *node = static_cast<ConnectNode *>(arg);
node->_inEventCallbackNode = true;
LOG_DEBUG("Node(%p) connectTimerEventCallback node status:%s ...", node,
node->getConnectNodeStatusString().c_str());
if (event == EV_CLOSED) {
LOG_DEBUG("Node(%p) connect get EV_CLOSED.", node);
goto ConnectTimerProcessFailed;
} else {
// event == EV_TIMEOUT
if (node->getConnectNodeStatus() == NodeConnecting) {
socklen_t len = sizeof(errorCode);
getsockopt(socketFd, SOL_SOCKET, SO_ERROR, (char *)&errorCode, &len);
if (!errorCode) {
LOG_DEBUG(
"Node(%p) connect return ev_write, check ok, set "
"NodeStatus:NodeConnected.",
node);
node->setConnectNodeStatus(NodeConnected);
#ifndef _MSC_VER
// get client ip and port from socketFd
struct sockaddr_in client;
char client_ip[20];
socklen_t client_len = sizeof(client);
getsockname(socketFd, (struct sockaddr *)&client, &client_len);
inet_ntop(AF_INET, &client.sin_addr, client_ip, sizeof(client_ip));
LOG_DEBUG("Node(%p) local %s:%d", node, client_ip,
ntohs(client.sin_port));
#endif
node->setConnected(true);
} else {
/* 再次尝试connect(), 并启动下一次connectEventCallback */
if (node->socketConnect() < 0) {
/* socket connect 失败 */
goto ConnectTimerProcessFailed;
}
}
}
/* connect成功, 开始握手 */
if (node->getConnectNodeStatus() == NodeConnected) {
int ret = node->sslProcess();
switch (ret) {
case 0:
LOG_DEBUG("Node(%p) begin gateway request process.", node);
/* 进入gatewayRequest()的ssl握手阶段 */
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
}
break;
case 1:
/* sslProcess()中已经启动了下一次_connectEvent */
// LOG_DEBUG("wait connecting ...");
break;
default:
goto HandshakeTimerProcessFailed;
}
}
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
LOG_DEBUG("Node(%p) connectTimerEventCallback done.", node);
return;
HandshakeTimerProcessFailed:
ConnectTimerProcessFailed:
/*
* connect失败, 或者connect成功但是handshake失败.
* 进行断链并重回connecting阶段, 然后再开始dns解析.
*/
LOG_ERROR("Node(%p) connect or handshake failed, socket error mesg:%s.", node,
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
node->disconnectProcess();
node->setConnectNodeStatus(NodeConnecting);
if (node->dnsProcess(node->getEventThread()->_addrInFamily,
node->getEventThread()->_directIp,
node->getEventThread()->_enableSysGetAddr) < 0) {
LOG_ERROR("Node(%p) try delete request.", node);
destroyConnectNode(node);
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
LOG_DEBUG("Node(%p) connectTimerEventCallback done with failure.", node);
return;
}
#endif
/**
* @brief: connect()后检查链接状态并开启ssl握手.
* @return:
*/
void WorkThread::connectEventCallback(evutil_socket_t socketFd, short event,
void *arg) {
int errorCode = 0;
ConnectNode *node = static_cast<ConnectNode *>(arg);
node->_inEventCallbackNode = true;
// LOG_DEBUG("Node(%p) connectEventCallback node status:%s ...",
// node, node_manager->getNodeStatusString(status).c_str());
if (event == EV_TIMEOUT) {
LOG_ERROR("Node(%p) connect get EV_TIMEOUT.", node);
goto ConnectProcessFailed;
} else if (event == EV_CLOSED) {
LOG_DEBUG("Node(%p) connect get EV_CLOSED.", node);
goto ConnectProcessFailed;
} else {
LOG_DEBUG("Node(%p) current connect node status:%s, EV:%02x.", node,
node->getConnectNodeStatusString().c_str(), event);
if (node->getConnectNodeStatus() == NodeConnecting) {
socklen_t len = sizeof(errorCode);
getsockopt(socketFd, SOL_SOCKET, SO_ERROR, (char *)&errorCode, &len);
if (!errorCode) {
LOG_DEBUG(
"Node(%p) connect return ev_write, check ok, set "
"NodeStatus:NodeConnected.",
node);
node->setConnectNodeStatus(NodeConnected);
node->setConnected(true);
#ifndef _MSC_VER
// get client ip and port from socketFd
struct sockaddr_in client;
char client_ip[20];
socklen_t client_len = sizeof(client);
getsockname(socketFd, (struct sockaddr *)&client, &client_len);
inet_ntop(AF_INET, &client.sin_addr, client_ip, sizeof(client_ip));
LOG_DEBUG("Node(%p) local %s:%d", node, client_ip,
ntohs(client.sin_port));
#endif
} else {
/* 再次尝试connect(), 并启动下一次connectEventCallback */
if (node->socketConnect() < 0) {
/* socket connect 失败 */
goto ConnectProcessFailed;
}
}
}
/* connect成功, 开始握手 */
if (node->getConnectNodeStatus() == NodeConnected) {
int ret = node->sslProcess();
switch (ret) {
case 0:
LOG_DEBUG("Node(%p) begin gateway request process.", node);
/* 进入gatewayRequest()的ssl握手阶段 */
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
}
break;
case 1:
/* sslProcess()中已经启动了下一次_connectEvent */
// LOG_DEBUG("wait connecting.");
break;
default:
goto HandshakeProcessFailed;
}
}
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
LOG_DEBUG("Node(%p) connectEventCallback done.", node);
return;
HandshakeProcessFailed:
ConnectProcessFailed:
/*
* connect失败, 或者connect成功但是handshake失败.
* 进行断链并重回connecting阶段, 然后再开始dns解析.
*/
LOG_ERROR("Node(%p) connect or handshake failed, socket error mesg:%s.", node,
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
#ifdef ENABLE_DNS_IP_CACHE
node->getEventThread()->setIpCache(NULL, NULL);
#endif
node->disconnectProcess();
node->setConnectNodeStatus(NodeConnecting);
if (node->dnsProcess(node->getEventThread()->_addrInFamily,
node->getEventThread()->_directIp,
node->getEventThread()->_enableSysGetAddr) < 0) {
LOG_ERROR("Node(%p) try delete request.", node);
destroyConnectNode(node);
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
LOG_DEBUG("Node(%p) connectEventCallback done with failure.", node);
return;
}
void WorkThread::readEventCallBack(evutil_socket_t socketFd, short what,
void *arg) {
char tmp_msg[512] = {0};
int ret = Success;
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (node == NULL) {
LOG_ERROR("Node is nullptr!!!");
return;
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
return;
}
node->_inEventCallbackNode = true;
#ifdef ENABLE_NLS_DEBUG_2
struct timeval start, end;
gettimeofday(&start, NULL);
LOG_DEBUG(
"Request(%p) Node(%p) readEventCallBack begin, current socketFd:%d "
"event:%d, node "
"status:%s, exit status:%s.",
node->getRequest(), node, socketFd, what,
node->getConnectNodeStatusString().c_str(),
node->getExitStatusString().c_str());
#endif
if (node->getExitStatus() == ExitCancel) {
LOG_WARN("Node(%p) skip this operation ...", node);
node->closeConnectNode();
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
if (what == EV_READ) {
ret = nodeResponseProcess(node);
if (ret == -(InvalidRequest)) {
LOG_ERROR("Node(%p) has invalid request, skip all operation.", node);
node->closeConnectNode();
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
} else if (ret == -(EventClientEmpty)) {
LOG_ERROR("Instance has released, skip all operation.");
return;
} else if (ret == -(InvalidStatusWhenReleasing)) {
LOG_ERROR("Node(%p) is releasing, skip all operation.", node);
return;
}
} else if (what == EV_TIMEOUT) {
snprintf(tmp_msg, 512 - 1, "Recv timeout. socket error:%s.",
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
LOG_ERROR("Node(%p) error msg:%s.", node, tmp_msg);
node->closeConnectNode();
node->handlerTaskFailedEvent(tmp_msg, EvRecvTimeout);
} else {
snprintf(tmp_msg, 512 - 1, "Unknown event:%02x. %s", what,
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
LOG_ERROR("Node(%p) error msg:%s.", node, tmp_msg);
node->closeConnectNode();
node->handlerTaskFailedEvent(tmp_msg, EvUnknownEvent);
}
#ifdef ENABLE_NLS_DEBUG_2
gettimeofday(&end, NULL);
uint64_t time_consuming = end.tv_sec * 1000 + end.tv_usec / 1000 -
start.tv_sec * 1000 - start.tv_usec / 1000;
if (time_consuming >= 100) {
LOG_WARN(
"Request(%p) Node(%p) readEventCallBack done with excessive time "
"%llums.",
node->getRequest(), node, time_consuming);
}
#endif
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
#ifdef ENABLE_NLS_DEBUG_2
LOG_DEBUG(
"Request(%p) Node(%p) readEventCallBack done with "
"node->_inEventCallbackNode:%s.",
node->getRequest(), node, node->_inEventCallbackNode ? "true" : "false");
#endif
return;
}
void WorkThread::writeEventCallBack(evutil_socket_t socketFd, short what,
void *arg) {
char tmp_msg[512] = {0};
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (node == NULL) {
LOG_ERROR("Node is nullptr!!!");
return;
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
return;
}
node->_inEventCallbackNode = true;
// LOG_DEBUG(
// "Request(%p) Node(%p) writeEventCallBack current event:%d, node "
// "status:%s, exit status:%s.",
// node->getRequest(), node, what,
// node->getConnectNodeStatusString().c_str(),
// node->getExitStatusString().c_str());
if (node->getExitStatus() == ExitCancel) {
LOG_WARN("Node(%p) skip this operation ...", node);
node->closeConnectNode();
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
if (what == EV_WRITE) {
nodeRequestProcess(node);
} else if (what == EV_TIMEOUT) {
snprintf(tmp_msg, 512 - 1, "Send timeout. socket error:%s",
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
LOG_ERROR("Node(%p) %s", node, tmp_msg);
node->closeConnectNode();
node->handlerTaskFailedEvent(tmp_msg, EvSendTimeout);
} else {
snprintf(tmp_msg, 512 - 1, "Unknown event:%02x. %s", what,
evutil_socket_error_to_string(
evutil_socket_geterror(node->getSocketFd())));
LOG_ERROR("Node(%p) %s.", node, tmp_msg);
node->closeConnectNode();
node->handlerTaskFailedEvent(tmp_msg, EvUnknownEvent);
}
if (node->getConnectNodeStatus() == NodeInvalid) {
destroyConnectNode(node);
}
// LOG_DEBUG("Node(%p) writeEventCallBack done.", node);
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
/**
* @brief: IP直连
* @return:
*/
void WorkThread::directConnect(void *arg, char *ip) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (ip) {
LOG_DEBUG("Node(%p) direct IpV4:%s.", node, ip);
int ret = node->connectProcess(ip, AF_INET);
if (ret == 0) {
ret = node->sslProcess();
if (ret == Success) {
LOG_DEBUG("Node(%p) begin gateway request process.", node);
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
}
return;
}
}
if (ret == 1) {
LOG_DEBUG("Node(%p) connectProcess return 1, will try connect ...", node);
// connect EINPROGRESS
return;
} else {
LOG_ERROR(
"Node(%p) goto DirectConnectRetry with ret:%d and node status:%s "
"exit status:%s.",
node, ret, node->getConnectNodeStatusString().c_str(),
node->getExitStatusString().c_str());
#ifdef ENABLE_DNS_IP_CACHE
node->getEventThread()->setIpCache(NULL, NULL);
#endif
goto DirectConnectRetry;
}
}
DirectConnectRetry:
node->disconnectProcess();
node->setConnectNodeStatus(NodeConnecting);
if (node->dnsProcess(node->getEventThread()->_addrInFamily, ip,
node->getEventThread()->_enableSysGetAddr) < 0) {
destroyConnectNode(node);
}
return;
}
#ifdef ENABLE_PRECONNECTED_POOL
bool WorkThread::syncDirectConnect(void *arg, char *ip) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (ip) {
LOG_DEBUG("Node(%p) direct IpV4:%s.", node, ip);
int ret = node->syncConnectProcess(ip, AF_INET);
if (ret == 0) {
ret = node->syncSslProcess();
if (ret == Success) {
LOG_DEBUG("Node(%p) begin gateway request process with prestart.",
node);
// new _readEvent and _writeEvent
node->prestartProcess();
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
node->prestartEventDelProcess();
return false;
}
bool result = true;
if (node->isPreNodeStartStepByStep()) {
int tryCount = 50; // 500ms
while (tryCount-- > 0 &&
node->getConnectNodeStatus() != NodeStarting) {
usleep(10 * 1000);
}
LOG_INFO("Request(%p) Node(%p) now status:%s and try count:%d.",
node->getRequest(), node,
node->getConnectNodeStatusString().c_str(), tryCount);
if (tryCount == 0) {
result = false;
}
}
// delete _readEvent & _writeEvent
node->prestartEventDelProcess();
return result;
}
} else if (ret == 1) {
LOG_DEBUG("Node(%p) connectProcess return 1, will try connect ...", node);
// connect EINPROGRESS
return false;
} else {
LOG_ERROR(
"Node(%p) syncDirectConnect failed with ret:%d and node status:%s "
"exit status:%s.",
node, ret, node->getConnectNodeStatusString().c_str(),
node->getExitStatusString().c_str());
#ifdef ENABLE_DNS_IP_CACHE
node->getEventThread()->setIpCache(NULL, NULL);
#endif
return false;
}
}
return false;
}
#endif
/**
* @brief: 启动语音交互请求
* @return:
*/
void WorkThread::launchEventCallback(evutil_socket_t fd, short which,
void *arg) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (NULL == node) {
LOG_ERROR("Node is nullptr!!!");
return;
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
return;
}
INlsRequest *request = node->getRequest();
WorkThread *pThread = node->getEventThread();
if (pThread == NULL) {
LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
return;
}
node->_inEventCallbackNode = true;
LOG_DEBUG(
"WorkThread(%p) Node(%p) Request(%p) trigger launchEventCallback with "
"reconnection mechanism(%s) and isUsingPreconnection flag(%s) "
"isPreconnecting flag(%s), current status "
"is %s.",
pThread, node, request,
request->getRequestParam()->_enableReconnect ? "true" : "false",
node->isUsingPreconnection() ? "true" : "false",
node->isPreconnecting() ? "true" : "false",
node->getConnectNodeStatusString().c_str());
if (node->getExitStatus() == ExitCancel ||
node->getExitStatus() == ExitStopping) {
LOG_WARN(
"WorkThread(%p) Node(%p) is canceling/stopping, current node "
"status:%s, skip "
"here.",
pThread, node, node->getConnectNodeStatusString().c_str());
node->setConnectNodeStatus(NodeInvoked);
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
insertListNode(pThread, request);
node->setConnectNodeStatus(NodeInvoked);
/* 将request设置的参数传入node */
node->updateParameters();
LOG_DEBUG("WorkThread(%p) Node(%p) begin dnsProcess.", pThread, node);
if (node->dnsProcess(node->getEventThread()->_addrInFamily,
node->getEventThread()->_directIp,
node->getEventThread()->_enableSysGetAddr) < 0) {
LOG_WARN(
"WorkThread(%p) Node(%p) dnsProcess failed, ready to "
"destroyConnectNode.",
pThread, node);
destroyConnectNode(node);
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
#ifdef ENABLE_PRECONNECTED_POOL
void WorkThread::startWithPoolEventCallback(evutil_socket_t fd, short which,
void *arg) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (NULL == node) {
LOG_ERROR("Node is nullptr!!!");
return;
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
return;
}
INlsRequest *request = node->getRequest();
WorkThread *pThread = node->getEventThread();
if (pThread == NULL) {
LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
return;
}
node->_inEventCallbackNode = true;
// LOG_INFO("Request(%p) Node(%p) %s start with pre-node ...", request, node,
// node->getConnectNodeStatusString().c_str());
node->prestartProcess();
node->setConnectNodeStatus(NodeStarting);
node->addCmdDataBuffer(CmdStart);
int ret = node->nlsSendFrame(node->getCmdEvBuffer());
if (request->getRequestParam()->_mode == TypeTts && ret >= 0) {
node->sendFakeSynthesisStarted();
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
#endif
void WorkThread::singleRoundTextEventCallback(evutil_socket_t fd, short which,
void *arg) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
if (NULL == node) {
LOG_ERROR("Node is nullptr!!!");
return;
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
return;
}
node->_inEventCallbackNode = true;
INlsRequest *request = node->getRequest();
FlowingSynthesizerParam *param =
(FlowingSynthesizerParam *)request->getRequestParam();
node->cmdNotify(CmdSendText, param->getSingleRoundText().c_str());
node->cmdNotify(CmdStop, NULL);
param->clearSingleRoundText();
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
return;
}
#ifdef __LINUX__
void WorkThread::sysDnsEventCallback(evutil_socket_t socketFd, short what,
void *arg) {
if (what == EV_READ) {
/* check this node is alive */
NlsClientImpl *client = _instance;
NlsNodeManager *node_manager = client->getNodeManger();
int status = NodeStatusInvalid;
ConnectNode *node = static_cast<ConnectNode *>(arg);
int ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
dnsEventCallback(node->_dnsErrorCode, node->_addrinfo, arg);
node->_dnsErrorCode = 0;
}
return;
}
#endif
/**
* @brief: 进行DNS获得IP后开始链接
* @return:
*/
void WorkThread::dnsEventCallback(int errorCode,
struct evutil_addrinfo *address, void *arg) {
ConnectNode *node = static_cast<ConnectNode *>(arg);
NlsClientImpl *client = _instance;
NlsNodeManager *node_manager = client->getNodeManger();
int status = NodeStatusInvalid;
int ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
} else {
if (status >= NodeStatusCancelling) {
LOG_WARN(
"Node(%p) checkNodeExist failed, status:%s, node status:%s, do "
"nothing later...",
node, node->getConnectNodeStatusString().c_str(),
node_manager->getNodeStatusString(status).c_str());
// maybe mem leak here
destroyConnectNode(node);
return;
}
}
WorkThread *pThread = node->getEventThread();
if (pThread == NULL) {
LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
return;
}
node->_dnsRequestCallbackStatus = 1;
node->_inEventCallbackNode = true;
if (errorCode) {
LOG_ERROR("WorkThread(%p) Node(%p) %s dns failed: %s.", pThread, node,
node->getUrlAddress()._host, evutil_gai_strerror(errorCode));
node->setConnectNodeStatus(NodeConnecting);
if (node->dnsProcess(node->getEventThread()->_addrInFamily,
node->getEventThread()->_directIp,
node->getEventThread()->_enableSysGetAddr) < 0) {
destroyConnectNode(node);
}
// check node again!!!
ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
node->_dnsRequestCallbackStatus = 2;
return;
}
if (address->ai_canonname) {
LOG_DEBUG("WorkThread(%p) Node(%p) ai_canonname: %s", pThread, node,
address->ai_canonname);
}
struct evutil_addrinfo *ai;
for (ai = address; ai; ai = ai->ai_next) {
char buffer[HostSize] = {0};
const char *ip = NULL;
if (ai->ai_family == AF_INET) {
struct sockaddr_in *sin = (struct sockaddr_in *)ai->ai_addr;
ip = evutil_inet_ntop(AF_INET, &sin->sin_addr, buffer, HostSize);
if (ip) {
LOG_DEBUG("WorkThread(%p) Node(%p) IpV4:%s.", pThread, node, ip);
#ifdef ENABLE_DNS_IP_CACHE
node->getEventThread()->setIpCache(
(char *)node->getRequest()->getRequestParam()->_url.c_str(),
(char *)ip);
#endif
int ret = node->connectProcess(ip, AF_INET);
if (ret == 0) {
ret = node->sslProcess();
if (ret == 0) {
LOG_DEBUG("WorkThread(%p) Node(%p) begin gateway request process.",
pThread, node);
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
}
// check node again!!!
ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode,
node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
node->_dnsRequestCallbackStatus = 2;
return;
}
}
if (ret == 1) {
LOG_DEBUG(
"WorkThread(%p) Node(%p) connectProcess or sslProcess return 1, "
"will try "
"connect ...",
pThread, node);
// connect EINPROGRESS
break;
} else {
LOG_DEBUG("WorkThread(%p) Node(%p) goto ConnectRetry, ret:%d.",
pThread, node, ret);
goto ConnectRetry;
}
}
} else if (ai->ai_family == AF_INET6) {
struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ai->ai_addr;
ip = evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buffer, HostSize);
if (ip) {
LOG_DEBUG("WorkThread(%p) Node(%p) IpV6:%s.", pThread, node, ip);
#ifdef ENABLE_DNS_IP_CACHE
node->getEventThread()->setIpCache(
(char *)node->getRequest()->getRequestParam()->_url.c_str(),
(char *)ip);
#endif
int ret = node->connectProcess(ip, AF_INET6);
if (ret == 0) {
LOG_DEBUG("WorkThread(%p) Node(%p) begin ssl process.", pThread,
node);
ret = node->sslProcess();
if (ret == 0) {
LOG_DEBUG("WorkThread(%p) Node(%p) begin gateway request process.",
pThread, node);
if (nodeRequestProcess(node) < 0) {
destroyConnectNode(node);
}
// check node again!!!
ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode,
node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
node->_dnsRequestCallbackStatus = 2;
return;
}
}
if (ret == 1) {
LOG_DEBUG(
"WorkThread(%p) Node(%p) connectProcess return 1, will try "
"connect ...",
pThread, node);
break;
} else {
LOG_DEBUG("WorkThread(%p) Node(%p) goto ConnectRetry.", pThread,
node);
goto ConnectRetry;
}
}
}
} // for
evutil_freeaddrinfo(address);
// check node again!!!
ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
node->_dnsRequestCallbackStatus = 2;
return;
ConnectRetry:
evutil_freeaddrinfo(address);
node->disconnectProcess();
node->setConnectNodeStatus(NodeConnecting);
if (node->dnsProcess(node->getEventThread()->_addrInFamily,
node->getEventThread()->_directIp,
node->getEventThread()->_enableSysGetAddr) < 0) {
destroyConnectNode(node);
}
// check node again!!!
ret = node_manager->checkNodeExist(node, &status);
if (ret != Success) {
LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
return;
}
#ifdef _MSC_VER
SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
node->_inEventCallbackNode);
#endif
node->_dnsRequestCallbackStatus = 2;
return;
}
/**
* @brief: 开始gateway的请求处理
* @return: 成功则Success, 失败则返回负值.
*/
int WorkThread::nodeRequestProcess(ConnectNode *node) {
int ret = Success;
if (node == NULL) {
LOG_ERROR("node is nullptr.");
return -(NodeEmpty);
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
return result;
}
ConnectStatus workStatus = node->getConnectNodeStatus();
// LOG_DEBUG("Request(%p) Node(%p) workStatus %d(%s).", node->getRequest(),
// node,
// workStatus, node->getConnectNodeStatusString().c_str());
switch (workStatus) {
/*connect to gateWay*/
case NodeHandshaking:
node->gatewayRequest();
ret = node->nlsSendFrame(node->getCmdEvBuffer());
node->setConnectNodeStatus(NodeHandshaked);
break;
case NodeHandshaked:
case NodeStarting:
ret = node->nlsSendFrame(node->getCmdEvBuffer());
break;
case NodeWakeWording:
ret = node->nlsSendFrame(node->getWwvEvBuffer());
if (ret == 0) {
if (node->getWakeStatus()) {
node->addCmdDataBuffer(CmdWarkWord);
ret = node->nlsSendFrame(node->getCmdEvBuffer());
}
}
break;
case NodeStarted:
ret = node->nlsSendFrame(node->getBinaryEvBuffer(), true);
/* 音频数据发送完毕,检测是否需要发送控制指令数据 */
if (ret == 0) {
ret = node->sendControlDirective();
}
break;
default:
ret = -(InvalidWorkStatus);
break;
}
if (ret < 0) {
LOG_ERROR("Node(%p) Send failed, ret:%d.", node, ret);
std::string failedInfo = node->getErrorMsg();
if (failedInfo.empty()) {
char tmp_msg[512] = {0};
snprintf(tmp_msg, 512 - 1,
"workThread workStatus(%s) Send failed. error_code(%d)",
node->getConnectNodeStatusString().c_str(), ret);
failedInfo.assign(tmp_msg);
}
node->handlerTaskFailedEvent(failedInfo);
node->closeConnectNode();
return ret;
}
// LOG_DEBUG("Node(%p) nodeResquestProcess done.", node);
return Success;
}
/**
* @brief: 接收gateway的响应
* @return: 成功则Success, 失败则返回负值.
*/
int WorkThread::nodeResponseProcess(ConnectNode *node) {
int ret = Success;
// LOG_DEBUG("Node(%p) nodeResponseProcess begin ...", node);
if (node == NULL) {
LOG_ERROR("node is nullptr.");
return -(NodeEmpty);
}
NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
int status = NodeStatusInvalid;
int result = node_manager->checkNodeExist(node, &status);
if (result != Success) {
LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
return result;
}
if (node->_releasingFlag) {
LOG_ERROR("Node(%p) is releasing!!! skipping ...", node);
return -(InvalidStatusWhenReleasing);
}
ConnectStatus workStatus = node->getConnectNodeStatus();
// LOG_DEBUG("Node(%p) current node status:%s.",
// node, node->getConnectNodeStatusString().c_str());
switch (workStatus) {
/*connect to gateway*/
case NodeHandshaking:
case NodeHandshaked:
ret = node->gatewayResponse();
if (ret == 0) {
/* ret == 0 mean parsing response successfully */
node->setConnectNodeStatus(NodeStarting);
if (node->isPreNodeStartStepByStep()) {
LOG_INFO(
"Request(%p) Node(%p) pre-node starts step by step, now break "
"here.",
node->getRequest(), node);
break;
}
if (node->getRequest()->getRequestParam()->_requestType ==
SpeechTextDialog) {
node->addCmdDataBuffer(CmdTextDialog);
} else {
node->addCmdDataBuffer(CmdStart);
}
ret = node->nlsSendFrame(node->getCmdEvBuffer());
if (ret >= 0) {
node->sendFakeSynthesisStarted();
}
} else if (ret == -(NlsReceiveEmpty)) {
LOG_WARN("Node(%p) nlsReceive empty, try again...", node);
return Success;
}
break;
/*send start command*/
case NodeStarting:
case NodeWakeWording:
ret = node->webSocketResponse();
workStatus = node->getConnectNodeStatus();
if (workStatus == NodeStarted) {
ret = node->nlsSendFrame(node->getBinaryEvBuffer(), true);
if (ret == 0) {
ret = node->sendControlDirective();
}
} else if (workStatus == NodeWakeWording) {
ret = node->nlsSendFrame(node->getWwvEvBuffer());
}
break;
case NodeStarted:
ret = node->webSocketResponse();
break;
case NodeConnecting:
if (node->isLongConnection()) {
/*
* 在长链接模式下, 可能存在进入NodeConnecting而非NodeStarted状态的情况
* 以NodeStarted来处理......
*/
LOG_WARN("Node(%p) NodeConnecting is abnormal.", node);
ret = node->webSocketResponse();
} else {
ret = -(InvalidWorkStatus);
}
break;
case NodeInvalid:
// request has released
ret = -(InvalidRequest);
break;
default:
LOG_WARN("Node(%p) current workStatus is %d.", node, NodeInvalid);
ret = -(InvalidWorkStatus);
break;
}
if (ret < 0) {
if (ret == -(EventClientEmpty)) {
LOG_ERROR("Instance has released, skip all operation.");
return ret;
}
if (ret == -(InvalidStatusWhenReleasing)) {
LOG_ERROR("Node(%p) is releasing, skip all operation.", node);
return ret;
}
if (NodeClosed == node->getConnectNodeStatus()) {
LOG_WARN(
"Node(%p) current node status is NodeClosed, please ignore this "
"warn.",
node);
return Success;
}
LOG_ERROR("Node(%p) response failed, ret:%d.", node, ret);
std::string failedInfo = node->getErrorMsg();
if (failedInfo.empty()) {
char tmp_msg[512] = {0};
snprintf(tmp_msg, 512 - 1,
"workThread workStatus(%s) Response failed. error_code(%d)",
node->getConnectNodeStatusString().c_str(), ret);
failedInfo.assign(tmp_msg);
}
if (ret == -(InvalidRequest)) {
LOG_ERROR(
"Node(%p) Response failed, errormsg:%s. But request has released, "
"ignore TaskFailed and Closed event.",
node, failedInfo.c_str());
} else {
node->closeConnectNode();
node->handlerTaskFailedEvent(failedInfo);
}
}
// LOG_DEBUG("Node(%p) nodeResponseProcess done.", node);
return ret;
}
void WorkThread::setAddrInFamily(int aiFamily) { _addrInFamily = aiFamily; }
void WorkThread::setDirectHost(char *directIp) {
memset(_directIp, 0, 64);
if (directIp && strnlen(directIp, 64) > 0) {
strncpy(_directIp, directIp, 64);
}
}
void WorkThread::setUseSysGetAddrInfo(bool enable) {
_enableSysGetAddr = enable;
}
void WorkThread::setInstance(NlsClientImpl *instance) { _instance = instance; }
#ifdef ENABLE_DNS_IP_CACHE
std::string WorkThread::getIpFromCache(char *host, bool force) {
MUTEX_LOCK(_mtxList);
std::string ip_str = "";
if (host != NULL) {
if (WebSocketTcp::urlWithAccess(host)) {
LOG_DEBUG("Using special host, without IpCache.");
MUTEX_UNLOCK(_mtxList);
return ip_str;
}
std::string host_str(host);
std::map<std::string, struct DnsIpCache>::iterator iter;
iter = _dnsIpCache.find(host_str);
if (iter != _dnsIpCache.end()) {
// find all IP info of this host
struct DnsIpCache ips = iter->second;
uint32_t count = ips.ip_list.size();
if (force && count > 0) {
int index = rand() % count;
ip_str = ips.ip_list[index];
LOG_INFO("Get Ip %s from host(%s) %d/%d.", ip_str.c_str(), host, index,
count);
} else {
if (ips.same_ip_count < DnsIpCache::WorkThreshold) {
LOG_DEBUG("Host(%s) try to get more IPs. (%d/%d)", host,
ips.same_ip_count, DnsIpCache::WorkThreshold);
} else {
if (count > 0) {
int index = rand() % count;
ip_str = ips.ip_list[index];
LOG_INFO("Get Ip %s from host(%s) %d/%d.", ip_str.c_str(), host,
index, count);
} else {
LOG_ERROR("Host(%s) is empty.", host);
}
}
}
}
}
MUTEX_UNLOCK(_mtxList);
return ip_str;
}
void WorkThread::setIpCache(char *host, char *ip) {
MUTEX_LOCK(_mtxList);
if (host == NULL || ip == NULL) {
LOG_INFO("Clear _dnsIpCache");
for (std::map<std::string, struct DnsIpCache>::iterator iter =
_dnsIpCache.begin();
iter != _dnsIpCache.end(); ++iter) {
iter->second.ip_list.clear();
}
_dnsIpCache.clear();
} else {
std::string host_str(host);
std::string ip_str(ip);
std::map<std::string, struct DnsIpCache>::iterator iter =
_dnsIpCache.find(host_str);
if (iter != _dnsIpCache.end()) {
// find all IP info of this host
struct DnsIpCache ips = iter->second;
uint32_t count = ips.ip_list.size();
if (count > 0) {
std::vector<std::string>::iterator ip_iter;
ip_iter = find(ips.ip_list.begin(), ips.ip_list.end(), ip_str);
if (ip_iter == ips.ip_list.end()) {
iter->second.ip_list.push_back(ip_str);
iter->second.same_ip_count = 0;
LOG_INFO("Push new ip(%s) into host(%s) cache", ip_str.c_str(),
host_str.c_str());
} else {
iter->second.same_ip_count++;
#ifdef ENABLE_NLS_DEBUG_2
LOG_DEBUG("Old ip(%s) and host(%s), same_count is %d and size is %d.",
ip_str.c_str(), host_str.c_str(),
iter->second.same_ip_count, _dnsIpCache.size());
#endif
}
} else {
LOG_ERROR("Host(%s) is empty.", host);
}
} else {
// cannot find address
struct DnsIpCache ip_cache;
ip_cache.ip_list.push_back(ip_str);
_dnsIpCache[host_str] = ip_cache;
LOG_INFO("New IP cache by host(%s) and ip(%s), size:%d.",
host_str.c_str(), ip_str.c_str(), _dnsIpCache.size());
}
}
MUTEX_UNLOCK(_mtxList);
}
#endif
void WorkThread::updateParameters(ConnectNode *node) {
if (node) {
if (_dnsBase) {
time_t timeout_ms = node->getRequest()->getRequestParam()->getTimeout();
float timeout_sec = (float)timeout_ms / 1000;
std::string timeout_sec_str = utility::TextUtils::to_string(timeout_sec);
LOG_DEBUG("WorkThread(%p) evdns_base setting timeout %s seconds.", this,
timeout_sec_str.c_str());
evdns_base_set_option(_dnsBase, "timeout", timeout_sec_str.c_str());
}
}
}
} // namespace AlibabaNls