int NlsEventNetWork::start()

in nlsCppSdk/transport/nlsEventNetWork.cpp [206:392]


/**
 * @brief Issue the start of a request on one of the work threads.
 *
 * The whole call runs with _mtxThread held. When the request's node is in
 * NodeCreated with exit status ExitInvalid, the node is switched to
 * NodeInvoking, bound to a work thread (the one recorded in the request, or
 * one picked by selectThreadNumber()), and its launch event is added and
 * activated. With ENABLE_PRECONNECTED_POOL and a pool present, the
 * start-with-pool event is used instead when tryToGetPreconnection() reports
 * PreNodeConnected or PreNodeStarted. If the node's sync-call timeout is
 * positive, the call then waits in waitInvokeFinish() and reports the node's
 * error code.
 *
 * @param request The request to start.
 * @return Success when the start was issued, or when the node was already
 *         started (status strictly between NodeCreated and NodeFailed with
 *         exit status ExitInvalid). Otherwise a negated code:
 *         -(EventClientEmpty) when _eventClient is NULL,
 *         -(NodeEmpty) when request is NULL or has no node,
 *         -(SelectThreadFailed) when no work thread could be selected,
 *         -(InvokeStartFailed) when adding the launch event failed or the
 *         node status does not allow a start,
 *         -(error_code) when a synchronous start finished with an error.
 */
int NlsEventNetWork::start(INlsRequest *request) {
  MUTEX_LOCK(_mtxThread);

  if (_eventClient == NULL) {
    LOG_ERROR(
        "NlsEventNetWork has destroyed, please invoke startWorkThread() "
        "first.");
    MUTEX_UNLOCK(_mtxThread);
    return -(EventClientEmpty);
  }

  /* A null request cannot provide a node; reject it instead of dereferencing
   * a null pointer below. */
  if (request == NULL) {
    LOG_ERROR("The request is nullptr, cannot get its node.");
    MUTEX_UNLOCK(_mtxThread);
    return -(NodeEmpty);
  }

  ConnectNode *node = request->getConnectNode();
  if (node == NULL) {
    LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!",
              request);
    MUTEX_UNLOCK(_mtxThread);
    return -(NodeEmpty);
  }

#ifdef ENABLE_PRECONNECTED_POOL
  /* Preconnection and long connection are not combined: when a pool exists
   * the node is switched to preconnection and long connection is disabled. */
  if (_preconnectedPool) {
    node->usePreconnection(true);
    node->useLongConnection(false);
  } else {
    node->usePreconnection(false);
  }
#endif

  /* In long-connection mode, if the node is in the Completed status, wait for
   * it to enter Closed and then reset its status. The wait is bounded to 500
   * polls of 1ms each. */
  if (node->isLongConnection()) {
    int try_count = 500;
    while (try_count-- > 0 && node->getConnectNodeStatus() == NodeCompleted) {
#ifdef _MSC_VER
      Sleep(1);
#else
      usleep(1000);
#endif
    }

    if (node->getConnectNodeStatus() == NodeClosed) {
      LOG_DEBUG(
          "Node:%p current is NodeClosed and longConnection mode, reset "
          "status.",
          node);
      node->initAllStatus();
    }
  }

  /*
   * invoke start
   * The node has just been created and is not exiting, so the start
   * operation may proceed.
   */
  if (node->getConnectNodeStatus() == NodeCreated &&
      node->getExitStatus() == ExitInvalid) {
    node->setConnectNodeStatus(NodeInvoking);
#ifdef ENABLE_REQUEST_RECORDING
    node->updateNodeProcess("start", NodeInvoking, true, 0);
#endif

    /* Reuse the thread already recorded in the request; otherwise pick one. */
    int num = request->getThreadNumber();
    if (num < 0) {
      num = selectThreadNumber();
    }
    if (num < 0) {
      node->setConnectNodeStatus(NodeCreated);
      MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
      node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
      return -(SelectThreadFailed);
    } else {
      request->setThreadNumber(num);
    }

    LOG_DEBUG("Request(%p) Node(%p) select NO:%d Total:%d thread.", request,
              node, num, _workThreadsNumber);

    /* Bind the node to the chosen work thread and push the current network
     * settings of this object down to the thread and the node. */
    WorkThread *work_thread = &_workThreadArray[num];
    node->setEventThread(work_thread);
    node->getEventThread()->setInstance(_instance);
    node->setInstance(_instance);
    node->getEventThread()->setAddrInFamily(_addrInFamily);
    if (strnlen(_directIp, 64) > 0) {
      node->getEventThread()->setDirectHost(_directIp);
    }
    node->getEventThread()->setUseSysGetAddrInfo(_enableSysGetAddr);
    node->setSyncCallTimeout(_syncCallTimeoutMs);
    work_thread->updateParameters(node);

#ifdef ENABLE_PRECONNECTED_POOL
    ConnectedStatus getPrestartedNode = PreNodeInvalid;
    uint64_t try_begin_ms = utility::TextUtils::GetTimestampMs();
    if (_preconnectedPool) {
      getPrestartedNode = (ConnectedStatus)node->tryToGetPreconnection();
    }
    uint64_t try_end_ms = utility::TextUtils::GetTimestampMs();
    if (getPrestartedNode == PreNodeConnected) {
      // Got a preconnected node; a start still has to be issued.
      LOG_DEBUG("Request(%p) node(%p) get a preconnected node ...", request,
                node);
#ifdef ENABLE_REQUEST_RECORDING
      node->getNodeProcess()->connect_type = ConnectWithPreconnectedNodePool;
#endif
      int event_ret = event_add(node->getStartWithPoolEvent(true), NULL);
      if (event_ret != Success) {
        /* Surface the failure instead of dropping it silently. The event is
         * still activated below, exactly as before. */
        LOG_ERROR(
            "Request(%p) node(%p) invoking event_add StartWithPoolEvent "
            "failed(%d).",
            request, node, event_ret);
      }
      event_active(node->getStartWithPoolEvent(), EV_READ, 0);
    } else if (getPrestartedNode == PreNodeStarted) {
      // Got a prestarted node; it can begin working directly.
      LOG_DEBUG("Request(%p) node(%p) get a prestarted node ...", request,
                node);
#ifdef ENABLE_REQUEST_RECORDING
      node->getNodeProcess()->connect_type = ConnectWithPrestartedNodePool;
#endif
      int event_ret = event_add(node->getStartWithPoolEvent(true), NULL);
      if (event_ret != Success) {
        /* Surface the failure instead of dropping it silently. The event is
         * still activated below, exactly as before. */
        LOG_ERROR(
            "Request(%p) node(%p) invoking event_add StartWithPoolEvent "
            "failed(%d).",
            request, node, event_ret);
      }
      event_active(node->getStartWithPoolEvent(), EV_READ, 0);
      /* uint64_t is not necessarily unsigned long long; cast so the argument
       * matches the %llu conversion. */
      LOG_DEBUG(
          "Request(%p) node(%p) get a prestarted node "
          "tryToGetPreconnection latency %llums",
          request, node,
          static_cast<unsigned long long>(try_end_ms - try_begin_ms));
    } else
#endif
    {
      LOG_DEBUG(
          "Request(%p) node(%p) ready to invoke event_add LaunchEvent ...",
          request, node);
      int event_ret = event_add(node->getLaunchEvent(true), NULL);
      if (event_ret != Success) {
        LOG_ERROR("Request(%p) node(%p) invoking event_add failed(%d).",
                  request, node, event_ret);
        /* Roll the status back, as the thread-selection failure path does.
         * Left in NodeInvoking, a later start() would take the "has invoked
         * start" branch and return Success without launching anything. */
        node->setConnectNodeStatus(NodeCreated);
        MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
        node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
        return -(InvokeStartFailed);
      } else {
        LOG_DEBUG(
            "Request(%p) node(%p) invoking event_add success, ready to launch "
            "request.",
            request, node);
      }

      event_active(node->getLaunchEvent(), EV_READ, 0);
    }

    node->initNlsEncoder();

    /* Synchronous mode: wait for the invocation to finish and report the
     * node's error code. Note that _mtxThread is still held while waiting. */
    if (node->getSyncCallTimeout() > 0) {
      node->waitInvokeFinish();
      int error_code = node->getErrorCode();
      if (error_code != Success) {
        MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
        node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
        return -(error_code);
      }
    }
#ifdef ENABLE_REQUEST_RECORDING
    node->updateNodeProcess("start", NodeCreated, false, 0);
#endif

  } else if (node->getExitStatus() == ExitInvalid &&
             node->getConnectNodeStatus() > NodeCreated &&
             node->getConnectNodeStatus() < NodeFailed) {
    /* Start was already invoked on this node: treat the repeated call as a
     * no-op and report success. */
    LOG_WARN(
        "Request(%p) node(%p) has invoked start, node status:%s, exit "
        "status:%s. skip ...",
        request, node, node->getConnectNodeStatusString().c_str(),
        node->getExitStatusString().c_str());

    MUTEX_UNLOCK(_mtxThread);
    return Success;
  } else {
    LOG_ERROR(
        "Request(%p) node(%p) invoke start failed, current status is invalid. "
        "node status:%s, exit status:%s.",
        request, node, node->getConnectNodeStatusString().c_str(),
        node->getExitStatusString().c_str());

    node->setConnectNodeStatus(NodeCreated);
    MUTEX_UNLOCK(_mtxThread);
    return -(InvokeStartFailed);
  }

  MUTEX_UNLOCK(_mtxThread);
  LOG_DEBUG("Request(%p) node(%p) invoke start success.", request, node);
  return Success;
}