in nlsCppSdk/transport/connectNode.cpp [2143:2444]
/**
 * @brief Handle one received WebSocket frame: turn it into an NlsEvent,
 *        dispatch it through handlerFrame(), then update the work status and
 *        close (or recycle) the connection when the event ends the task.
 *
 * Frame handling:
 *  - CLOSE: unless the node is already NodeClosed, a TaskFailed event that
 *    wraps the close payload is created. If the node is already NodeClosed no
 *    event is created, so the "no event" path below runs: the node is closed
 *    and -(NlsEventEmpty) is returned without emitting a Close event (result
 *    is still Success on that path).
 *  - PONG: logged and ignored.
 *  - anything else: converted by convertResult().
 *
 * @param wsFrame Frame to process. It is dereferenced without a NULL check.
 * @return Success when the frame was handled or deliberately skipped;
 *         -(NewNlsEventFailed), -(NlsEventEmpty) or -(InvalidExitStatus) on
 *         the corresponding failure paths; otherwise the non-zero value
 *         returned by handlerFrame().
 *
 * NOTE(review): REQUEST_CHECK is a macro defined elsewhere and may return
 * early before any of the above happens - confirm against its definition.
 */
int ConnectNode::parseFrame(WebSocketFrame *wsFrame) {
  REQUEST_CHECK(_request, this);

  int result = Success;
  NlsEvent *frameEvent = NULL;

#ifdef ENABLE_NLS_DEBUG_2
  struct timeval timewait_start, timewait_a, timewait_b, timewait_c, timewait_d,
      timewait_end;
  gettimeofday(&timewait_start, NULL);
  // Seed every checkpoint: the CLOSE path never reaches the convertResult
  // checkpoints, and the summary at the end must not read indeterminate
  // values.
  timewait_a = timewait_b = timewait_c = timewait_d = timewait_start;
#endif

  if (wsFrame->type == WebSocketHeaderType::CLOSE) {
    LOG_INFO("Node(%p) get CLOSE wsFrame closeCode:%d.", this,
             wsFrame->closeCode);
    if (NodeClosed != _workStatus) {
      // A CLOSE frame may carry no payload; never build a std::string from a
      // NULL pointer.
      std::string msg;
      if (wsFrame->data != NULL && wsFrame->length > 0) {
        msg.assign((char *)wsFrame->data, wsFrame->length);
      }
      if (!msg.empty()) {
        LOG_ERROR("Node(%p) get error message:%s", this, msg.c_str());
      }

      // Build {"TaskFailed":"<msg>"}. The total length keeps the historical
      // bound of 2046 characters, but an over-long message is shortened
      // instead of cutting off the closing quote and brace.
      // NOTE(review): msg is inserted without JSON escaping.
      const std::string failedPrefix("{\"TaskFailed\":\"");
      const std::string failedSuffix("\"}");
      const std::string::size_type failedMsgMaxLen = 2048 - 2;
      // c_str() stops at the first NUL byte, matching the former "%s".
      std::string failedBody(msg.c_str());
      const std::string::size_type failedBodyMaxLen =
          failedMsgMaxLen - failedPrefix.size() - failedSuffix.size();
      if (failedBody.size() > failedBodyMaxLen) {
        failedBody.resize(failedBodyMaxLen);
      }
      const std::string failedMsg = failedPrefix + failedBody + failedSuffix;
      LOG_ERROR("Node(%p) failed msg:%s.", this, failedMsg.c_str());

      frameEvent = new NlsEvent(failedMsg.c_str(), wsFrame->closeCode,
                                NlsEvent::TaskFailed,
                                _request->getRequestParam()->_task_id);
      // NOTE(review): a plain `new` reports failure by throwing, so this
      // branch is only reachable if operator new is replaced - confirm.
      if (frameEvent == NULL) {
        LOG_ERROR("Node(%p) new NlsEvent failed!", this);
        handlerEvent(TASKFAILED_NEW_NLSEVENT_FAILED, MemNotEnough,
                     NlsEvent::TaskFailed, _enableOnMessage);
        return -(NewNlsEventFailed);
      }
    } else {
      LOG_INFO("Node(%p) NlsEvent::Close has invoked, skip CLOSE_FRAME.", this);
    }
  } else if (wsFrame->type == WebSocketHeaderType::PONG) {
    LOG_DEBUG("Node(%p) get PONG.", this);
    return Success;
  } else {
#ifdef ENABLE_NLS_DEBUG_2
    gettimeofday(&timewait_a, NULL);
#endif
    frameEvent = convertResult(wsFrame, &result);
#ifdef ENABLE_NLS_DEBUG_2
    gettimeofday(&timewait_b, NULL);
    uint64_t time_consuming_convertResult0 =
        timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
        timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
    if (time_consuming_convertResult0 > 50) {
      LOG_WARN("Request(%p) Node(%p) parseFrame after convertResult:%llu.",
               _request, this,
               (unsigned long long)time_consuming_convertResult0);
    }
#endif
  }

  // No event was produced: either an empty frame body (skipped), a conversion
  // failure, or a CLOSE frame received after the node was already closed.
  if (frameEvent == NULL) {
    if (result == -(WsFrameBodyEmpty)) {
      LOG_WARN(
          "Node(%p) convert result failed, result:%d. Maybe recv dirty data, "
          "skip here ...",
          this, result);
      return Success;
    } else {
      LOG_ERROR("Node(%p) convert result failed, result:%d.", this, result);
      closeConnectNode();
      // Only a real conversion failure emits a Close event; the skipped
      // CLOSE frame case arrives here with result == Success.
      if (result != Success) {
        std::string tmp_buf;
        handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close,
                     _enableOnMessage);
      }
      return -(NlsEventEmpty);
    }
  }

  LOG_DEBUG(
      "Node(%p) begin HandlerFrame, msg type:%s node status:%s exit status:%s.",
      this, frameEvent->getMsgTypeString().c_str(),
      getConnectNodeStatusString().c_str(), getExitStatusString().c_str());

  // cancel() has been invoked: drop the event without dispatching it.
  if (_exitStatus == ExitCancel) {
    LOG_WARN("Node(%p) has been canceled.", this);
    delete frameEvent;
    frameEvent = NULL;
    return -(InvalidExitStatus);
  }

  // Remember the type now; frameEvent is released before the close handling.
  int msg_type = frameEvent->getMsgType();

  // Before the callback.
  switch (msg_type) {
    case NlsEvent::RecognitionStarted:
    case NlsEvent::TranscriptionStarted:
    case NlsEvent::SynthesisStarted:
      // reset task_id from server, which will use in channelClose callback.
      if (frameEvent->getTaskId()) {
        std::string taskId(frameEvent->getTaskId());
        if (!taskId.empty()) {
          _request->getRequestParam()->setTaskId(taskId);
        }
      }
      break;
    default:
      break;
  }

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_c, NULL);
#endif
  result = handlerFrame(frameEvent);
#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_d, NULL);
#endif
  if (result) {
    delete frameEvent;
    frameEvent = NULL;
    return result;
  }

  LOG_DEBUG(
      "Node(%p) HandlerFrame finish, current node status:%s, ready to set "
      "workStatus.",
      this, getConnectNodeStatusString().c_str());

  // After the callback: update the work status and decide whether this event
  // ends the task (closeFlag).
  bool closeFlag = false;
  switch (msg_type) {
    case NlsEvent::RecognitionStarted:
    case NlsEvent::TranscriptionStarted:
    case NlsEvent::SynthesisStarted:
      if (_request->getRequestParam()->_requestType == SpeechWakeWordDialog) {
        _workStatus = NodeWakeWording;
      } else {
        _workStatus = NodeStarted;
        if (_request->getRequestParam()->_mode == TypeStreamInputTts) {
          FlowingSynthesizerParam *param =
              (FlowingSynthesizerParam *)_request->getRequestParam();
          // Text supplied up front is sent by activating the single-round
          // text event.
          if (param->getSingleRoundText().size() > 0) {
            _isSendSingleRoundText = true;
            if (event_add(getSingleRoundTextEvent(), NULL) == Success) {
              event_active(getSingleRoundTextEvent(), EV_READ, 0);
            }
          }
        }
      }
#ifdef ENABLE_CONTINUED
      // reconnecting finished
      _reconnection.state = NodeReconnection::NoReconnection;
#endif
      break;
    case NlsEvent::Close:
    case NlsEvent::RecognitionCompleted:
      _workStatus = NodeCompleted;
      // In TypeDialog mode this event does not trigger the close handling.
      closeFlag = (_request->getRequestParam()->_mode != TypeDialog);
      break;
    case NlsEvent::TaskFailed:
      _workStatus = NodeFailed;
      closeFlag = true;
      break;
    case NlsEvent::TranscriptionCompleted:
    case NlsEvent::SynthesisCompleted:
      _workStatus = NodeCompleted;
      closeFlag = true;
      break;
    case NlsEvent::DialogResultGenerated:
      closeFlag = true;
      break;
    case NlsEvent::WakeWordVerificationCompleted:
      _workStatus = NodeStarted;
      break;
    case NlsEvent::Binary:
      if (_isFirstBinaryFrame) {
        LOG_DEBUG(
            "Node(%p) get first binary frame, set work status to NodeStarted",
            this);
        _isFirstBinaryFrame = false;
        _workStatus = NodeStarted;
      }
      break;
    default:
      closeFlag = false;
      break;
  }

  delete frameEvent;
  frameEvent = NULL;

  if (closeFlag) {
    _isPreconnecting = false;
    if (!_isLongConnection || _workStatus == NodeFailed) {
#ifdef ENABLE_PRECONNECTED_POOL
      if (_usePreconnection && _workStatus != NodeFailed) {
        // Preconnection is enabled and no error occurred: record the node.
        closeStatusConnectNodeForConnectedPool();
        // Marks that this Node finished an interaction and is to be stored
        // in the ConnectedPool.
        _isPreconnecting = true;
      } else
#endif
      {
        closeConnectNode();
      }
    } else {
      closeStatusConnectNode();
    }

#ifdef ENABLE_PRECONNECTED_POOL
    if (_isPreconnecting) {
      // Preconnection is enabled and no error occurred: record the node.
      // (A disabled earlier variant pushed only TypeTts nodes here and left
      // the other modes to be pushed when Started; every mode is pushed now.)
      LOG_DEBUG(
          "Node(%p) reset NodeStatus to NodeHandshaked, will push "
          "prestarted node into ConnectedPool.",
          this);
      // Store the Node into the ConnectedPool.
      if (NlsEventNetWork::_eventClient &&
          NlsEventNetWork::_eventClient->getPreconnectedPool()) {
        uint64_t push_prestarted_begin_ms =
            utility::TextUtils::GetTimestampMs();
        if (_nodeProcess.connect_type == ConnectWithPreconnectedNodePool &&
            _sslHandle != _nativeSslHandle) {
          NlsEventNetWork::_eventClient->getPreconnectedPool()
              ->pushPrestartedNodeFromPreconnected(
                  _request, _request->getRequestParam()->_mode);
        } else {
          NlsEventNetWork::_eventClient->getPreconnectedPool()
              ->pushPrestartedNode(_request,
                                   _request->getRequestParam()->_mode,
                                   _sslHandle == _nativeSslHandle);
        }
        uint64_t push_prestarted_end_ms = utility::TextUtils::GetTimestampMs();
        if (push_prestarted_end_ms - push_prestarted_begin_ms > 50) {
          LOG_WARN("Node(%p) pushPrestartedNode excessive latency:%llums.",
                   this,
                   (unsigned long long)(push_prestarted_end_ms -
                                        push_prestarted_begin_ms));
        }
      }
      LOG_DEBUG("Node(%p) reset NodeStatus to NodeHandshaked done.", this);
    }
#endif

    std::string tmp_buf;
    handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close,
                 _enableOnMessage);
  }  // closeFlag

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_end, NULL);
  uint64_t time_consuming =
      timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  uint64_t time_consuming_convertResult =
      timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
      timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
  uint64_t time_consuming_handlerFrame =
      timewait_d.tv_sec * 1000 + timewait_d.tv_usec / 1000 -
      timewait_c.tv_sec * 1000 - timewait_c.tv_usec / 1000;
  if (time_consuming > 50) {
    LOG_WARN(
        "Request(%p) Node(%p) parseFrame done with excessive time "
        "%llums, including convertResult:%llums and handlerFrame:%llums.",
        _request, this, (unsigned long long)time_consuming,
        (unsigned long long)time_consuming_convertResult,
        (unsigned long long)time_consuming_handlerFrame);
  }
#endif
  return Success;
}