unsigned __stdcall ConnectedPool::loopConnectedPoolEventCallback()

in nlsCppSdk/transport/connectedPool.cpp [143:2699]


unsigned __stdcall ConnectedPool::loopConnectedPoolEventCallback(LPVOID arg) {
#else
void *ConnectedPool::loopConnectedPoolEventCallback(void *arg) {
#endif

  ConnectedPool *eventParam = static_cast<ConnectedPool *>(arg);

#if defined(__ANDROID__) || defined(__linux__)
  sigset_t signal_mask;

  if (-1 == sigemptyset(&signal_mask)) {
    LOG_ERROR("sigemptyset failed.");
    exit(1);
  }

  if (-1 == sigaddset(&signal_mask, SIGPIPE)) {
    LOG_ERROR("sigaddset failed.");
    exit(1);
  }

  if (pthread_sigmask(SIG_BLOCK, &signal_mask, NULL) != 0) {
    LOG_ERROR("pthread_sigmask failed.");
    exit(1);
  }

  prctl(PR_SET_NAME, "connectedPoolEventThread");
#endif

  LOG_DEBUG("ConnectedPool(%p) create loopConnectedPoolEventCallback.", arg);

  if (eventParam->_poolWorkBase) {
    LOG_DEBUG("ConnectedPool(%p) event_base_dispatch ...", arg);
    event_base_dispatch(eventParam->_poolWorkBase);
  }
  if (eventParam->_poolWorkBase) {
    LOG_DEBUG("ConnectedPool(%p) event_base_free ...", arg);
    event_base_free(eventParam->_poolWorkBase);
    eventParam->_poolWorkBase = NULL;
  }

  eventParam->_poolWorkThreadId = 0;

  LOG_DEBUG("ConnectedPool(%p) loopConnectedPoolEventCallback exit.", arg);

#if defined(_MSC_VER)
  return Success;
#else
  return NULL;
#endif
}

void ConnectedPool::connectPoolEventCallback(evutil_socket_t socketFd,
                                             short event, void *arg) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_a, timewait_end = 0;
  ConnectedPool *pool = static_cast<ConnectedPool *>(arg);
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(pool->_lock, pool);
  timewait_a = utility::TextUtils::GetTimestampMs();
#else
  ConnectedPool *pool = static_cast<ConnectedPool *>(arg);
  MUTEX_LOCK_WITH_TAG(pool->_lock, pool);
#endif

  LOG_DEBUG("Pool(%p) connectPoolEventCallback checking every pre-node ...",
            pool);

  int releaseCount = 0;

  if (event == EV_CLOSED) {
  } else {
    // event == EV_TIMEOUT
    if (pool->_fssRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_fssRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_fssRequests.preconnectedRequests);
    }
    if (pool->_srRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_srRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_srRequests.preconnectedRequests);
    }
    if (pool->_stRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_stRequests.prestartedRequests);
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_stRequests.preconnectedRequests);
    }
    if (pool->_syRequests.work) {
      releaseCount +=
          pool->timeoutPrestartedNode(&pool->_syRequests.prestartedRequests);
      // pool->showEveryNode(&pool->_syRequests.prestartedRequests,
      //                     "syPrestarted after timeout");
      releaseCount += pool->timeoutPreconnectedNode(
          &pool->_syRequests.preconnectedRequests);
      // pool->showEveryNode(&pool->_syRequests.preconnectedRequests,
      //                     "syPreconnected after timeout");
    }
  }

  MUTEX_UNLOCK_WITH_TAG(pool->_lock, pool);

  evtimer_add(pool->_connectPoolEvent, &pool->_connectPoolTimerTv);

  if (releaseCount > 0) {
    event_active(pool->_nodeReleaseEvent, EV_READ, 0);
  }

#ifdef ENABLE_NLS_DEBUG_2
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN(
        "Pool(%p) connectPoolEventCallback done with excessive time:%llums, "
        "lock:%llums, work:%llums",
        pool, timewait_end - timewait_start, timewait_a - timewait_start,
        timewait_end - timewait_a);
  } else {
    LOG_DEBUG("Pool(%p) connectPoolEventCallback done", pool);
  }
#else
  LOG_DEBUG("Pool(%p) connectPoolEventCallback done", pool);
#endif
  return;
}

void ConnectedPool::nodeReleaseEventCallback(evutil_socket_t socketFd,
                                             short event, void *arg) {
  ConnectedPool *pool = static_cast<ConnectedPool *>(arg);

  LOG_DEBUG(
      "Pool(%p) nodeReleaseEventCallback ready to delete or preconnect ...",
      pool);

  if (event == EV_READ) {
    if (pool->_fssRequests.work) {
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_fssRequests.prestartedRequests, "fssPrestarted");
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_fssRequests.preconnectedRequests, "fssPreconnected");
    }
    if (pool->_srRequests.work) {
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_srRequests.prestartedRequests, "srPrestarted");
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_srRequests.preconnectedRequests, "srPreconnected");
    }
    if (pool->_stRequests.work) {
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_stRequests.prestartedRequests, "stPrestarted");
      // pool->showEveryNode(&pool->_stRequests.prestartedRequests,
      //                     "stPrestarted");
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_stRequests.preconnectedRequests, "stPreconnected");
      // pool->showEveryNode(&pool->_stRequests.preconnectedRequests,
      //                     "stPreconnected");
    }
    if (pool->_syRequests.work) {
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_syRequests.prestartedRequests, "syPrestarted");
      pool->deleteOrPreconnectNodeShouldReleased(
          &pool->_syRequests.preconnectedRequests, "syPreconnected");
    }
  }

  LOG_DEBUG("Pool(%p) nodeReleaseEventCallback done", pool);
}

int ConnectedPool::startConnectedPool() {
  // uint64_t timewait_start, timewait_end = 0;
  // timewait_start = utility::TextUtils::GetTimestampMs();
  // MUTEX_LOCK(_lock);
  // timewait_end = utility::TextUtils::GetTimestampMs();
  // if (timewait_end - timewait_start > 50) {
  //   LOG_WARN(
  //       "ConnectedPool(%p) startConnectedPool lock with excessive time
  //       %llums.", this, timewait_end - timewait_start);
  // } else {
  //   LOG_DEBUG("ConnectedPool(%p) startConnectedPool ...", this);
  // }
  // MUTEX_UNLOCK(_lock);
  return Success;
}

int ConnectedPool::stopConnectedPool() {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK(_lock);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN(
        "ConnectedPool(%p) stopConnectedPool lock with excessive time %llums.",
        this, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) stopConnectedPool ...", this);
  }
#else
  MUTEX_LOCK(_lock);
  LOG_DEBUG("ConnectedPool(%p) stopConnectedPool ...", this);
#endif

  if (_connectPoolEvent != NULL) {
    if (_connectPoolTimerFlag) {
      evtimer_del(_connectPoolEvent);
      _connectPoolTimerFlag = false;
    } else {
      LOG_DEBUG("ConnectedPool(%p) PoolEvent isnot in working ...", this);
    }
    event_free(_connectPoolEvent);
    _connectPoolEvent = NULL;
  }

  if (_nodeReleaseEvent) {
    event_del(_nodeReleaseEvent);
    event_free(_nodeReleaseEvent);
    _nodeReleaseEvent = NULL;
  }

  LOG_INFO("ConnectedPool(%p) break evbase(%p) ...", this, _poolWorkBase);
  event_base_loopbreak(_poolWorkBase);
  return Success;
}

bool ConnectedPool::popPrestartedNode(INlsRequest *request, NlsType type) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN(
        "ConnectedPool(%p) popPrestartedNode Request(%p) lock with excessive "
        "time %llums.",
        this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) popPrestartedNode Request(%p) ...", this,
              request);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
  LOG_DEBUG("ConnectedPool(%p) popPrestartedNode Request(%p) ...", this,
            request);
#endif

  int ret = Success;
  // 0. 当前池子没有待工作的节点, 则做准备
  int prestarted = 0;
  int preconnected = 0;
  ret = getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    // 0.1 填充preconnectedRequests和prestartedRequests
    ret = initThisNodesPool(type);
    LOG_DEBUG(
        "ConnectedPool(%p) popPrestartedNode initThisNodesPool done with "
        "request(%p). prestarted is %d, preconnected is %d.",
        this, request, prestarted, preconnected);
    MUTEX_UNLOCK_WITH_TAG(_lock, request);
    return false;
  } else {
    int count = getNumberOfPrestartedNodes(type);
    LOG_DEBUG(
        "ConnectedPool(%p) popPrestartedNode, request(%p), prestarted is %d, "
        "count of PrestartedNodes is %d.",
        this, request, prestarted, count);
    if (prestarted > 0 && count > 0) {
      // 1. 存在started的工作节点, 获取此节点
      if (popOnePrestartedNode(request, type)) {
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::popPreconnectedNode(INlsRequest *request, NlsType type) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  int ret = Success;
  // 0. 当前池子没有待工作的节点, 则做准备
  int prestarted = 0;
  int preconnected = 0;
  ret = getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    // 0.1 填充preconnectedRequests和prestartedRequests
    ret = initThisNodesPool(type);
    MUTEX_UNLOCK_WITH_TAG(_lock, request);
    return false;
  } else {
    if (preconnected > 0 && getNumberOfPreconnectedNodes(type) > 0) {
      // 2. 存在connected的工作节点, 获取此节点
      if (popOnePreconnectedNode(request, type)) {
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::pushPreconnectedNode(INlsRequest *request, NlsType type,
                                         bool newNode) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  int index = request->getConnectNode()->getPoolIndex();

  LOG_DEBUG(
      "ConnectedPool(%p) input %s request(%p) ndoe(%p) "
      "with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, newNode ? "new" : "old", request, request->getConnectNode(), index,
      type);

  int ret = Success;
  // 0. 当前池子没有待工作的节点, 则做准备
  int prestarted = 0;
  int preconnected = 0;
  ret = getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    // 0.1 填充preconnectedRequests和prestartedRequests
    ret = initThisNodesPool(type);
  }

  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }

  if (curPool) {
    if (!newNode) {
      /* 查找是否已经存在, 则更新部最近一次操作时间戳.
       * 且push操作表示此SSL已经用完 */
      if (curPool->size() >= index) {
        struct ConnectedNodeProcess &node = (*curPool)[index];
        if (node.status == PreNodeConnected) {
#ifdef ENABLE_NLS_DEBUG_2
          LOG_DEBUG(
              "ConnectedPool(%p) pushPreconnectedNode request(%p) compare to "
              "request(%p) ...",
              this, request, node.request);
#endif
          evutil_socket_t itSocketFd =
              node.request->getConnectNode()->getSocketFd();
          SSLconnect *itSslHandle =
              node.request->getConnectNode()->getSslHandle();

          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            uint64_t oldTimestamp = node.workableTimestamp;
            node.workableTimestamp = request->getConnectNode()
                                         ->getNodeProcess()
                                         ->last_op_timestamp_ms;
            node.canPick = false;
            /* curRequest在finishPushPreNode时再置NULL */
            // node.curRequest = NULL;
            node.curRequestInvalid = true;

            LOG_INFO(
                "ConnectedPool(%p) pushPreconnectedNode input request(%p) "
                "node(%p) "
                "is existent in index(%d/%d), update last operation "
                "timestamp(from %s to %s). SSL handle is "
                "%p and SocketFd is %d.",
                this, request, request->getConnectNode(), index,
                curPool->size(),
                utility::TextUtils::GetTimeFromMs(oldTimestamp).c_str(),
                utility::TextUtils::GetTimeFromMs(node.workableTimestamp)
                    .c_str(),
                node.sslHandle, node.socketFd);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }
    }

    /* 若不存在, 则存储 */
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
          it->request == NULL) {
        index = std::distance(curPool->begin(), it);
        it->type = request->getRequestParam()->_mode;
        it->status = PreNodeConnected;
        it->workableTimestamp =
            request->getConnectNode()->getNodeProcess()->last_op_timestamp_ms;
        uint64_t oldTimestamp = it->workableTimestamp;
        it->startTimestamp = it->workableTimestamp;
        if (request->getRequestParam()->_mode == TypeTts) {
          it->ttsVersion = request->getRequestParam()->getVersion();
        }
        it->sdkName = request->getRequestParam()->getSdkName();
        it->startedResponse.clear();
        it->request = request;
        it->socketFd = curSocketFd;
        it->sslHandle = curSslHandle;
        it->canPick = false;
        it->curRequest = NULL;
        it->shouldRelease = false;
        it->shouldPreconnect = false;
        it->curRequestInvalid = false;
        it->isAbnormal = false;
        request->getConnectNode()->setPoolIndex(index);
        poolProcess->work = true;

        uint64_t tokenExpirationTimestamp =
            request->getRequestParam()->_tokenExpirationTime;
        const int redundancyTimeDiffMs = 3600000;  // 1h
        if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
          uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
          const uint64_t diffTimeMs = 43200000;  // 12h
          it->tokenExpirationTimestamp =
              (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                  ? nowTimestamp + diffTimeMs
                  : (tokenExpirationTimestamp >=
                             nowTimestamp + redundancyTimeDiffMs
                         ? tokenExpirationTimestamp - redundancyTimeDiffMs
                         : tokenExpirationTimestamp);
        }

        LOG_INFO(
            "ConnectedPool(%p) pushPreconnectedNode input request(%p) node(%p) "
            "done in index(%d/%d), start timestamp(%s), last operation "
            "timestamp(%s), token expiration timestamp(%s). SSL handle "
            "is "
            "%p and SocketFd is %d.",
            this, request, request->getConnectNode(), index, curPool->size(),
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str(),
            it->sslHandle, it->socketFd);
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }  // for
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::pushPrestartedNode(INlsRequest *request, NlsType type,
                                       bool newNode) {
  if (request == NULL) {
    return false;
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t a_ms = utility::TextUtils::GetTimestampMs();
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) pushPrestartedNode request(%p) ...", this,
              request);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  int index = request->getConnectNode()->getPoolIndex();

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t b_ms = utility::TextUtils::GetTimestampMs();
#endif

  LOG_DEBUG(
      "ConnectedPool(%p) input %s request(%p) ndoe(%p) with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, newNode ? "new" : "old", request, request->getConnectNode(), index,
      type);

  int ret = Success;
  // 0. 当前池子没有待工作的节点, 则做准备
  int prestarted = 0;
  int preconnected = 0;
  ret = getNumberOfThisTypeNodes(type, prestarted, preconnected);
  if (prestarted == 0 || preconnected == 0) {
    // 0.1 填充preconnectedRequests和prestartedRequests
    ret = initThisNodesPool(type);
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t c_ms = utility::TextUtils::GetTimestampMs();
#endif

  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }

  if (curPool) {
    if (!newNode) {
      /* 查找是否已经存在, 则更新部最近一次操作时间戳.
       * 且push操作表示此SSL已经用完 */
      if (curPool->size() >= index) {
        struct ConnectedNodeProcess &node = (*curPool)[index];
        if (node.status == PreNodeStarted) {
#ifdef ENABLE_NLS_DEBUG_2
          LOG_DEBUG(
              "ConnectedPool(%p) pushPrestartedNode request(%p) compare to "
              "request(%p) ...",
              this, request, node.request);
#endif
          evutil_socket_t itSocketFd =
              node.request->getConnectNode()->getSocketFd();
          SSLconnect *itSslHandle =
              node.request->getConnectNode()->getSslHandle();

          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            uint64_t oldTimestamp = node.workableTimestamp;
            node.workableTimestamp = request->getConnectNode()
                                         ->getNodeProcess()
                                         ->last_op_timestamp_ms;
            node.canPick = false;
            /* curRequest在finishPushPreNode时再置NULL */
            // node.curRequest = NULL;
            node.curRequestInvalid = true;

#ifdef ENABLE_NLS_DEBUG_2
            uint64_t d_ms = utility::TextUtils::GetTimestampMs();
            LOG_DEBUG(
                "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
                "lock:%llu init:%llu exist:%llu, index:%d",
                this, request, b_ms - a_ms, c_ms - b_ms, d_ms - c_ms, index);
#endif

            LOG_INFO(
                "ConnectedPool(%p) pushPrestartedNode input request(%p) "
                "node(%p) "
                "is existent in index(%d/%d), update last operation "
                "timestamp(from %s to %s). SSL handle is "
                "%p and SocketFd is %d.",
                this, request, request->getConnectNode(), index,
                curPool->size(),
                utility::TextUtils::GetTimeFromMs(oldTimestamp).c_str(),
                utility::TextUtils::GetTimeFromMs(node.workableTimestamp)
                    .c_str(),
                node.sslHandle, node.socketFd);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }
    }

#ifdef ENABLE_NLS_DEBUG_2
    uint64_t d_ms = utility::TextUtils::GetTimestampMs();
#endif

    /* 若不存在, 则存储 */
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
          it->request == NULL) {
        index = std::distance(curPool->begin(), it);
        it->type = request->getRequestParam()->_mode;
        it->status = PreNodeStarted;
        it->workableTimestamp =
            request->getConnectNode()->getNodeProcess()->last_op_timestamp_ms;
        it->startTimestamp = it->workableTimestamp;
        if (request->getRequestParam()->_mode == TypeTts) {
          it->ttsVersion = request->getRequestParam()->getVersion();
        }
        it->sdkName = request->getRequestParam()->getSdkName();
        it->startedResponse.clear();
        it->request = request;
        it->socketFd = curSocketFd;
        it->sslHandle = curSslHandle;
        it->canPick = false;
        it->curRequest = NULL;
        it->shouldRelease = false;
        it->shouldPreconnect = false;
        it->curRequestInvalid = false;
        it->isAbnormal = false;
        request->getConnectNode()->setPoolIndex(index);
        poolProcess->work = true;

        uint64_t tokenExpirationTimestamp =
            request->getRequestParam()->_tokenExpirationTime;
        const int redundancyTimeDiffMs = 3600000;  // 1h
        if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
          uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
          const uint64_t diffTimeMs = 43200000;  // 12h
          it->tokenExpirationTimestamp =
              (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                  ? nowTimestamp + diffTimeMs
                  : (tokenExpirationTimestamp >=
                             nowTimestamp + redundancyTimeDiffMs
                         ? tokenExpirationTimestamp - redundancyTimeDiffMs
                         : tokenExpirationTimestamp);
        }

#ifdef ENABLE_NLS_DEBUG_2
        uint64_t e_ms = utility::TextUtils::GetTimestampMs();
        LOG_DEBUG(
            "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
            "lock:%llu init:%llu exist:%llu inexist:%llu",
            this, request, b_ms - a_ms, c_ms - b_ms, d_ms - c_ms, e_ms - d_ms);
#endif

        LOG_INFO(
            "ConnectedPool(%p) pushPrestartedNode input request(%p) node(%p) "
            "done in index(%d/%d), start timestamp(%s), last operation "
            "timestamp(%s), token expiration timestamp(%s). SSL handle "
            "is "
            "%p and SocketFd is %d.",
            this, request, request->getConnectNode(), index, curPool->size(),
            utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
            utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                .c_str(),
            it->sslHandle, it->socketFd);
        MUTEX_UNLOCK_WITH_TAG(_lock, request);
        return true;
      }
    }  // for
  }

#ifdef ENABLE_NLS_DEBUG_2
  uint64_t e_ms = utility::TextUtils::GetTimestampMs();
  LOG_DEBUG(
      "ConnectedPool(%p) pushPrestartedNode latency request(%p) "
      "lock:%llu init:%llu work:%llu",
      this, request, b_ms - a_ms, c_ms - b_ms, e_ms - c_ms);
#endif
  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::pushPrestartedNodeFromPreconnected(INlsRequest *request,
                                                       NlsType type) {
  if (request == NULL) {
    return false;
  }

  MUTEX_LOCK_WITH_TAG(_lock, request);

  int index = request->getConnectNode()->getPoolIndex();
  if (index < 0) {
    LOG_WARN(
        "ConnectedPool(%p) input "
        "request(%p) node(%p) "
        "failed with index:%d.",
        this, request, request->getConnectNode(), index);
    MUTEX_UNLOCK_WITH_TAG(_lock, request);
    return false;
  }

  LOG_DEBUG(
      "ConnectedPool(%p) input request(%p) "
      "ndoe(%p) with "
      "index(%d) and "
      "type[(%d)"
      "0:Asr,1:SpeechTranscriber,2:TTS,3:StreamInputTts] ...",
      this, request, request->getConnectNode(), index, type);

  int ret = Success;
  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();

  struct ConnectedPoolProcess *poolProcess = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool0 = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      poolProcess = &_srRequests;
      curPool0 = &_srRequests.preconnectedRequests;
      curPool = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      poolProcess = &_stRequests;
      curPool0 = &_stRequests.preconnectedRequests;
      curPool = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      poolProcess = &_syRequests;
      curPool0 = &_syRequests.preconnectedRequests;
      curPool = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      poolProcess = &_fssRequests;
      curPool0 = &_fssRequests.preconnectedRequests;
      curPool = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }

  bool result = false;
  if (curPool0 && curPool) {
    /* 查找是否已经存在 */
    if (curPool0->size() >= index) {
      struct ConnectedNodeProcess &node = (*curPool0)[index];
      if (node.status == PreNodeConnected) {
        evutil_socket_t itSocketFd =
            node.request->getConnectNode()->getSocketFd();
        SSLconnect *itSslHandle =
            node.request->getConnectNode()->getSslHandle();

        if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
          for (std::vector<struct ConnectedNodeProcess>::iterator it =
                   curPool->begin();
               it != curPool->end(); ++it) {
            if (it->status == PreNodeToBeCreated && !it->shouldRelease &&
                it->request == NULL) {
              index = std::distance(curPool->begin(), it);
              it->type = node.request->getRequestParam()->_mode;
              it->status = PreNodeStarted;
              it->workableTimestamp = node.request->getConnectNode()
                                          ->getNodeProcess()
                                          ->last_op_timestamp_ms;
              it->startTimestamp = it->workableTimestamp;
              if (node.request->getRequestParam()->_mode == TypeTts) {
                it->ttsVersion = node.request->getRequestParam()->getVersion();
              }
              it->sdkName = request->getRequestParam()->getSdkName();
              it->startedResponse.clear();
              it->request = node.request;
              it->socketFd = curSocketFd;
              it->sslHandle = curSslHandle;
              it->canPick = false;
              it->curRequest = NULL;
              node.request->getConnectNode()->setPoolIndex(index);
              poolProcess->work = true;

              uint64_t tokenExpirationTimestamp =
                  node.request->getRequestParam()->_tokenExpirationTime;
              const int redundancyTimeDiffMs = 3600000;  // 1h
              if (tokenExpirationTimestamp > redundancyTimeDiffMs) {
                uint64_t nowTimestamp = utility::TextUtils::GetTimestampMs();
                const uint64_t diffTimeMs = 43200000;  // 12h
                it->tokenExpirationTimestamp =
                    (tokenExpirationTimestamp >= nowTimestamp + diffTimeMs)
                        ? nowTimestamp + diffTimeMs
                        : (tokenExpirationTimestamp >=
                                   nowTimestamp + redundancyTimeDiffMs
                               ? tokenExpirationTimestamp - redundancyTimeDiffMs
                               : tokenExpirationTimestamp);
              }

              LOG_INFO(
                  "ConnectedPool(%p) pushPrestartedNodeFromPreconnected input "
                  "request(%p) node(%p) "
                  "done in index(%d/%d), start timestamp(%s), last operation "
                  "timestamp(%s), token expiration timestamp(%s). SSL handle "
                  "is "
                  "%p and SocketFd is %d.",
                  this, request, request->getConnectNode(), index,
                  curPool->size(),
                  utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
                  utility::TextUtils::GetTimeFromMs(it->workableTimestamp)
                      .c_str(),
                  utility::TextUtils::GetTimeFromMs(
                      it->tokenExpirationTimestamp)
                      .c_str(),
                  it->sslHandle, it->socketFd);

              result = true;
              break;
            }
          }  // for

          // empty this node
          node.clearNode();
          // node.status = PreNodeToBeCreated;
          // node.startTimestamp = 0;
          // node.workableTimestamp = 0;
          // node.tokenExpirationTimestamp = 0;
          // node.socketFd = INVALID_SOCKET;
          // node.sslHandle = NULL;
          // node.ttsVersion = 0;
          // node.sdkName.clear();
          // node.startedResponse.clear();
          // node.request = NULL;
          // node.curRequest = NULL;
          // node.canPick = false;
          // node.isAbnormal = false;
          // node.shouldRelease = false;
          // node.shouldPreconnect = false;
          // node.curRequestInvalid = false;
        }  // find
      }
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return result;
}

bool ConnectedPool::sslBelongToPool(INlsRequest *request, NlsType type,
                                    bool &oriRequestIsAbnormal,
                                    bool &requestInPool) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG("ConnectedPool(%p) Request(%p) with lock(%p) sslBelongToPool ...",
              this, request, &_lock);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeStarted) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            oriRequestIsAbnormal = it->isAbnormal;
            if (request == it->request) {
              requestInPool = true;
            }
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }  // for
    }    // curPool
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            oriRequestIsAbnormal = it->isAbnormal;
            if (request == it->request) {
              requestInPool = true;
            }
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
          }
        }
      }  // for
    }    // curPool
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

void ConnectedPool::curRequestIsAbnormal(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  evutil_socket_t curSocketFd = request->getConnectNode()->getSocketFd();
  SSLconnect *curSslHandle = request->getConnectNode()->getSslHandle();
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        curList = &_srRequests.prestartedIndexList;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        curList = &_stRequests.prestartedIndexList;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        curList = &_syRequests.prestartedIndexList;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        curList = &_fssRequests.prestartedIndexList;
        break;
      default:
        curPool = NULL;
        curList = NULL;
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeStarted) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            it->isAbnormal = true;
            int index = std::distance(curPool->begin(), it);
            removeElement(*curList, index);
            LOG_INFO(
                "ConnectedPool(%p) curRequestIsAbnormal Request(%p) SSL(%p) "
                "SocketFd(%d) Index(%d) is abnormal.",
                this, request, curSslHandle, curSocketFd, index);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return;
          }
        }
      }  // for
    }    // curPool
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        curList = &_srRequests.preconnectedIndexList;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        curList = &_stRequests.preconnectedIndexList;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        curList = &_syRequests.preconnectedIndexList;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        curList = &_fssRequests.preconnectedIndexList;
        break;
      default:
        curPool = NULL;
        curList = NULL;
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            it->isAbnormal = true;
            int index = std::distance(curPool->begin(), it);
            removeElement(*curList, index);
            LOG_INFO(
                "ConnectedPool(%p) curRequestIsAbnormal Request(%p) SSL(%p) "
                "SocketFd(%d) Index(%d) is abnormal.",
                this, request, curSslHandle, curSocketFd, index);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return;
          }
        }
      }  // for
    }    // curPool
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  LOG_DEBUG("ConnectedPool(%p) Request(%p) curRequestIsAbnormal done", this,
            request);
}

void ConnectedPool::finishPushPreNode(NlsType type, evutil_socket_t curSocketFd,
                                      SSLconnect *curSslHandle, int index,
                                      INlsRequest *request) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  } else {
    LOG_DEBUG(
        "ConnectedPool(%p) input request(%p) with lock(%p), SSL "
        "handle "
        "is "
        "%p and SocketFd is %d. Index is %d.",
        this, request, &_lock, curSslHandle, curSocketFd, index);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
  LOG_DEBUG(
      "ConnectedPool(%p) input request(%p) with lock(%p), SSL "
      "handle "
      "is "
      "%p and SocketFd is %d. Index is %d.",
      this, request, &_lock, curSslHandle, curSocketFd, index);
#endif

  int findIndex = index >= 0 ? index : 0;
  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        curList = &_srRequests.prestartedIndexList;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        curList = &_stRequests.prestartedIndexList;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        curList = &_syRequests.prestartedIndexList;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        curList = &_fssRequests.prestartedIndexList;
        break;
      default:
        curPool = NULL;
        curList = NULL;
        break;
    }

#ifdef ENABLE_NLS_DEBUG_2
    uint64_t a_ms = utility::TextUtils::GetTimestampMs();
#endif

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin() + findIndex; it != curPool->end(); ++it) {
        // LOG_DEBUG(
        //     "ConnectedPool(%p) finishPushPreNode index(%d) Request(%p), it
        //     SSL " "handle " "is "
        //     "%p and SocketFd is %d. status is %d. cur SSL handle is %p and "
        //     "SocketFd is %d.",
        //     this, std::distance(curPool->begin(), it), it->request,
        //     it->sslHandle, it->socketFd, it->status, curSslHandle,
        //     curSocketFd);

        if (it->status == PreNodeStarted) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            int curIndex = std::distance(curPool->begin(), it);
#ifdef ENABLE_NLS_DEBUG_2
            uint64_t b_ms = utility::TextUtils::GetTimestampMs();
            LOG_DEBUG(
                "ConnectedPool(%p) finishPushPreNode latency PreNodeStarted "
                "request(%p) "
                "finish:%llu",
                this, it->request, b_ms - a_ms);
#endif
            LOG_INFO(
                "ConnectedPool(%p) request(%p) node(%p) [curRequest(%p)] can "
                "pick. "
                "SSL handle is %p and SocketFd is %d. Index is %d.",
                this, it->request, it->request->getConnectNode(),
                it->curRequest, it->sslHandle, it->socketFd, curIndex);

            it->canPick = true;
            it->curRequest = NULL;
            it->curRequestInvalid = false;
            insertListInOrder(*curList, curIndex);

            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return;
          }
        }
      }  // for
    }    // curPool
  } else {
    // LOG_ERROR("ConnectedPool(%p) Request(%p) cannot get prestarted pool!",
    // this,
    //           request);
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        curList = &_srRequests.preconnectedIndexList;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        curList = &_stRequests.preconnectedIndexList;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        curList = &_syRequests.preconnectedIndexList;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        curList = &_fssRequests.preconnectedIndexList;
        break;
      default:
        curPool = NULL;
        curList = NULL;
        break;
    }

    if (curPool) {
#ifdef ENABLE_NLS_DEBUG_2
      uint64_t c_ms = utility::TextUtils::GetTimestampMs();
#endif
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin() + findIndex; it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          evutil_socket_t itSocketFd = it->socketFd;
          SSLconnect *itSslHandle = it->sslHandle;
          if (curSocketFd == itSocketFd && curSslHandle == itSslHandle) {
            int curIndex = std::distance(curPool->begin(), it);
            LOG_INFO(
                "ConnectedPool(%p) request(%p) node(%p) [curRequest(%p)] can "
                "pick. "
                "SSL handle is %p and SocketFd is %d. Index is %d.",
                this, it->request, it->request->getConnectNode(),
                it->curRequest, it->sslHandle, it->socketFd, curIndex);

            it->canPick = true;
            it->curRequest = NULL;
            it->curRequestInvalid = false;
            insertListInOrder(*curList, curIndex);

            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return;
          }
        }
      }  // for

#ifdef ENABLE_NLS_DEBUG_2
      uint64_t d_ms = utility::TextUtils::GetTimestampMs();
      LOG_DEBUG(
          "ConnectedPool(%p) finishPushPreNode latency PreNodeConnected "
          "finish:%llu",
          this, d_ms - c_ms);
#endif
    }  // curPool
  } else {
    // LOG_ERROR("ConnectedPool(%p) Request(%p) cannot get preconnected pool!",
    //           this, request);
  }

  LOG_ERROR("ConnectedPool(%p) Request(%p) finishPushPreNode occur exception!",
            this, request);
  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return;
}

bool ConnectedPool::requestInPool(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeStarted) {
          if (request == it->request &&
              request->getConnectNode() == it->request->getConnectNode()) {
            LOG_DEBUG(
                "find prestarted node of request(%p) and node(%p) in "
                "size(%d).",
                request, request->getConnectNode(), preSize);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
#ifdef ENABLE_NLS_DEBUG_2
          } else {
            LOG_DEBUG(
                "request(%p) is not prestarted request(%p) node(%p) with "
                "index(%d).",
                request, it->request, it->request->getConnectNode(),
                std::distance(curPool->begin(), it));
#endif
          }
        }
      }  // for
    }
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          if (request == it->request &&
              request->getConnectNode() == it->request->getConnectNode()) {
            LOG_DEBUG(
                "find preconnected node of request(%p) and node(%p) in "
                "size(%d).",
                request, request->getConnectNode(), preSize);
            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
#ifdef ENABLE_NLS_DEBUG_2
          } else {
            LOG_DEBUG("request(%p) is not preconnected request(%p) node(%p)",
                      request, it->request, it->request->getConnectNode());
#endif
          }
        }
      }  // for
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::deletePreNodeByRequest(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, request);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) Request(%p) lock with excessive time %llums.",
             this, request, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, request);
#endif

  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeStarted) {
          if (request == it->request &&
              request->getConnectNode() == it->request->getConnectNode()) {
            LOG_DEBUG(
                "find prestarted node of request(%p) and node(%p) in "
                "size(%d) with SSL handler(%p), and remove from pool.",
                request, request->getConnectNode(), preSize, it->sslHandle);

            // empty this node
            it->clearNode();
            // it->status = PreNodeToBeCreated;
            // it->startTimestamp = 0;
            // it->workableTimestamp = 0;
            // it->socketFd = INVALID_SOCKET;
            // it->sslHandle = NULL;
            // it->ttsVersion = 0;
            // it->startedResponse.clear();
            // it->request = NULL;
            // it->curRequest = NULL;
            // it->canPick = false;

            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
#ifdef ENABLE_NLS_DEBUG_2
          } else {
            LOG_DEBUG(
                "request(%p) is not prestarted request(%p) node(%p) with "
                "index(%d).",
                request, it->request, it->request->getConnectNode(),
                std::distance(curPool->begin(), it));
#endif
          }
        }
      }  // for
    }
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      bool equalFlag = false;
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          if (request == it->request &&
              request->getConnectNode() == it->request->getConnectNode()) {
            LOG_DEBUG(
                "find preconnected node of request(%p) and node(%p) in "
                "size(%d) with SSL handler(%p), and remove from pool.",
                request, request->getConnectNode(), preSize, it->sslHandle);

            // empty this node
            it->clearNode();
            // it->status = PreNodeToBeCreated;
            // it->startTimestamp = 0;
            // it->workableTimestamp = 0;
            // it->socketFd = INVALID_SOCKET;
            // it->sslHandle = NULL;
            // it->ttsVersion = 0;
            // it->startedResponse = "";
            // it->request = NULL;
            // it->curRequest = NULL;
            // it->canPick = false;

            MUTEX_UNLOCK_WITH_TAG(_lock, request);
            return true;
#ifdef ENABLE_NLS_DEBUG_2
          } else {
            LOG_DEBUG("request(%p) is not preconnected request(%p) node(%p)",
                      request, it->request, it->request->getConnectNode());
#endif
          }
        }
      }  // for
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, request);
  return false;
}

bool ConnectedPool::deletePreNodeBySSL(SSLconnect *curSslHandle, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  uint64_t timewait_start, timewait_end = 0;
  timewait_start = utility::TextUtils::GetTimestampMs();
  MUTEX_LOCK_WITH_TAG(_lock, curSslHandle);
  timewait_end = utility::TextUtils::GetTimestampMs();
  if (timewait_end - timewait_start > 50) {
    LOG_WARN("ConnectedPool(%p) SSL(%p) lock with excessive time %llums.", this,
             curSslHandle, timewait_end - timewait_start);
  }
#else
  MUTEX_LOCK_WITH_TAG(_lock, curSslHandle);
#endif

  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  int preSize = getNumberOfPrestartedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.prestartedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.prestartedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.prestartedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.prestartedRequests;
        break;
      default:
        break;
    }

    if (curPool) {
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeStarted) {
          if (it->sslHandle == curSslHandle) {
            LOG_DEBUG(
                "find prestarted SSL(%p) in "
                "size(%d), and remove request(%p).",
                curSslHandle, preSize, it->request);

            // empty this node
            if (it->request) {
              delete it->request;
              it->request = NULL;
            }
            it->clearNode();
            // it->status = PreNodeToBeCreated;
            // it->startTimestamp = 0;
            // it->workableTimestamp = 0;
            // it->socketFd = INVALID_SOCKET;
            // it->sslHandle = NULL;
            // it->ttsVersion = 0;
            // it->startedResponse.clear();
            // it->curRequest = NULL;
            // it->canPick = false;
            // it->isAbnormal = false;

            MUTEX_UNLOCK_WITH_TAG(_lock, curSslHandle);
            return true;
          }
        }
      }  // for
    }
  }

  preSize = getNumberOfPreconnectedNodes(type);
  if (preSize > 0) {
    switch (type) {
      case TypeAsr:
        curPool = &_srRequests.preconnectedRequests;
        break;
      case TypeRealTime:
        curPool = &_stRequests.preconnectedRequests;
        break;
      case TypeTts:
        curPool = &_syRequests.preconnectedRequests;
        break;
      case TypeStreamInputTts:
        curPool = &_fssRequests.preconnectedRequests;
        break;
      default:
        curPool = NULL;
        break;
    }

    if (curPool) {
      bool equalFlag = false;
      std::vector<struct ConnectedNodeProcess>::iterator it;
      for (it = curPool->begin(); it != curPool->end(); ++it) {
        if (it->status == PreNodeConnected) {
          if (it->sslHandle == curSslHandle) {
            LOG_DEBUG(
                "find preconnected SSL(%p) in "
                "size(%d), and remove request(%p).",
                curSslHandle, preSize, it->request);

            // empty this node
            if (it->request) {
              delete it->request;
              it->request = NULL;
            }
            it->clearNode();
            // it->status = PreNodeToBeCreated;
            // it->startTimestamp = 0;
            // it->workableTimestamp = 0;
            // it->socketFd = INVALID_SOCKET;
            // it->sslHandle = NULL;
            // it->ttsVersion = 0;
            // it->startedResponse.clear();
            // it->curRequest = NULL;
            // it->canPick = false;
            // it->isAbnormal = false;

            MUTEX_UNLOCK_WITH_TAG(_lock, curSslHandle);
            return true;
          }
        }
      }  // for
    }
  }

  MUTEX_UNLOCK_WITH_TAG(_lock, curSslHandle);
  return false;
}

int ConnectedPool::getNumberOfThisTypeNodes(NlsType type, int &prestarted,
                                            int &preconnected) {
  switch (type) {
    case TypeAsr:
      prestarted = _srRequests.prestartedRequests.size();
      break;
    case TypeRealTime:
      prestarted = _stRequests.prestartedRequests.size();
      break;
    case TypeTts:
      prestarted = _syRequests.prestartedRequests.size();
      break;
    case TypeStreamInputTts:
      prestarted = _fssRequests.prestartedRequests.size();
      break;
    default:
      prestarted = 0;
      break;
  }

  switch (type) {
    case TypeAsr:
      preconnected = _srRequests.preconnectedRequests.size();
      break;
    case TypeRealTime:
      preconnected = _stRequests.preconnectedRequests.size();
      break;
    case TypeTts:
      preconnected = _syRequests.preconnectedRequests.size();
      break;
    case TypeStreamInputTts:
      preconnected = _fssRequests.preconnectedRequests.size();
      break;
    default:
      preconnected = 0;
      break;
  }

  return Success;
}

int ConnectedPool::getNumberOfPreconnectedNodes(NlsType type) {
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      curPool = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      curPool = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }

  int count = 0;
  if (curPool) {
    size_t size = curPool->size();
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeConnected) {
        count++;
      }
    }  // for
  }
  return count;
}

int ConnectedPool::getNumberOfPrestartedNodes(NlsType type) {
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.prestartedRequests;
      break;
    case TypeRealTime:
      curPool = &_stRequests.prestartedRequests;
      break;
    case TypeTts:
      curPool = &_syRequests.prestartedRequests;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.prestartedRequests;
      break;
    default:
      break;
  }

  int count = 0;
  if (curPool) {
    size_t size = curPool->size();
    for (std::vector<struct ConnectedNodeProcess>::iterator it =
             curPool->begin();
         it != curPool->end(); ++it) {
      if (it->status == PreNodeStarted) {
        count++;
      }
    }  // for
  }
  return count;
}

int ConnectedPool::initThisNodesPool(NlsType type) {
  // LOG_DEBUG("ConnectedPool(%p) initThisNodesPool ...", this);
  std::vector<struct ConnectedNodeProcess> *curPrestartedPool = NULL;
  std::vector<struct ConnectedNodeProcess> *curPreconnectedPool = NULL;
  switch (type) {
    case TypeAsr:
      curPrestartedPool = &_srRequests.prestartedRequests;
      curPreconnectedPool = &_srRequests.preconnectedRequests;
      break;
    case TypeRealTime:
      curPrestartedPool = &_stRequests.prestartedRequests;
      curPreconnectedPool = &_stRequests.preconnectedRequests;
      break;
    case TypeTts:
      curPrestartedPool = &_syRequests.prestartedRequests;
      curPreconnectedPool = &_syRequests.preconnectedRequests;
      break;
    case TypeStreamInputTts:
      curPrestartedPool = &_fssRequests.prestartedRequests;
      curPreconnectedPool = &_fssRequests.preconnectedRequests;
      break;
    default:
      break;
  }

  if (curPrestartedPool) {
    size_t size = curPrestartedPool->size();
    for (size_t i = size; i < _maxPreconnectedNumber; ++i) {
      struct ConnectedNodeProcess tmp;
      tmp.type = type;
      tmp.status = PreNodeToBeCreated;
      curPrestartedPool->push_back(tmp);
    }  // for
  }

  if (curPreconnectedPool) {
    size_t size = curPreconnectedPool->size();
    for (size_t i = size; i < _maxPreconnectedNumber; ++i) {
      struct ConnectedNodeProcess tmp;
      tmp.type = type;
      tmp.status = PreNodeToBeCreated;
      curPreconnectedPool->push_back(tmp);
    }  // for
  }

  return Success;
}

bool ConnectedPool::popOnePreconnectedNode(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  LOG_DEBUG(
      "ConnectedPool(%p) popOnePreconnectedNode request(%p) with type(%d) "
      "...",
      this, request, type);
#endif

  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.preconnectedRequests;
      curList = &_srRequests.prestartedIndexList;
      break;
    case TypeRealTime:
      curPool = &_stRequests.preconnectedRequests;
      curList = &_stRequests.preconnectedIndexList;
      break;
    case TypeTts:
      curPool = &_syRequests.preconnectedRequests;
      curList = &_syRequests.preconnectedIndexList;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.preconnectedRequests;
      curList = &_fssRequests.preconnectedIndexList;
      break;
    default:
      curPool = NULL;
      curList = NULL;
      break;
  }

  if (curPool) {
    int popIndex = popListFront(*curList);
    std::vector<struct ConnectedNodeProcess>::iterator it;
    for (it = curPool->begin() + popIndex; it != curPool->end(); ++it) {
      if (it->status == PreNodeConnected) {
        if (it->canPick) {        /*SSL处于空闲, 可取走SSL*/
          bool equalFlag = false; /* 判断request参数是否相同 */
          INlsRequestParam *paramsInRequest = request->getRequestParam();
          INlsRequestParam *paramsInPool = it->request->getRequestParam();
          if (paramsInRequest && paramsInPool) {
            switch (type) {
              case TypeAsr:
                equalFlag = *paramsInPool == *paramsInRequest &&
                            it->sdkName == paramsInRequest->getSdkName();
                break;
              case TypeRealTime:
                equalFlag = *paramsInPool == *paramsInRequest &&
                            it->sdkName == paramsInRequest->getSdkName();
                break;
              case TypeTts:
                equalFlag = *paramsInPool == *paramsInRequest &&
                            it->sdkName == paramsInRequest->getSdkName() &&
                            it->ttsVersion == paramsInRequest->getVersion();
                break;
              case TypeStreamInputTts:
                equalFlag = *paramsInPool == *paramsInRequest &&
                            it->sdkName == paramsInRequest->getSdkName();
                break;
              default:
                break;
            }
          } else {
            LOG_ERROR(
                "ConnectedPool(%p) input invalid request(%p) params(%p) and "
                "item request(%p) params(%p).",
                this, request, paramsInRequest, it->request, paramsInPool);
          }
          if (equalFlag) { /* request参数相同*/
            int index = std::distance(curPool->begin(), it);
            // fill node
            SSLconnect *oldSSL = request->getConnectNode()->getSslHandle();
            evutil_socket_t oldSocketFd =
                request->getConnectNode()->getSocketFd();
            delete oldSSL;
            evutil_closesocket(oldSocketFd);
            urlAddress *dstUrlAddress =
                request->getConnectNode()->getUrlAddressPointer();
            urlAddress *srcUrlAddress =
                it->request->getConnectNode()->getUrlAddressPointer();
            request->getConnectNode()->setSslHandle(it->sslHandle);
            request->getConnectNode()->setSocketFd(it->socketFd);
            memcpy(dstUrlAddress, srcUrlAddress, sizeof(struct urlAddress));
            it->canPick = false;
            it->curRequest = request;
            it->curRequestInvalid = false;
            request->getConnectNode()->setPoolIndex(index);
            removeElement(*curList, index);

            LOG_INFO(
                "ConnectedPool(%p) popOnePreconnectedNode request(%p) "
                "node(%p) "
                "with "
                "type(%d) index(%d/%d) done, reset SSL handle %p to %p and "
                "reset "
                "SocketFd %d "
                "to %d.",
                this, request, request->getConnectNode(), type,
                std::distance(curPool->begin(), it), curPool->size(), oldSSL,
                request->getConnectNode()->getSslHandle(), oldSocketFd,
                request->getConnectNode()->getSocketFd());
            return true;
          }  // equalFlag
        } else {
          // LOG_DEBUG(
          //     "ConnectedPool(%p) popOnePreconnectedNode request(%p) node(%p)
          //     " "is not ready ...", this, request,
          //     request->getConnectNode());
        }
      }
    }  // for
  }    // curPool
  return false;
}

bool ConnectedPool::popOnePrestartedNode(INlsRequest *request, NlsType type) {
#ifdef ENABLE_NLS_DEBUG_2
  LOG_DEBUG(
      "ConnectedPool(%p) popOnePrestartedNode request(%p) with type(%d) "
      "...",
      this, request, type);
#endif

  std::list<int> *curList = NULL;
  std::vector<struct ConnectedNodeProcess> *curPool = NULL;
  switch (type) {
    case TypeAsr:
      curPool = &_srRequests.prestartedRequests;
      curList = &_srRequests.prestartedIndexList;
      break;
    case TypeRealTime:
      curPool = &_stRequests.prestartedRequests;
      curList = &_stRequests.prestartedIndexList;
      break;
    case TypeTts:
      curPool = &_syRequests.prestartedRequests;
      curList = &_syRequests.prestartedIndexList;
      break;
    case TypeStreamInputTts:
      curPool = &_fssRequests.prestartedRequests;
      curList = &_fssRequests.prestartedIndexList;
      break;
    default:
      curPool = NULL;
      curList = NULL;
      break;
  }

  if (curPool) {
    int popIndex = popListFront(*curList);
    std::vector<struct ConnectedNodeProcess>::iterator it;
    for (it = curPool->begin() + popIndex; it != curPool->end(); ++it) {
      if (it->status == PreNodeStarted) {
        if (it->canPick) {        /*SSL处于空闲, 可取走SSL*/
          bool equalFlag = false; /* 判断request参数是否相同 */
          INlsRequestParam *paramsInPool = it->request->getRequestParam();
          INlsRequestParam *paramsInRequest = request->getRequestParam();
          if (paramsInRequest && paramsInPool) {
            switch (type) {
              case TypeAsr:
                equalFlag =
                    *paramsInPool == *paramsInRequest &&
                    paramsInPool->getSdkName() == paramsInRequest->getSdkName();
                break;
              case TypeRealTime:
                equalFlag =
                    *paramsInPool == *paramsInRequest &&
                    paramsInPool->getSdkName() == paramsInRequest->getSdkName();
                break;
              case TypeTts:
                equalFlag =
                    *paramsInPool == *paramsInRequest &&
                    paramsInPool->getSdkName() ==
                        paramsInRequest->getSdkName() &&
                    paramsInPool->getVersion() == paramsInRequest->getVersion();
                break;
              case TypeStreamInputTts:
                equalFlag =
                    *paramsInPool == *paramsInRequest &&
                    paramsInPool->getSdkName() == paramsInRequest->getSdkName();
                break;
              default:
                break;
            }
          } else {
            LOG_ERROR(
                "ConnectedPool(%p) input invalid request(%p) params(%p) and "
                "item request(%p) params(%p).",
                this, request, paramsInRequest, it->request, paramsInPool);
          }
          if (equalFlag) { /* request参数相同*/
            int index = std::distance(curPool->begin(), it);
            // fill node
            SSLconnect *oldSSL = request->getConnectNode()->getSslHandle();
            evutil_socket_t oldSocketFd =
                request->getConnectNode()->getSocketFd();
            delete oldSSL;
            evutil_closesocket(oldSocketFd);
            urlAddress *dstUrlAddress =
                request->getConnectNode()->getUrlAddressPointer();
            urlAddress *srcUrlAddress =
                it->request->getConnectNode()->getUrlAddressPointer();
            request->getConnectNode()->setSslHandle(it->sslHandle);
            request->getConnectNode()->setSocketFd(it->socketFd);
            memcpy(dstUrlAddress, srcUrlAddress, sizeof(struct urlAddress));
            it->canPick = false;
            it->curRequest = request;
            it->curRequestInvalid = false;
            request->getConnectNode()->setPoolIndex(index);
            removeElement(*curList, index);

            LOG_INFO(
                "ConnectedPool(%p) popOnePrestartedNode request(%p) node(%p) "
                "with "
                "type(%d) index(%d/%d) done, reset SSL handle %p to %p and "
                "reset "
                "SocketFd %d "
                "to %d.",
                this, request, request->getConnectNode(), type, index,
                curPool->size(), oldSSL,
                request->getConnectNode()->getSslHandle(), oldSocketFd,
                request->getConnectNode()->getSocketFd());
            return true;
          }  // equalFlag
        }    // canPick
      } else {
        // LOG_DEBUG(
        //     "ConnectedPool(%p) popOnePrestartedNode(index:%d) request(%p) "
        //     "node(%p) with SSL handle(%p) and socketFd(%d) "
        //     "is not ready, cannot pick ...",
        //     this, std::distance(curPool->begin(), it), request,
        //     request->getConnectNode(),
        //     request->getConnectNode()->getSslHandle(),
        //     request->getConnectNode()->getSocketFd());
      }
    }  // for
  }    // curPool
  return false;
}

void ConnectedPool::deletePreNode(
    std::vector<struct ConnectedNodeProcess> *pool) {
  if (pool) {
    for (std::vector<struct ConnectedNodeProcess>::iterator it = pool->begin();
         it != pool->end(); ++it) {
      if (it->request) {
        delete it->request;
        it->request = NULL;
      }
      it->socketFd = INVALID_SOCKET;
      it->sslHandle = NULL;
      it->curRequest = NULL;
      it->canPick = false;
    }  // for

    pool->clear();
  }
}

int ConnectedPool::timeoutPrestartedNode(
    std::vector<struct ConnectedNodeProcess> *pool) {
  int releaseCount = 0;
  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = pool->begin(); it != pool->end(); ++it) {
    if (it->status == PreNodeStarted) {
      uint64_t curTimestamp = utility::TextUtils::GetTimestampMs();
      // LOG_DEBUG(
      //     "it->request:%p, "
      //     "it->curRequest->getConnectNode():%p",
      //     it->request);
      // update workableTimestamp from current request
      if (!it->curRequestInvalid && it->curRequest &&
          it->curRequest->getConnectNode() &&
          it->curRequest->getConnectNode()->getNodeProcess() &&
          it->curRequest->getConnectNode()
                  ->getNodeProcess()
                  ->last_op_timestamp_ms > 0) {
        // LOG_DEBUG(
        //     "it->curRequest:%p, "
        //     "it->curRequest->getConnectNode():%p",
        //     it->curRequest, it->curRequest->getConnectNode());
        it->workableTimestamp = it->curRequest->getConnectNode()
                                    ->getNodeProcess()
                                    ->last_op_timestamp_ms;
      }
      uint64_t gapTimestamp = curTimestamp - it->workableTimestamp;
      bool timeout = gapTimestamp >= _prerequestedTimeoutMs;
      bool tokenTimeout = it->tokenExpirationTimestamp > 0 &&
                          curTimestamp >= it->tokenExpirationTimestamp;
      if (timeout || tokenTimeout) {
        if (it->curRequest == NULL) {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
              "prestarted "
              "request(%p) SSL handler(%p) index(%d) "
              "because of %llums. start timestamp is %llu(%s), last "
              "operate timestamp is %llu(%s), and token expiration timestamp "
              "is %llu(%s).",
              this, pool, pool->begin(), pool->end(), it->request,
              it->sslHandle, std::distance(pool->begin(), it), gapTimestamp,
              it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->workableTimestamp,
              utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
              it->tokenExpirationTimestamp,
              utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                  .c_str());

          // empty this node, delete request later
          it->status = PreNodeToBeCreated;
          it->startTimestamp = 0;
          it->workableTimestamp = 0;
          it->tokenExpirationTimestamp = 0;
          it->socketFd = INVALID_SOCKET;
          it->sslHandle = NULL;
          it->ttsVersion = 0;
          it->sdkName.clear();
          it->startedResponse.clear();
          it->canPick = false;
          it->curRequestInvalid = false;
          it->shouldRelease = true;
          if (!tokenTimeout) {
            it->shouldPreconnect = true;
          }
          releaseCount++;
        } else {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback prestarted "
              "request(%p) curRequest(%p) idle "
              "timeout:%llums. start timestamp is %llu(%s). But request(%p) is "
              "using, check later ...",
              this, pool, pool->begin(), pool->end(), it->request,
              it->curRequest, gapTimestamp, it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->curRequest);
        }
      } else {
        // valid connection
        int pingResult = Success;
        if (it->request && it->request->getConnectNode()) {
          pingResult = it->request->getConnectNode()->syncPingCmd();
        }
        if (pingResult != Success) {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
              "prestarted "
              "request(%p) SSL handler(%p) index(%d) "
              "because of ping failed. start timestamp is %llu(%s), last "
              "operate timestamp is %llu(%s), and token expiration timestamp "
              "is %llu(%s).",
              this, pool, pool->begin(), pool->end(), it->request,
              it->sslHandle, std::distance(pool->begin(), it),
              it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->workableTimestamp,
              utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
              it->tokenExpirationTimestamp,
              utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                  .c_str());

          // empty this node, delete request later
          it->status = PreNodeToBeCreated;
          it->startTimestamp = 0;
          it->workableTimestamp = 0;
          it->socketFd = INVALID_SOCKET;
          it->sslHandle = NULL;
          it->ttsVersion = 0;
          it->sdkName.clear();
          it->startedResponse.clear();
          it->canPick = false;
          it->shouldPreconnect = false;
          it->curRequestInvalid = false;
          it->shouldRelease = true;
          releaseCount++;
        }
        // LOG_DEBUG(
        //     "Pool(%p) connectPoolEventCallback "
        //     "request(%p) %llu - %llu = %llu.",
        //     this, it->request, curTimestamp, it->workableTimestamp,
        //     gapTimestamp);
      }
    }
  }  // for

  return releaseCount;
}

int ConnectedPool::timeoutPreconnectedNode(
    std::vector<struct ConnectedNodeProcess> *pool) {
  int releaseCount = 0;
  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = pool->begin(); it != pool->end(); ++it) {
    if (it->status == PreNodeConnected) {
      uint64_t curTimestamp = utility::TextUtils::GetTimestampMs();
      // update workableTimestamp from current request
      if (!it->curRequestInvalid && it->curRequest &&
          it->curRequest->getConnectNode() &&
          it->curRequest->getConnectNode()->getNodeProcess() &&
          it->curRequest->getConnectNode()
                  ->getNodeProcess()
                  ->last_op_timestamp_ms > 0) {
        it->workableTimestamp = it->curRequest->getConnectNode()
                                    ->getNodeProcess()
                                    ->last_op_timestamp_ms;
      }
      uint64_t gapTimestamp = curTimestamp - it->workableTimestamp;
      bool timeout = gapTimestamp >= _preconnectedTimeoutMs;
      bool tokenTimeout = it->tokenExpirationTimestamp > 0 &&
                          curTimestamp >= it->tokenExpirationTimestamp;
      if (timeout || tokenTimeout) {
        if (it->curRequest == NULL) {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
              "preconnected "
              "request(%p) SSL handler(%p) index(%d) "
              "because of %llums. start timestamp is %llu(%s), last "
              "operate timestamp is %llu(%s), and token expiration timestamp "
              "is %llu(%s).",
              this, pool, pool->begin(), pool->end(), it->request,
              it->sslHandle, std::distance(pool->begin(), it), gapTimestamp,
              it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->workableTimestamp,
              utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
              it->tokenExpirationTimestamp,
              utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                  .c_str());

          // empty this node, delete request later
          it->status = PreNodeToBeCreated;
          it->startTimestamp = 0;
          it->workableTimestamp = 0;
          it->socketFd = INVALID_SOCKET;
          it->sslHandle = NULL;
          it->ttsVersion = 0;
          it->startedResponse.clear();
          it->canPick = false;
          it->shouldRelease = true;
          if (!tokenTimeout) {
            it->shouldPreconnect = true;
          }
          it->curRequestInvalid = false;
          releaseCount++;
        } else {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback preconnected "
              "request(%p) curRequest(%p) idle "
              "timeout:%llums. start timestamp is %llu(%s). But request(%p) is "
              "using, check later ...",
              this, pool, pool->begin(), pool->end(), it->request,
              it->curRequest, gapTimestamp, it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->curRequest);
        }
      } else {
        // valid connection
        int pingResult = Success;
        if (it->request && it->request->getConnectNode()) {
          pingResult = it->request->getConnectNode()->syncPingCmd();
        }
        if (pingResult != Success) {
          LOG_WARN(
              "Pool(%p:(%p)%p-%p) connectPoolEventCallback should release "
              "preconnected "
              "request(%p) SSL handler(%p) index(%d) "
              "because of ping failed(%d). start timestamp is %llu(%s), last "
              "operate timestamp is %llu(%s), and token expiration timestamp "
              "is %llu(%s).",
              this, pool, pool->begin(), pool->end(), it->request,
              it->sslHandle, std::distance(pool->begin(), it), pingResult,
              it->startTimestamp,
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              it->workableTimestamp,
              utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
              it->tokenExpirationTimestamp,
              utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                  .c_str());

          // empty this node, delete request later
          it->status = PreNodeToBeCreated;
          it->startTimestamp = 0;
          it->workableTimestamp = 0;
          it->tokenExpirationTimestamp = 0;
          it->socketFd = INVALID_SOCKET;
          it->sslHandle = NULL;
          it->ttsVersion = 0;
          it->sdkName.clear();
          it->startedResponse.clear();
          it->canPick = false;
          it->shouldPreconnect = false;
          it->curRequestInvalid = false;
          it->shouldRelease = true;
          releaseCount++;
        }
        // LOG_DEBUG(
        //     "Pool(%p) connectPoolEventCallback "
        //     "request(%p) %llu - %llu = %llu.",
        //     this, it->request, curTimestamp, it->workableTimestamp,
        //     gapTimestamp);
      }
    }
  }  // for

  return releaseCount;
}

void ConnectedPool::deleteOrPreconnectNodeShouldReleased(
    std::vector<struct ConnectedNodeProcess> *pool, std::string name) {
  // LOG_DEBUG("Pool(%p:(%p)) Name(%s) begin ...", this, pool, name.c_str());

  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = pool->begin(); it != pool->end(); ++it) {
    // LOG_DEBUG(
    //     "Pool(%p:(%p)) Name(%s) Request(%p) index(%d) shouldRelease(%s) "
    //     "shouldPreconnect(%s) ...",
    //     this, pool, name.c_str(), it->request, std::distance(pool->begin(),
    //     it), it->shouldRelease ? "true" : "false", it->shouldPreconnect ?
    //     "true" : "false");

    if (it->shouldRelease) {
      it->shouldRelease = false;
      if (it->shouldPreconnect) {
        LOG_INFO(
            "Request(%p) %s index(%d) needs to preconnect now and then delete "
            "...",
            it->request, name.c_str(), std::distance(pool->begin(), it));
        it->shouldPreconnect = false;
        preconnectNodeByRequest(it->request);  // it->request is old request
        // LOG_DEBUG("Request(%p) push into pool finish.", it->request);
      } else {
        LOG_INFO(
            "Request(%p) %s index(%d) needs to be deleted now because the "
            "timeout "
            "...",
            it->request, name.c_str(), std::distance(pool->begin(), it));
      }

      // delete old request
      if (it->request) {
        bool release_lock_ret = true;
        NlsClientImpl *cur_instance =
            it->request->getConnectNode()->getInstance();
        if (cur_instance != NULL) {
          MUTEX_TRY_LOCK_WITH_TAG(cur_instance->_mtxReleaseRequestGuard, 2000,
                                  release_lock_ret, it->request);
          if (!release_lock_ret) {
            LOG_ERROR("Request(%p) lock destroy failed, deadlock has occurred",
                      it->request);
          }
        } else {
          LOG_ERROR("Request(%p) just only created ...", it->request);
          release_lock_ret = false;
        }

        delete it->request;

        if (release_lock_ret) {
          MUTEX_UNLOCK_WITH_TAG(cur_instance->_mtxReleaseRequestGuard,
                                it->request);
        }
      }

      // empty this node
      it->clearNode();
      // it->status = PreNodeToBeCreated;
      // it->startTimestamp = 0;
      // it->workableTimestamp = 0;
      // it->socketFd = INVALID_SOCKET;
      // it->sslHandle = NULL;
      // it->ttsVersion = 0;
      // it->startedResponse = "";
      // it->curRequest = NULL;
      // it->canPick = false;
      // it->request = NULL;
      // it->isAbnormal = false;
    }  // shouldRelease is true
  }    // for

  // LOG_DEBUG("Pool(%p:(%p)) Name(%s) done.", this, pool, name.c_str());
}

void ConnectedPool::preconnectNodeByRequest(INlsRequest *request) {
  if (request) {
    INlsRequest *newRequest = NULL;
    INlsRequestParam *requestParam = request->getRequestParam();
    ConnectNode *node = request->getConnectNode();
    NlsType type = requestParam->_mode;

    LOG_DEBUG(
        "ConnectedPool(%p) preconnectNodeByRequest old request(%p) with "
        "type:%d.",
        this, request, type);

    switch (type) {
      case TypeAsr:
        newRequest = NlsClient::getInstance()->createRecognizerRequest(
            requestParam->getSdkName().c_str(), node->isLongConnection());
        break;
      case TypeRealTime:
        newRequest = NlsClient::getInstance()->createTranscriberRequest(
            requestParam->getSdkName().c_str(), node->isLongConnection());
        break;
      case TypeTts:
        newRequest = NlsClient::getInstance()->createSynthesizerRequest(
            (TtsVersion)requestParam->getVersion(),
            requestParam->getSdkName().c_str(), node->isLongConnection());
        break;
      case TypeStreamInputTts:
        newRequest = NlsClient::getInstance()->createFlowingSynthesizerRequest(
            requestParam->getSdkName().c_str(), node->isLongConnection());
        break;
      default:
        break;
    }  // switch

    if (newRequest) {
      LOG_INFO(
          "ConnectedPool(%p) create new request(%p) from old request(%p) for "
          "PreconnectedPool.",
          this, newRequest, request);
      *(newRequest->getRequestParam()) = *(request->getRequestParam());
      newRequest->getConnectNode()->usePreNodeStartStepByStep(true);
      int ret = NlsEventNetWork::_eventClient->startInner(newRequest);
      bool result = false;
      if (ret == Success) {
        result = newRequest->getConnectNode()->directLinkIpFromCache();
      }
      if (result) {
        result = pushPreconnectedNode(newRequest, type, true);
        if (result) {
          finishPushPreNode(type, newRequest->getConnectNode()->getSocketFd(),
                            newRequest->getConnectNode()->getSslHandle(),
                            newRequest->getConnectNode()->getPoolIndex(),
                            newRequest);
        } else {
          deletePreNodeBySSL(newRequest->getConnectNode()->getSslHandle(),
                             type);
        }
      } else {
        switch (type) {
          case TypeAsr:
            NlsClient::getInstance()->releaseRecognizerRequest(
                (SpeechRecognizerRequest *)newRequest);
            break;
          case TypeRealTime:
            NlsClient::getInstance()->releaseTranscriberRequest(
                (SpeechTranscriberRequest *)newRequest);
            break;
          case TypeTts:
            NlsClient::getInstance()->releaseSynthesizerRequest(
                (SpeechSynthesizerRequest *)newRequest);
            break;
          case TypeStreamInputTts:
            NlsClient::getInstance()->releaseFlowingSynthesizerRequest(
                (FlowingSynthesizerRequest *)newRequest);
            break;
          default:
            break;
        }  // switch
      }
    }
  } else {
    LOG_ERROR("ConnectedPool(%p) preconnectNodeByRequest request is null.",
              this, request);
  }
}

void ConnectedPool::showEveryNode(
    std::vector<struct ConnectedNodeProcess> *pool, std::string name) {
  LOG_DEBUG("==>> ConnectedPool(%p:(%p)%p-%p) show every node in pool %s ...",
            this, pool, pool->begin(), pool->end(), name.c_str());

  LOG_DEBUG(" %s =>", name.c_str());

  std::vector<struct ConnectedNodeProcess>::iterator it;
  for (it = pool->begin(); it != pool->end(); ++it) {
    LOG_DEBUG("   index(%d) status:%s", std::distance(pool->begin(), it),
              getStatusStr(it->status).c_str());
    LOG_DEBUG("    request:%p curRequest:%p", it->request, it->curRequest);
    if (it->request) {
      LOG_DEBUG("    node:%p", it->request->getConnectNode());
    }
    LOG_DEBUG("    socketFd:%d sslHandle:%p", it->socketFd, it->sslHandle);
    if (it->request && it->request->getConnectNode()) {
      LOG_DEBUG("    node socketFd:%d sslHandle:%p",
                it->request->getConnectNode()->getSocketFd(),
                it->request->getConnectNode()->getSslHandle());
    }
    LOG_DEBUG(
        "    shouldRelease:%s shouldPreconnect:%s isAbnormal:%s "
        "canPick:%s curRequestInvalid:%s",
        it->shouldRelease ? "true" : "false",
        it->shouldPreconnect ? "true" : "false",
        it->isAbnormal ? "true" : "false", it->canPick ? "true" : "false",
        it->curRequestInvalid ? "true" : "false");
    LOG_DEBUG("    start:%s workable:%s tokenExpiration:%s",
              utility::TextUtils::GetTimeFromMs(it->startTimestamp).c_str(),
              utility::TextUtils::GetTimeFromMs(it->workableTimestamp).c_str(),
              utility::TextUtils::GetTimeFromMs(it->tokenExpirationTimestamp)
                  .c_str());
  }

  LOG_DEBUG("<<== ConnectedPool(%p) show every node done", this);
}

std::string ConnectedPool::getStatusStr(ConnectedStatus status) {
  std::string result = "unknown";
  switch (status) {
    case PreNodeInvalid:
      result = "PreNodeInvalid";
      break;
    case PreNodeToBeCreated:
      result = "PreNodeToBeCreated";
      break;
    case PreNodeCreated:
      result = "PreNodeCreated";
      break;
    case PreNodeConnected:
      result = "PreNodeConnected";
      break;
    case PreNodeStarted:
      result = "PreNodeStarted";
      break;
    default:
      break;
  }
  return result;
}

void ConnectedPool::insertListInOrder(std::list<int> &lst, int a) {
  std::list<int>::iterator it = std::lower_bound(lst.begin(), lst.end(), a);
  lst.insert(it, a);
}

void ConnectedPool::removeElement(std::list<int> &lst, int a) { lst.remove(a); }

int ConnectedPool::popListFront(std::list<int> &lst) {
  int index = 0;
  if (!lst.empty()) {
    index = lst.front();
    lst.pop_front();
  }
  return index;
}

}  // namespace AlibabaNls