unsigned __stdcall WorkThread::loopEventCallback()

in nlsCppSdk/event/workThread.cpp [321:1657]


unsigned __stdcall WorkThread::loopEventCallback(LPVOID arg) {
#else
void *WorkThread::loopEventCallback(void *arg) {
#endif

  WorkThread *eventParam = static_cast<WorkThread *>(arg);

#if defined(__ANDROID__) || defined(__linux__)
  sigset_t signal_mask;

  if (-1 == sigemptyset(&signal_mask)) {
    LOG_ERROR("sigemptyset failed.");
    exit(1);
  }

  if (-1 == sigaddset(&signal_mask, SIGPIPE)) {
    LOG_ERROR("sigaddset failed.");
    exit(1);
  }

  if (pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) != 0) {
    LOG_ERROR("pthread_sigmask failed.");
    exit(1);
  }

  prctl(PR_SET_NAME, "eventThread");
#endif

  LOG_DEBUG("workThread(%p) create loopEventCallback.", arg);

  if (eventParam->_workBase) {
    LOG_DEBUG("workThread(%p) event_base_dispatch ...", arg);
    event_base_dispatch(eventParam->_workBase);
  }
  if (eventParam->_dnsBase) {
    evdns_base_free(eventParam->_dnsBase, 0);
    eventParam->_dnsBase = NULL;
  }
  if (eventParam->_workBase) {
    event_base_free(eventParam->_workBase);
    eventParam->_workBase = NULL;
  }

  eventParam->_workThreadId = 0;

  LOG_DEBUG("workThread(%p) loopEventCallback exit.", arg);

#if defined(_MSC_VER)
  return Success;
#else
  return NULL;
#endif
}

#ifdef ENABLE_HIGH_EFFICIENCY
/**
 * @brief: 定时进行connect()后检查链接状态并开启ssl握手.
 * @return:
 */
void WorkThread::connectTimerEventCallback(evutil_socket_t socketFd,
                                           short event, void *arg) {
  int errorCode = 0;
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  node->_inEventCallbackNode = true;

  LOG_DEBUG("Node(%p) connectTimerEventCallback node status:%s ...", node,
            node->getConnectNodeStatusString().c_str());

  if (event == EV_CLOSED) {
    LOG_DEBUG("Node(%p) connect get EV_CLOSED.", node);
    goto ConnectTimerProcessFailed;
  } else {
    // event == EV_TIMEOUT
    if (node->getConnectNodeStatus() == NodeConnecting) {
      socklen_t len = sizeof(errorCode);
      getsockopt(socketFd, SOL_SOCKET, SO_ERROR, (char *)&errorCode, &len);
      if (!errorCode) {
        LOG_DEBUG(
            "Node(%p) connect return ev_write, check ok, set "
            "NodeStatus:NodeConnected.",
            node);
        node->setConnectNodeStatus(NodeConnected);

#ifndef _MSC_VER
        // get client ip and port from socketFd
        struct sockaddr_in client;
        char client_ip[20];
        socklen_t client_len = sizeof(client);
        getsockname(socketFd, (struct sockaddr *)&client, &client_len);
        inet_ntop(AF_INET, &client.sin_addr, client_ip, sizeof(client_ip));
        LOG_DEBUG("Node(%p) local %s:%d", node, client_ip,
                  ntohs(client.sin_port));
#endif

        node->setConnected(true);
      } else {
        /* 再次尝试connect(), 并启动下一次connectEventCallback */
        if (node->socketConnect() < 0) {
          /* socket connect 失败 */
          goto ConnectTimerProcessFailed;
        }
      }
    }

    /* connect成功, 开始握手 */
    if (node->getConnectNodeStatus() == NodeConnected) {
      int ret = node->sslProcess();
      switch (ret) {
        case 0:
          LOG_DEBUG("Node(%p) begin gateway request process.", node);
          /* 进入gatewayRequest()的ssl握手阶段 */
          if (nodeRequestProcess(node) < 0) {
            destroyConnectNode(node);
          }
          break;
        case 1:
          /* sslProcess()中已经启动了下一次_connectEvent */
          // LOG_DEBUG("wait connecting ...");
          break;
        default:
          goto HandshakeTimerProcessFailed;
      }
    }
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif

  LOG_DEBUG("Node(%p) connectTimerEventCallback done.", node);
  return;

HandshakeTimerProcessFailed:
ConnectTimerProcessFailed:
  /*
   * connect失败, 或者connect成功但是handshake失败.
   * 进行断链并重回connecting阶段, 然后再开始dns解析.
   */
  LOG_ERROR("Node(%p) connect or handshake failed, socket error mesg:%s.", node,
            evutil_socket_error_to_string(
                evutil_socket_geterror(node->getSocketFd())));

  node->disconnectProcess();
  node->setConnectNodeStatus(NodeConnecting);

  if (node->dnsProcess(node->getEventThread()->_addrInFamily,
                       node->getEventThread()->_directIp,
                       node->getEventThread()->_enableSysGetAddr) < 0) {
    LOG_ERROR("Node(%p) try delete request.", node);
    destroyConnectNode(node);
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif

  LOG_DEBUG("Node(%p) connectTimerEventCallback done with failure.", node);
  return;
}
#endif

/**
 * @brief: connect()后检查链接状态并开启ssl握手.
 * @return:
 */
void WorkThread::connectEventCallback(evutil_socket_t socketFd, short event,
                                      void *arg) {
  int errorCode = 0;
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  node->_inEventCallbackNode = true;

  // LOG_DEBUG("Node(%p) connectEventCallback node status:%s ...",
  //     node, node_manager->getNodeStatusString(status).c_str());

  if (event == EV_TIMEOUT) {
    LOG_ERROR("Node(%p) connect get EV_TIMEOUT.", node);
    goto ConnectProcessFailed;
  } else if (event == EV_CLOSED) {
    LOG_DEBUG("Node(%p) connect get EV_CLOSED.", node);
    goto ConnectProcessFailed;
  } else {
    LOG_DEBUG("Node(%p) current connect node status:%s, EV:%02x.", node,
              node->getConnectNodeStatusString().c_str(), event);
    if (node->getConnectNodeStatus() == NodeConnecting) {
      socklen_t len = sizeof(errorCode);
      getsockopt(socketFd, SOL_SOCKET, SO_ERROR, (char *)&errorCode, &len);
      if (!errorCode) {
        LOG_DEBUG(
            "Node(%p) connect return ev_write, check ok, set "
            "NodeStatus:NodeConnected.",
            node);
        node->setConnectNodeStatus(NodeConnected);
        node->setConnected(true);

#ifndef _MSC_VER
        // get client ip and port from socketFd
        struct sockaddr_in client;
        char client_ip[20];
        socklen_t client_len = sizeof(client);
        getsockname(socketFd, (struct sockaddr *)&client, &client_len);
        inet_ntop(AF_INET, &client.sin_addr, client_ip, sizeof(client_ip));
        LOG_DEBUG("Node(%p) local %s:%d", node, client_ip,
                  ntohs(client.sin_port));
#endif
      } else {
        /* 再次尝试connect(), 并启动下一次connectEventCallback */
        if (node->socketConnect() < 0) {
          /* socket connect 失败 */
          goto ConnectProcessFailed;
        }
      }
    }

    /* connect成功, 开始握手 */
    if (node->getConnectNodeStatus() == NodeConnected) {
      int ret = node->sslProcess();
      switch (ret) {
        case 0:
          LOG_DEBUG("Node(%p) begin gateway request process.", node);
          /* 进入gatewayRequest()的ssl握手阶段 */
          if (nodeRequestProcess(node) < 0) {
            destroyConnectNode(node);
          }
          break;
        case 1:
          /* sslProcess()中已经启动了下一次_connectEvent */
          // LOG_DEBUG("wait connecting.");
          break;
        default:
          goto HandshakeProcessFailed;
      }
    }
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif

  LOG_DEBUG("Node(%p) connectEventCallback done.", node);
  return;

HandshakeProcessFailed:
ConnectProcessFailed:
  /*
   * connect失败, 或者connect成功但是handshake失败.
   * 进行断链并重回connecting阶段, 然后再开始dns解析.
   */
  LOG_ERROR("Node(%p) connect or handshake failed, socket error mesg:%s.", node,
            evutil_socket_error_to_string(
                evutil_socket_geterror(node->getSocketFd())));

#ifdef ENABLE_DNS_IP_CACHE
  node->getEventThread()->setIpCache(NULL, NULL);
#endif
  node->disconnectProcess();
  node->setConnectNodeStatus(NodeConnecting);

  if (node->dnsProcess(node->getEventThread()->_addrInFamily,
                       node->getEventThread()->_directIp,
                       node->getEventThread()->_enableSysGetAddr) < 0) {
    LOG_ERROR("Node(%p) try delete request.", node);
    destroyConnectNode(node);
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif

  LOG_DEBUG("Node(%p) connectEventCallback done with failure.", node);
  return;
}

void WorkThread::readEventCallBack(evutil_socket_t socketFd, short what,
                                   void *arg) {
  char tmp_msg[512] = {0};
  int ret = Success;
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (node == NULL) {
    LOG_ERROR("Node is nullptr!!!");
    return;
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
    return;
  }

  node->_inEventCallbackNode = true;

#ifdef ENABLE_NLS_DEBUG_2
  struct timeval start, end;
  gettimeofday(&start, NULL);

  LOG_DEBUG(
      "Request(%p) Node(%p) readEventCallBack begin, current socketFd:%d "
      "event:%d, node "
      "status:%s, exit status:%s.",
      node->getRequest(), node, socketFd, what,
      node->getConnectNodeStatusString().c_str(),
      node->getExitStatusString().c_str());
#endif

  if (node->getExitStatus() == ExitCancel) {
    LOG_WARN("Node(%p) skip this operation ...", node);
    node->closeConnectNode();
#ifdef _MSC_VER
    SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
    SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                     node->_inEventCallbackNode);
#endif
    return;
  }

  if (what == EV_READ) {
    ret = nodeResponseProcess(node);
    if (ret == -(InvalidRequest)) {
      LOG_ERROR("Node(%p) has invalid request, skip all operation.", node);
      node->closeConnectNode();
#ifdef _MSC_VER
      SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
      SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                       node->_inEventCallbackNode);
#endif
      return;
    } else if (ret == -(EventClientEmpty)) {
      LOG_ERROR("Instance has released, skip all operation.");
      return;
    } else if (ret == -(InvalidStatusWhenReleasing)) {
      LOG_ERROR("Node(%p) is releasing, skip all operation.", node);
      return;
    }
  } else if (what == EV_TIMEOUT) {
    snprintf(tmp_msg, 512 - 1, "Recv timeout. socket error:%s.",
             evutil_socket_error_to_string(
                 evutil_socket_geterror(node->getSocketFd())));

    LOG_ERROR("Node(%p) error msg:%s.", node, tmp_msg);

    node->closeConnectNode();
    node->handlerTaskFailedEvent(tmp_msg, EvRecvTimeout);
  } else {
    snprintf(tmp_msg, 512 - 1, "Unknown event:%02x. %s", what,
             evutil_socket_error_to_string(
                 evutil_socket_geterror(node->getSocketFd())));

    LOG_ERROR("Node(%p) error msg:%s.", node, tmp_msg);

    node->closeConnectNode();
    node->handlerTaskFailedEvent(tmp_msg, EvUnknownEvent);
  }

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&end, NULL);
  uint64_t time_consuming = end.tv_sec * 1000 + end.tv_usec / 1000 -
                            start.tv_sec * 1000 - start.tv_usec / 1000;
  if (time_consuming >= 100) {
    LOG_WARN(
        "Request(%p) Node(%p) readEventCallBack done with excessive time "
        "%llums.",
        node->getRequest(), node, time_consuming);
  }
#endif

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif

#ifdef ENABLE_NLS_DEBUG_2
  LOG_DEBUG(
      "Request(%p) Node(%p) readEventCallBack done with "
      "node->_inEventCallbackNode:%s.",
      node->getRequest(), node, node->_inEventCallbackNode ? "true" : "false");
#endif
  return;
}

void WorkThread::writeEventCallBack(evutil_socket_t socketFd, short what,
                                    void *arg) {
  char tmp_msg[512] = {0};
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (node == NULL) {
    LOG_ERROR("Node is nullptr!!!");
    return;
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
    return;
  }

  node->_inEventCallbackNode = true;

  // LOG_DEBUG(
  //     "Request(%p) Node(%p) writeEventCallBack current event:%d, node "
  //     "status:%s, exit status:%s.",
  //     node->getRequest(), node, what,
  //     node->getConnectNodeStatusString().c_str(),
  //     node->getExitStatusString().c_str());

  if (node->getExitStatus() == ExitCancel) {
    LOG_WARN("Node(%p) skip this operation ...", node);
    node->closeConnectNode();
#ifdef _MSC_VER
    SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
    SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                     node->_inEventCallbackNode);
#endif
    return;
  }

  if (what == EV_WRITE) {
    nodeRequestProcess(node);
  } else if (what == EV_TIMEOUT) {
    snprintf(tmp_msg, 512 - 1, "Send timeout. socket error:%s",
             evutil_socket_error_to_string(
                 evutil_socket_geterror(node->getSocketFd())));

    LOG_ERROR("Node(%p) %s", node, tmp_msg);

    node->closeConnectNode();
    node->handlerTaskFailedEvent(tmp_msg, EvSendTimeout);
  } else {
    snprintf(tmp_msg, 512 - 1, "Unknown event:%02x. %s", what,
             evutil_socket_error_to_string(
                 evutil_socket_geterror(node->getSocketFd())));

    LOG_ERROR("Node(%p) %s.", node, tmp_msg);

    node->closeConnectNode();
    node->handlerTaskFailedEvent(tmp_msg, EvUnknownEvent);
  }

  if (node->getConnectNodeStatus() == NodeInvalid) {
    destroyConnectNode(node);
  }

  // LOG_DEBUG("Node(%p) writeEventCallBack done.", node);
#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  return;
}

/**
 * @brief: IP直连
 * @return:
 */
void WorkThread::directConnect(void *arg, char *ip) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (ip) {
    LOG_DEBUG("Node(%p) direct IpV4:%s.", node, ip);

    int ret = node->connectProcess(ip, AF_INET);
    if (ret == 0) {
      ret = node->sslProcess();
      if (ret == Success) {
        LOG_DEBUG("Node(%p) begin gateway request process.", node);
        if (nodeRequestProcess(node) < 0) {
          destroyConnectNode(node);
        }
        return;
      }
    }

    if (ret == 1) {
      LOG_DEBUG("Node(%p) connectProcess return 1, will try connect ...", node);
      // connect  EINPROGRESS
      return;
    } else {
      LOG_ERROR(
          "Node(%p) goto DirectConnectRetry with ret:%d and node status:%s "
          "exit status:%s.",
          node, ret, node->getConnectNodeStatusString().c_str(),
          node->getExitStatusString().c_str());
#ifdef ENABLE_DNS_IP_CACHE
      node->getEventThread()->setIpCache(NULL, NULL);
#endif
      goto DirectConnectRetry;
    }
  }

DirectConnectRetry:
  node->disconnectProcess();
  node->setConnectNodeStatus(NodeConnecting);
  if (node->dnsProcess(node->getEventThread()->_addrInFamily, ip,
                       node->getEventThread()->_enableSysGetAddr) < 0) {
    destroyConnectNode(node);
  }

  return;
}

#ifdef ENABLE_PRECONNECTED_POOL
bool WorkThread::syncDirectConnect(void *arg, char *ip) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (ip) {
    LOG_DEBUG("Node(%p) direct IpV4:%s.", node, ip);

    int ret = node->syncConnectProcess(ip, AF_INET);
    if (ret == 0) {
      ret = node->syncSslProcess();
      if (ret == Success) {
        LOG_DEBUG("Node(%p) begin gateway request process with prestart.",
                  node);

        // new _readEvent and _writeEvent
        node->prestartProcess();

        if (nodeRequestProcess(node) < 0) {
          destroyConnectNode(node);
          node->prestartEventDelProcess();
          return false;
        }

        bool result = true;
        if (node->isPreNodeStartStepByStep()) {
          int tryCount = 50;  // 500ms
          while (tryCount-- > 0 &&
                 node->getConnectNodeStatus() != NodeStarting) {
            usleep(10 * 1000);
          }

          LOG_INFO("Request(%p) Node(%p) now status:%s and try count:%d.",
                   node->getRequest(), node,
                   node->getConnectNodeStatusString().c_str(), tryCount);

          if (tryCount == 0) {
            result = false;
          }
        }

        // delete _readEvent & _writeEvent
        node->prestartEventDelProcess();

        return result;
      }
    } else if (ret == 1) {
      LOG_DEBUG("Node(%p) connectProcess return 1, will try connect ...", node);
      // connect  EINPROGRESS
      return false;
    } else {
      LOG_ERROR(
          "Node(%p) syncDirectConnect failed with ret:%d and node status:%s "
          "exit status:%s.",
          node, ret, node->getConnectNodeStatusString().c_str(),
          node->getExitStatusString().c_str());
#ifdef ENABLE_DNS_IP_CACHE
      node->getEventThread()->setIpCache(NULL, NULL);
#endif
      return false;
    }
  }

  return false;
}
#endif

/**
 * @brief: 启动语音交互请求
 * @return:
 */
void WorkThread::launchEventCallback(evutil_socket_t fd, short which,
                                     void *arg) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (NULL == node) {
    LOG_ERROR("Node is nullptr!!!");
    return;
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
    return;
  }

  INlsRequest *request = node->getRequest();
  WorkThread *pThread = node->getEventThread();
  if (pThread == NULL) {
    LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
    return;
  }

  node->_inEventCallbackNode = true;

  LOG_DEBUG(
      "WorkThread(%p) Node(%p) Request(%p) trigger launchEventCallback with "
      "reconnection mechanism(%s) and isUsingPreconnection flag(%s) "
      "isPreconnecting flag(%s), current status "
      "is %s.",
      pThread, node, request,
      request->getRequestParam()->_enableReconnect ? "true" : "false",
      node->isUsingPreconnection() ? "true" : "false",
      node->isPreconnecting() ? "true" : "false",
      node->getConnectNodeStatusString().c_str());

  if (node->getExitStatus() == ExitCancel ||
      node->getExitStatus() == ExitStopping) {
    LOG_WARN(
        "WorkThread(%p) Node(%p) is canceling/stopping, current node "
        "status:%s, skip "
        "here.",
        pThread, node, node->getConnectNodeStatusString().c_str());
    node->setConnectNodeStatus(NodeInvoked);
#ifdef _MSC_VER
    SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
    SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                     node->_inEventCallbackNode);
#endif
    return;
  }

  insertListNode(pThread, request);

  node->setConnectNodeStatus(NodeInvoked);
  /* 将request设置的参数传入node */
  node->updateParameters();

  LOG_DEBUG("WorkThread(%p) Node(%p) begin dnsProcess.", pThread, node);
  if (node->dnsProcess(node->getEventThread()->_addrInFamily,
                       node->getEventThread()->_directIp,
                       node->getEventThread()->_enableSysGetAddr) < 0) {
    LOG_WARN(
        "WorkThread(%p) Node(%p) dnsProcess failed, ready to "
        "destroyConnectNode.",
        pThread, node);
    destroyConnectNode(node);
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  return;
}

#ifdef ENABLE_PRECONNECTED_POOL
void WorkThread::startWithPoolEventCallback(evutil_socket_t fd, short which,
                                            void *arg) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (NULL == node) {
    LOG_ERROR("Node is nullptr!!!");
    return;
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
    return;
  }

  INlsRequest *request = node->getRequest();
  WorkThread *pThread = node->getEventThread();
  if (pThread == NULL) {
    LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
    return;
  }

  node->_inEventCallbackNode = true;

  // LOG_INFO("Request(%p) Node(%p) %s start with pre-node ...", request, node,
  //          node->getConnectNodeStatusString().c_str());

  node->prestartProcess();

  node->setConnectNodeStatus(NodeStarting);
  node->addCmdDataBuffer(CmdStart);
  int ret = node->nlsSendFrame(node->getCmdEvBuffer());
  if (request->getRequestParam()->_mode == TypeTts && ret >= 0) {
    node->sendFakeSynthesisStarted();
  }

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  return;
}
#endif

void WorkThread::singleRoundTextEventCallback(evutil_socket_t fd, short which,
                                              void *arg) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  if (NULL == node) {
    LOG_ERROR("Node is nullptr!!!");
    return;
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("The node(%p) checkNodeExist failed, result:%d.", node, result);
    return;
  }

  node->_inEventCallbackNode = true;

  INlsRequest *request = node->getRequest();
  FlowingSynthesizerParam *param =
      (FlowingSynthesizerParam *)request->getRequestParam();
  node->cmdNotify(CmdSendText, param->getSingleRoundText().c_str());
  node->cmdNotify(CmdStop, NULL);
  param->clearSingleRoundText();

#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  return;
}

#ifdef __LINUX__
void WorkThread::sysDnsEventCallback(evutil_socket_t socketFd, short what,
                                     void *arg) {
  if (what == EV_READ) {
    /* check this node is alive */
    NlsClientImpl *client = _instance;
    NlsNodeManager *node_manager = client->getNodeManger();
    int status = NodeStatusInvalid;
    ConnectNode *node = static_cast<ConnectNode *>(arg);
    int ret = node_manager->checkNodeExist(node, &status);
    if (ret != Success) {
      LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
      return;
    }

    dnsEventCallback(node->_dnsErrorCode, node->_addrinfo, arg);
    node->_dnsErrorCode = 0;
  }
  return;
}
#endif

/**
 * @brief: 进行DNS获得IP后开始链接
 * @return:
 */
void WorkThread::dnsEventCallback(int errorCode,
                                  struct evutil_addrinfo *address, void *arg) {
  ConnectNode *node = static_cast<ConnectNode *>(arg);
  NlsClientImpl *client = _instance;
  NlsNodeManager *node_manager = client->getNodeManger();
  int status = NodeStatusInvalid;
  int ret = node_manager->checkNodeExist(node, &status);
  if (ret != Success) {
    LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
    return;
  } else {
    if (status >= NodeStatusCancelling) {
      LOG_WARN(
          "Node(%p) checkNodeExist failed, status:%s, node status:%s, do "
          "nothing later...",
          node, node->getConnectNodeStatusString().c_str(),
          node_manager->getNodeStatusString(status).c_str());
      // maybe mem leak here
      destroyConnectNode(node);
      return;
    }
  }

  WorkThread *pThread = node->getEventThread();
  if (pThread == NULL) {
    LOG_ERROR("The WorkThread of Node(%p) is nullptr, skipping ...", node);
    return;
  }
  node->_dnsRequestCallbackStatus = 1;
  node->_inEventCallbackNode = true;
  if (errorCode) {
    LOG_ERROR("WorkThread(%p) Node(%p) %s dns failed: %s.", pThread, node,
              node->getUrlAddress()._host, evutil_gai_strerror(errorCode));
    node->setConnectNodeStatus(NodeConnecting);
    if (node->dnsProcess(node->getEventThread()->_addrInFamily,
                         node->getEventThread()->_directIp,
                         node->getEventThread()->_enableSysGetAddr) < 0) {
      destroyConnectNode(node);
    }

    // check node again!!!
    ret = node_manager->checkNodeExist(node, &status);
    if (ret != Success) {
      LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
      return;
    }
#ifdef _MSC_VER
    SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
    SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                     node->_inEventCallbackNode);
#endif
    node->_dnsRequestCallbackStatus = 2;
    return;
  }

  if (address->ai_canonname) {
    LOG_DEBUG("WorkThread(%p) Node(%p) ai_canonname: %s", pThread, node,
              address->ai_canonname);
  }

  struct evutil_addrinfo *ai;
  for (ai = address; ai; ai = ai->ai_next) {
    char buffer[HostSize] = {0};
    const char *ip = NULL;
    if (ai->ai_family == AF_INET) {
      struct sockaddr_in *sin = (struct sockaddr_in *)ai->ai_addr;
      ip = evutil_inet_ntop(AF_INET, &sin->sin_addr, buffer, HostSize);

      if (ip) {
        LOG_DEBUG("WorkThread(%p) Node(%p) IpV4:%s.", pThread, node, ip);
#ifdef ENABLE_DNS_IP_CACHE
        node->getEventThread()->setIpCache(
            (char *)node->getRequest()->getRequestParam()->_url.c_str(),
            (char *)ip);
#endif

        int ret = node->connectProcess(ip, AF_INET);
        if (ret == 0) {
          ret = node->sslProcess();
          if (ret == 0) {
            LOG_DEBUG("WorkThread(%p) Node(%p) begin gateway request process.",
                      pThread, node);
            if (nodeRequestProcess(node) < 0) {
              destroyConnectNode(node);
            }

            // check node again!!!
            ret = node_manager->checkNodeExist(node, &status);
            if (ret != Success) {
              LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
              return;
            }
#ifdef _MSC_VER
            SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
            SEND_COND_SIGNAL(node->_mtxEventCallbackNode,
                             node->_cvEventCallbackNode,
                             node->_inEventCallbackNode);
#endif
            node->_dnsRequestCallbackStatus = 2;
            return;
          }
        }

        if (ret == 1) {
          LOG_DEBUG(
              "WorkThread(%p) Node(%p) connectProcess or sslProcess return 1, "
              "will try "
              "connect ...",
              pThread, node);
          // connect EINPROGRESS
          break;
        } else {
          LOG_DEBUG("WorkThread(%p) Node(%p) goto ConnectRetry, ret:%d.",
                    pThread, node, ret);
          goto ConnectRetry;
        }
      }

    } else if (ai->ai_family == AF_INET6) {
      struct sockaddr_in6 *sin6 = (struct sockaddr_in6 *)ai->ai_addr;
      ip = evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buffer, HostSize);

      if (ip) {
        LOG_DEBUG("WorkThread(%p) Node(%p) IpV6:%s.", pThread, node, ip);
#ifdef ENABLE_DNS_IP_CACHE
        node->getEventThread()->setIpCache(
            (char *)node->getRequest()->getRequestParam()->_url.c_str(),
            (char *)ip);
#endif

        int ret = node->connectProcess(ip, AF_INET6);
        if (ret == 0) {
          LOG_DEBUG("WorkThread(%p) Node(%p) begin ssl process.", pThread,
                    node);
          ret = node->sslProcess();
          if (ret == 0) {
            LOG_DEBUG("WorkThread(%p) Node(%p) begin gateway request process.",
                      pThread, node);
            if (nodeRequestProcess(node) < 0) {
              destroyConnectNode(node);
            }

            // check node again!!!
            ret = node_manager->checkNodeExist(node, &status);
            if (ret != Success) {
              LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
              return;
            }
#ifdef _MSC_VER
            SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
            SEND_COND_SIGNAL(node->_mtxEventCallbackNode,
                             node->_cvEventCallbackNode,
                             node->_inEventCallbackNode);
#endif
            node->_dnsRequestCallbackStatus = 2;
            return;
          }
        }

        if (ret == 1) {
          LOG_DEBUG(
              "WorkThread(%p) Node(%p) connectProcess return 1, will try "
              "connect ...",
              pThread, node);
          break;
        } else {
          LOG_DEBUG("WorkThread(%p) Node(%p) goto ConnectRetry.", pThread,
                    node);
          goto ConnectRetry;
        }
      }
    }
  }  // for

  evutil_freeaddrinfo(address);

  // check node again!!!
  ret = node_manager->checkNodeExist(node, &status);
  if (ret != Success) {
    LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
    return;
  }
#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  node->_dnsRequestCallbackStatus = 2;
  return;

ConnectRetry:
  evutil_freeaddrinfo(address);
  node->disconnectProcess();
  node->setConnectNodeStatus(NodeConnecting);
  if (node->dnsProcess(node->getEventThread()->_addrInFamily,
                       node->getEventThread()->_directIp,
                       node->getEventThread()->_enableSysGetAddr) < 0) {
    destroyConnectNode(node);
  }

  // check node again!!!
  ret = node_manager->checkNodeExist(node, &status);
  if (ret != Success) {
    LOG_ERROR("checkNodeExist failed, ret:%d.", ret);
    return;
  }
#ifdef _MSC_VER
  SET_EVENT(node->_inEventCallbackNode, node->_mtxEventCallbackNode);
#else
  SEND_COND_SIGNAL(node->_mtxEventCallbackNode, node->_cvEventCallbackNode,
                   node->_inEventCallbackNode);
#endif
  node->_dnsRequestCallbackStatus = 2;
  return;
}

/**
 * @brief: 开始gateway的请求处理
 * @return: 成功则Success, 失败则返回负值.
 */
int WorkThread::nodeRequestProcess(ConnectNode *node) {
  int ret = Success;
  if (node == NULL) {
    LOG_ERROR("node is nullptr.");
    return -(NodeEmpty);
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
    return result;
  }

  ConnectStatus workStatus = node->getConnectNodeStatus();
  // LOG_DEBUG("Request(%p) Node(%p) workStatus %d(%s).", node->getRequest(),
  // node,
  //           workStatus, node->getConnectNodeStatusString().c_str());
  switch (workStatus) {
    /*connect to gateWay*/
    case NodeHandshaking:
      node->gatewayRequest();
      ret = node->nlsSendFrame(node->getCmdEvBuffer());
      node->setConnectNodeStatus(NodeHandshaked);
      break;

    case NodeHandshaked:
    case NodeStarting:
      ret = node->nlsSendFrame(node->getCmdEvBuffer());
      break;

    case NodeWakeWording:
      ret = node->nlsSendFrame(node->getWwvEvBuffer());
      if (ret == 0) {
        if (node->getWakeStatus()) {
          node->addCmdDataBuffer(CmdWarkWord);
          ret = node->nlsSendFrame(node->getCmdEvBuffer());
        }
      }
      break;

    case NodeStarted:
      ret = node->nlsSendFrame(node->getBinaryEvBuffer(), true);
      /* 音频数据发送完毕,检测是否需要发送控制指令数据 */
      if (ret == 0) {
        ret = node->sendControlDirective();
      }
      break;

    default:
      ret = -(InvalidWorkStatus);
      break;
  }

  if (ret < 0) {
    LOG_ERROR("Node(%p) Send failed, ret:%d.", node, ret);

    std::string failedInfo = node->getErrorMsg();
    if (failedInfo.empty()) {
      char tmp_msg[512] = {0};
      snprintf(tmp_msg, 512 - 1,
               "workThread workStatus(%s) Send failed. error_code(%d)",
               node->getConnectNodeStatusString().c_str(), ret);
      failedInfo.assign(tmp_msg);
    }
    node->handlerTaskFailedEvent(failedInfo);
    node->closeConnectNode();
    return ret;
  }

  // LOG_DEBUG("Node(%p) nodeResquestProcess done.", node);
  return Success;
}

/**
 * @brief: 接收gateway的响应
 * @return: 成功则Success, 失败则返回负值.
 */
int WorkThread::nodeResponseProcess(ConnectNode *node) {
  int ret = Success;

  // LOG_DEBUG("Node(%p) nodeResponseProcess begin ...", node);

  if (node == NULL) {
    LOG_ERROR("node is nullptr.");
    return -(NodeEmpty);
  }
  NlsNodeManager *node_manager = node->getInstance()->getNodeManger();
  int status = NodeStatusInvalid;
  int result = node_manager->checkNodeExist(node, &status);
  if (result != Success) {
    LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", node, result);
    return result;
  }
  if (node->_releasingFlag) {
    LOG_ERROR("Node(%p) is releasing!!! skipping ...", node);
    return -(InvalidStatusWhenReleasing);
  }

  ConnectStatus workStatus = node->getConnectNodeStatus();
  // LOG_DEBUG("Node(%p) current node status:%s.",
  //     node, node->getConnectNodeStatusString().c_str());
  switch (workStatus) {
    /*connect to gateway*/
    case NodeHandshaking:
    case NodeHandshaked:
      ret = node->gatewayResponse();
      if (ret == 0) {
        /* ret == 0 mean parsing response successfully */
        node->setConnectNodeStatus(NodeStarting);
        if (node->isPreNodeStartStepByStep()) {
          LOG_INFO(
              "Request(%p) Node(%p) pre-node starts step by step, now break "
              "here.",
              node->getRequest(), node);
          break;
        }
        if (node->getRequest()->getRequestParam()->_requestType ==
            SpeechTextDialog) {
          node->addCmdDataBuffer(CmdTextDialog);
        } else {
          node->addCmdDataBuffer(CmdStart);
        }
        ret = node->nlsSendFrame(node->getCmdEvBuffer());
        if (ret >= 0) {
          node->sendFakeSynthesisStarted();
        }
      } else if (ret == -(NlsReceiveEmpty)) {
        LOG_WARN("Node(%p) nlsReceive empty, try again...", node);
        return Success;
      }
      break;

    /*send start command*/
    case NodeStarting:
    case NodeWakeWording:
      ret = node->webSocketResponse();
      workStatus = node->getConnectNodeStatus();
      if (workStatus == NodeStarted) {
        ret = node->nlsSendFrame(node->getBinaryEvBuffer(), true);
        if (ret == 0) {
          ret = node->sendControlDirective();
        }
      } else if (workStatus == NodeWakeWording) {
        ret = node->nlsSendFrame(node->getWwvEvBuffer());
      }
      break;

    case NodeStarted:
      ret = node->webSocketResponse();
      break;

    case NodeConnecting:
      if (node->isLongConnection()) {
        /*
         * 在长链接模式下, 可能存在进入NodeConnecting而非NodeStarted状态的情况
         * 以NodeStarted来处理......
         */
        LOG_WARN("Node(%p) NodeConnecting is abnormal.", node);
        ret = node->webSocketResponse();
      } else {
        ret = -(InvalidWorkStatus);
      }
      break;

    case NodeInvalid:
      // request has released
      ret = -(InvalidRequest);
      break;

    default:
      LOG_WARN("Node(%p) current workStatus is %d.", node, NodeInvalid);
      ret = -(InvalidWorkStatus);
      break;
  }

  if (ret < 0) {
    if (ret == -(EventClientEmpty)) {
      LOG_ERROR("Instance has released, skip all operation.");
      return ret;
    }
    if (ret == -(InvalidStatusWhenReleasing)) {
      LOG_ERROR("Node(%p) is releasing, skip all operation.", node);
      return ret;
    }

    if (NodeClosed == node->getConnectNodeStatus()) {
      LOG_WARN(
          "Node(%p) current node status is NodeClosed, please ignore this "
          "warn.",
          node);
      return Success;
    }
    LOG_ERROR("Node(%p) response failed, ret:%d.", node, ret);

    std::string failedInfo = node->getErrorMsg();
    if (failedInfo.empty()) {
      char tmp_msg[512] = {0};
      snprintf(tmp_msg, 512 - 1,
               "workThread workStatus(%s) Response failed. error_code(%d)",
               node->getConnectNodeStatusString().c_str(), ret);
      failedInfo.assign(tmp_msg);
    }
    if (ret == -(InvalidRequest)) {
      LOG_ERROR(
          "Node(%p) Response failed, errormsg:%s. But request has released, "
          "ignore TaskFailed and Closed event.",
          node, failedInfo.c_str());
    } else {
      node->closeConnectNode();
      node->handlerTaskFailedEvent(failedInfo);
    }
  }

  // LOG_DEBUG("Node(%p) nodeResponseProcess done.", node);

  return ret;
}

void WorkThread::setAddrInFamily(int aiFamily) { _addrInFamily = aiFamily; }

void WorkThread::setDirectHost(char *directIp) {
  memset(_directIp, 0, 64);
  if (directIp && strnlen(directIp, 64) > 0) {
    strncpy(_directIp, directIp, 64);
  }
}

void WorkThread::setUseSysGetAddrInfo(bool enable) {
  _enableSysGetAddr = enable;
}

void WorkThread::setInstance(NlsClientImpl *instance) { _instance = instance; }

#ifdef ENABLE_DNS_IP_CACHE
std::string WorkThread::getIpFromCache(char *host, bool force) {
  MUTEX_LOCK(_mtxList);
  std::string ip_str = "";
  if (host != NULL) {
    if (WebSocketTcp::urlWithAccess(host)) {
      LOG_DEBUG("Using special host, without IpCache.");
      MUTEX_UNLOCK(_mtxList);
      return ip_str;
    }

    std::string host_str(host);
    std::map<std::string, struct DnsIpCache>::iterator iter;
    iter = _dnsIpCache.find(host_str);
    if (iter != _dnsIpCache.end()) {
      // find all IP info of this host
      struct DnsIpCache ips = iter->second;
      uint32_t count = ips.ip_list.size();
      if (force && count > 0) {
        int index = rand() % count;
        ip_str = ips.ip_list[index];
        LOG_INFO("Get Ip %s from host(%s) %d/%d.", ip_str.c_str(), host, index,
                 count);
      } else {
        if (ips.same_ip_count < DnsIpCache::WorkThreshold) {
          LOG_DEBUG("Host(%s) try to get more IPs. (%d/%d)", host,
                    ips.same_ip_count, DnsIpCache::WorkThreshold);
        } else {
          if (count > 0) {
            int index = rand() % count;
            ip_str = ips.ip_list[index];
            LOG_INFO("Get Ip %s from host(%s) %d/%d.", ip_str.c_str(), host,
                     index, count);
          } else {
            LOG_ERROR("Host(%s) is empty.", host);
          }
        }
      }
    }
  }
  MUTEX_UNLOCK(_mtxList);
  return ip_str;
}

void WorkThread::setIpCache(char *host, char *ip) {
  MUTEX_LOCK(_mtxList);
  if (host == NULL || ip == NULL) {
    LOG_INFO("Clear _dnsIpCache");
    for (std::map<std::string, struct DnsIpCache>::iterator iter =
             _dnsIpCache.begin();
         iter != _dnsIpCache.end(); ++iter) {
      iter->second.ip_list.clear();
    }
    _dnsIpCache.clear();
  } else {
    std::string host_str(host);
    std::string ip_str(ip);
    std::map<std::string, struct DnsIpCache>::iterator iter =
        _dnsIpCache.find(host_str);
    if (iter != _dnsIpCache.end()) {
      // find all IP info of this host
      struct DnsIpCache ips = iter->second;
      uint32_t count = ips.ip_list.size();
      if (count > 0) {
        std::vector<std::string>::iterator ip_iter;
        ip_iter = find(ips.ip_list.begin(), ips.ip_list.end(), ip_str);
        if (ip_iter == ips.ip_list.end()) {
          iter->second.ip_list.push_back(ip_str);
          iter->second.same_ip_count = 0;
          LOG_INFO("Push new ip(%s) into host(%s) cache", ip_str.c_str(),
                   host_str.c_str());
        } else {
          iter->second.same_ip_count++;
#ifdef ENABLE_NLS_DEBUG_2
          LOG_DEBUG("Old ip(%s) and host(%s), same_count is %d and size is %d.",
                    ip_str.c_str(), host_str.c_str(),
                    iter->second.same_ip_count, _dnsIpCache.size());
#endif
        }
      } else {
        LOG_ERROR("Host(%s) is empty.", host);
      }
    } else {
      // cannot find address
      struct DnsIpCache ip_cache;
      ip_cache.ip_list.push_back(ip_str);
      _dnsIpCache[host_str] = ip_cache;
      LOG_INFO("New IP cache by host(%s) and ip(%s), size:%d.",
               host_str.c_str(), ip_str.c_str(), _dnsIpCache.size());
    }
  }
  MUTEX_UNLOCK(_mtxList);
}
#endif

void WorkThread::updateParameters(ConnectNode *node) {
  if (node) {
    if (_dnsBase) {
      time_t timeout_ms = node->getRequest()->getRequestParam()->getTimeout();
      float timeout_sec = (float)timeout_ms / 1000;
      std::string timeout_sec_str = utility::TextUtils::to_string(timeout_sec);
      LOG_DEBUG("WorkThread(%p) evdns_base setting timeout %s seconds.", this,
                timeout_sec_str.c_str());
      evdns_base_set_option(_dnsBase, "timeout", timeout_sec_str.c_str());
    }
  }
}

}  // namespace AlibabaNls