in nlsCppSdk/transport/nlsEventNetWork.cpp [206:392]
/**
 * @brief Bind a request's connect node to a work thread and trigger its
 *        launch event.
 *
 * The whole call runs under _mtxThread, including the long-connection wait
 * loop and the optional synchronous wait at the end.
 *
 * @param request Request to start. It is dereferenced without a null check,
 *                so the caller must pass a valid pointer.
 * @return Success when the launch event has been queued, and also when the
 *         node had already been started (the call is then skipped with a
 *         warning);
 *         -(EventClientEmpty) when _eventClient is NULL;
 *         -(NodeEmpty) when the request has no connect node;
 *         -(SelectThreadFailed) when no work thread could be selected;
 *         -(InvokeStartFailed) when event_add() fails or the node is in a
 *         state that does not allow start;
 *         -(error_code) with the node's error code when a synchronous call
 *         timeout is configured and the node reports an error after
 *         waitInvokeFinish().
 */
int NlsEventNetWork::start(INlsRequest *request) {
  MUTEX_LOCK(_mtxThread);

  if (_eventClient == NULL) {
    LOG_ERROR(
        "NlsEventNetWork has destroyed, please invoke startWorkThread() "
        "first.");
    MUTEX_UNLOCK(_mtxThread);
    return -(EventClientEmpty);
  }

  ConnectNode *node = request->getConnectNode();
  if (node == NULL) {
    LOG_ERROR("The node of request(%p) is nullptr, you have destroyed request!",
              request);
    MUTEX_UNLOCK(_mtxThread);
    return -(NodeEmpty);
  }

#ifdef ENABLE_PRECONNECTED_POOL
  /* With a preconnected pool the node uses preconnections and long-connection
   * mode is switched off; without a pool preconnection is disabled. */
  if (_preconnectedPool) {
    node->usePreconnection(true);
    node->useLongConnection(false);
  } else {
    node->usePreconnection(false);
  }
#endif

  /* Long-connection mode: while the node is still in NodeCompleted, poll (up
   * to 500 times, sleeping about 1ms each) for it to leave that state. If it
   * ends up in NodeClosed, reset all of its status so it can be started
   * again. */
  if (node->isLongConnection()) {
    int try_count = 500;
    while (try_count-- > 0 && node->getConnectNodeStatus() == NodeCompleted) {
#ifdef _MSC_VER
      Sleep(1);
#else
      usleep(1000);
#endif
    }
    if (node->getConnectNodeStatus() == NodeClosed) {
      LOG_DEBUG(
          "Node:%p current is NodeClosed and longConnection mode, reset "
          "status.",
          node);
      node->initAllStatus();
    }
  }

  /*
   * invoke start
   * start is only allowed for a node that has just been created
   * (NodeCreated) and is not exiting (ExitInvalid).
   */
  if (node->getConnectNodeStatus() == NodeCreated &&
      node->getExitStatus() == ExitInvalid) {
    node->setConnectNodeStatus(NodeInvoking);
#ifdef ENABLE_REQUEST_RECORDING
    node->updateNodeProcess("start", NodeInvoking, true, 0);
#endif

    /* Use the thread already assigned to the request, otherwise pick one. */
    int num = request->getThreadNumber();
    if (num < 0) {
      num = selectThreadNumber();
    }
    if (num < 0) {
      node->setConnectNodeStatus(NodeCreated);
      MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
      node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
      return -(SelectThreadFailed);
    } else {
      request->setThreadNumber(num);
    }
    LOG_DEBUG("Request(%p) Node(%p) select NO:%d Total:%d thread.", request,
              node, num, _workThreadsNumber);

    /* Attach the node to the chosen work thread and copy the network-level
     * settings of this object onto the thread and the node. */
    WorkThread *work_thread = &_workThreadArray[num];
    node->setEventThread(work_thread);
    node->getEventThread()->setInstance(_instance);
    node->setInstance(_instance);
    node->getEventThread()->setAddrInFamily(_addrInFamily);
    if (strnlen(_directIp, 64) > 0) {
      node->getEventThread()->setDirectHost(_directIp);
    }
    node->getEventThread()->setUseSysGetAddrInfo(_enableSysGetAddr);
    node->setSyncCallTimeout(_syncCallTimeoutMs);
    work_thread->updateParameters(node);

    /* Result of the event_add() call made by whichever branch below runs.
     * All branches share the failure handling that follows them. */
    int event_ret = Success;
#ifdef ENABLE_PRECONNECTED_POOL
    ConnectedStatus getPrestartedNode = PreNodeInvalid;
    uint64_t try_begin_ms = utility::TextUtils::GetTimestampMs();
    if (_preconnectedPool) {
      getPrestartedNode = (ConnectedStatus)node->tryToGetPreconnection();
    }
    uint64_t try_end_ms = utility::TextUtils::GetTimestampMs();
    if (getPrestartedNode == PreNodeConnected) {
      /* Got a preconnected node: the start still has to be issued. */
      LOG_DEBUG("Request(%p) node(%p) get a preconnected node ...", request,
                node);
#ifdef ENABLE_REQUEST_RECORDING
      node->getNodeProcess()->connect_type = ConnectWithPreconnectedNodePool;
#endif
      event_ret = event_add(node->getStartWithPoolEvent(true), NULL);
      if (event_ret == Success) {
        event_active(node->getStartWithPoolEvent(), EV_READ, 0);
      }
    } else if (getPrestartedNode == PreNodeStarted) {
      /* Got a prestarted node: it can begin working directly. */
      LOG_DEBUG("Request(%p) node(%p) get a prestarted node ...", request,
                node);
#ifdef ENABLE_REQUEST_RECORDING
      node->getNodeProcess()->connect_type = ConnectWithPrestartedNodePool;
#endif
      event_ret = event_add(node->getStartWithPoolEvent(true), NULL);
      if (event_ret == Success) {
        event_active(node->getStartWithPoolEvent(), EV_READ, 0);
        /* uint64_t is not necessarily unsigned long long; cast so the
         * argument matches the %llu conversion on every platform. */
        LOG_DEBUG(
            "Request(%p) node(%p) get a prestarted node "
            "tryToGetPreconnection latency %llums",
            request, node,
            static_cast<unsigned long long>(try_end_ms - try_begin_ms));
      }
    } else
#endif
    {
      LOG_DEBUG(
          "Request(%p) node(%p) ready to invoke event_add LaunchEvent ...",
          request, node);
      event_ret = event_add(node->getLaunchEvent(true), NULL);
      if (event_ret == Success) {
        LOG_DEBUG(
            "Request(%p) node(%p) invoking event_add success, ready to launch "
            "request.",
            request, node);
        event_active(node->getLaunchEvent(), EV_READ, 0);
      }
    }

    if (event_ret != Success) {
      LOG_ERROR("Request(%p) node(%p) invoking event_add failed(%d).", request,
                node, event_ret);
      /* Roll the status back, as the thread-selection failure path does.
       * Leaving it at NodeInvoking would make a later start() take the
       * "has invoked start" branch and report Success without launching. */
      node->setConnectNodeStatus(NodeCreated);
      MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
      node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
      return -(InvokeStartFailed);
    }

    /* NOTE(review): the encoder is initialised after the event has been
     * activated; confirm the event callback cannot need it earlier. */
    node->initNlsEncoder();

    /* With a synchronous call timeout configured, wait for the invocation to
     * finish and report the node's error code if there is one.
     * NOTE(review): the node status is not reset here; presumably the node
     * manages its own status on this path - confirm. */
    if (node->getSyncCallTimeout() > 0) {
      node->waitInvokeFinish();
      int error_code = node->getErrorCode();
      if (error_code != Success) {
        MUTEX_UNLOCK(_mtxThread);
#ifdef ENABLE_REQUEST_RECORDING
        node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
        return -(error_code);
      }
    }
#ifdef ENABLE_REQUEST_RECORDING
    node->updateNodeProcess("start", NodeCreated, false, 0);
#endif
  } else if (node->getExitStatus() == ExitInvalid &&
             node->getConnectNodeStatus() > NodeCreated &&
             node->getConnectNodeStatus() < NodeFailed) {
    /* start was already invoked on this node: skip and report Success. */
    LOG_WARN(
        "Request(%p) node(%p) has invoked start, node status:%s, exit "
        "status:%s. skip ...",
        request, node, node->getConnectNodeStatusString().c_str(),
        node->getExitStatusString().c_str());
    MUTEX_UNLOCK(_mtxThread);
    return Success;
  } else {
    /* Any other combination of node status and exit status is invalid for
     * start; the node status is put back to NodeCreated before failing. */
    LOG_ERROR(
        "Request(%p) node(%p) invoke start failed, current status is invalid. "
        "node status:%s, exit status:%s.",
        request, node, node->getConnectNodeStatusString().c_str(),
        node->getExitStatusString().c_str());
    node->setConnectNodeStatus(NodeCreated);
    MUTEX_UNLOCK(_mtxThread);
    return -(InvokeStartFailed);
  }

  MUTEX_UNLOCK(_mtxThread);
  LOG_DEBUG("Request(%p) node(%p) invoke start success.", request, node);
  return Success;
}