int ConnectNode::parseFrame()

in nlsCppSdk/transport/connectNode.cpp [2143:2444]


/**
 * @brief Convert one received WebSocket frame into an NlsEvent, dispatch it
 *        through handlerFrame(), then update the node work status and, when
 *        the event ends the task, close (or recycle) the connection.
 *
 * Flow:
 *   1. CLOSE frame: unless the node is already NodeClosed, the frame payload
 *      is wrapped into a {"TaskFailed":"<payload>"} message and a TaskFailed
 *      event carrying wsFrame->closeCode is created.
 *      PONG frame: logged, returns Success immediately.
 *      Any other frame: converted by convertResult().
 *   2. If no event was produced, the frame is either skipped
 *      (WsFrameBodyEmpty) or the node is closed.
 *   3. If cancel() was invoked (_exitStatus == ExitCancel) the event is
 *      dropped.
 *   4. For *Started events the task id reported in the event replaces the
 *      one stored in the request parameters, before the callback runs.
 *   5. handlerFrame() is invoked; a non-zero result aborts processing.
 *   6. _workStatus is updated from the message type and, for terminal
 *      events, the connection is closed / recycled and a Close event is sent
 *      through handlerEvent().
 *
 * @param wsFrame Frame to process; dereferenced unconditionally.
 * @return Success when the frame was handled or deliberately skipped,
 *         -(NewNlsEventFailed) when the TaskFailed event could not be
 *         allocated, -(NlsEventEmpty) when no event could be produced,
 *         -(InvalidExitStatus) when the node has been cancelled, or the
 *         non-zero value returned by handlerFrame().
 */
int ConnectNode::parseFrame(WebSocketFrame *wsFrame) {
  // NOTE(review): REQUEST_CHECK is a macro defined elsewhere; presumably it
  // validates _request and may return early - confirm against its definition.
  REQUEST_CHECK(_request, this);
  int result = Success;
  NlsEvent *frameEvent = NULL;

#ifdef ENABLE_NLS_DEBUG_2
  // Zero-initialise every timestamp: timewait_a / timewait_b are only written
  // on the convertResult() path, yet they are read by the final report for
  // CLOSE frames too.
  struct timeval timewait_start = {0, 0};
  struct timeval timewait_a = {0, 0};
  struct timeval timewait_b = {0, 0};
  struct timeval timewait_c = {0, 0};
  struct timeval timewait_d = {0, 0};
  struct timeval timewait_end = {0, 0};
  gettimeofday(&timewait_start, NULL);
#endif

  if (wsFrame->type == WebSocketHeaderType::CLOSE) {
    LOG_INFO("Node(%p) get CLOSE wsFrame closeCode:%d.", this,
             wsFrame->closeCode);
    if (NodeClosed != _workStatus) {
      // The CLOSE payload is treated as a human readable error message.
      std::string msg((char *)wsFrame->data, wsFrame->length);
      if (!msg.empty()) {
        LOG_ERROR("Node(%p) get error message:%s", this, msg.c_str());
      }
      // NOTE(review): msg is embedded without JSON escaping and is truncated
      // to fit tmp_msg.
      char tmp_msg[2048] = {0};
      snprintf(tmp_msg, 2048 - 1, "{\"TaskFailed\":\"%s\"}", msg.c_str());
      std::string failedMsg = tmp_msg;

      LOG_ERROR("Node(%p) failed msg:%s.", this, failedMsg.c_str());

      frameEvent = new NlsEvent(failedMsg.c_str(), wsFrame->closeCode,
                                NlsEvent::TaskFailed,
                                _request->getRequestParam()->_task_id);
      // NOTE(review): a plain `new` normally throws instead of returning NULL;
      // this guard is kept for builds where that may not hold.
      if (frameEvent == NULL) {
        LOG_ERROR("Node(%p) new NlsEvent failed!", this);
        handlerEvent(TASKFAILED_NEW_NLSEVENT_FAILED, MemNotEnough,
                     NlsEvent::TaskFailed, _enableOnMessage);
        return -(NewNlsEventFailed);
      }
    } else {
      // frameEvent stays NULL and result stays Success, so the "no event"
      // handling below closes the node and returns -(NlsEventEmpty) without
      // emitting another Close event.
      LOG_INFO("Node(%p) NlsEvent::Close has invoked, skip CLOSE_FRAME.", this);
    }
  } else if (wsFrame->type == WebSocketHeaderType::PONG) {
    LOG_DEBUG("Node(%p) get PONG.", this);
    return Success;
  } else {
#ifdef ENABLE_NLS_DEBUG_2
    gettimeofday(&timewait_a, NULL);
#endif
    frameEvent = convertResult(wsFrame, &result);
#ifdef ENABLE_NLS_DEBUG_2
    gettimeofday(&timewait_b, NULL);
    uint64_t time_consuming_convertResult0 =
        timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
        timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
    if (time_consuming_convertResult0 > 50) {
      // Cast so the argument type matches the %llu conversion everywhere.
      LOG_WARN("Request(%p) Node(%p) parseFrame after convertResult:%llu.",
               _request, this,
               static_cast<unsigned long long>(time_consuming_convertResult0));
    }
#endif
  }

  if (frameEvent == NULL) {
    if (result == -(WsFrameBodyEmpty)) {
      // Empty frame body: tolerated, the frame is simply ignored.
      LOG_WARN(
          "Node(%p) convert result failed, result:%d. Maybe recv dirty data, "
          "skip here ...",
          this, result);
      return Success;
    } else {
      LOG_ERROR("Node(%p) convert result failed, result:%d.", this, result);
      closeConnectNode();
      // Only emit a Close event when the conversion reported an error.
      if (result != Success) {
        std::string tmp_buf;
        handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close,
                     _enableOnMessage);
      }
      return -(NlsEventEmpty);
    }
  }

  LOG_DEBUG(
      "Node(%p) begin HandlerFrame, msg type:%s node status:%s exit status:%s.",
      this, frameEvent->getMsgTypeString().c_str(),
      getConnectNodeStatusString().c_str(), getExitStatusString().c_str());

  // cancel() has been invoked: drop the event without running callbacks.
  if (_exitStatus == ExitCancel) {
    LOG_WARN("Node(%p) has been canceled.", this);
    if (frameEvent) delete frameEvent;
    frameEvent = NULL;
    return -(InvalidExitStatus);
  }

  // Cache the type now: frameEvent is deleted before the close handling below.
  int msg_type = frameEvent->getMsgType();
  switch (msg_type) {
    case NlsEvent::RecognitionStarted:
    case NlsEvent::TranscriptionStarted:
    case NlsEvent::SynthesisStarted:
      // Reset task_id from server, which will be used in the channelClose
      // callback.
      if (frameEvent->getTaskId()) {
        std::string taskId(frameEvent->getTaskId());
        if (!taskId.empty()) {
          _request->getRequestParam()->setTaskId(taskId);
        }
      }
      break;
    default:
      break;
  }

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_c, NULL);
#endif
  result = handlerFrame(frameEvent);
#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_d, NULL);
#endif
  if (result) {
    // handlerFrame() failed: release the event and propagate its result.
    delete frameEvent;
    frameEvent = NULL;
    return result;
  }

  LOG_DEBUG(
      "Node(%p) HandlerFrame finish, current node status:%s, ready to set "
      "workStatus.",
      this, getConnectNodeStatusString().c_str());

  // After the callback: derive the new work status and decide whether this
  // event ends the task (closeFlag).
  bool closeFlag = false;
  switch (msg_type) {
    case NlsEvent::RecognitionStarted:
    case NlsEvent::TranscriptionStarted:
    case NlsEvent::SynthesisStarted:
      if (_request->getRequestParam()->_requestType == SpeechWakeWordDialog) {
        _workStatus = NodeWakeWording;
      } else {
        _workStatus = NodeStarted;
        if (_request->getRequestParam()->_mode == TypeStreamInputTts) {
          FlowingSynthesizerParam *param =
              (FlowingSynthesizerParam *)_request->getRequestParam();
          // A pending single-round text triggers the single-round-text event
          // right after the start has been acknowledged.
          if (param->getSingleRoundText().size() > 0) {
            _isSendSingleRoundText = true;
            if (event_add(getSingleRoundTextEvent(), NULL) == Success) {
              event_active(getSingleRoundTextEvent(), EV_READ, 0);
            }
          }
        }
      }
#ifdef ENABLE_CONTINUED
      // reconnecting finished
      _reconnection.state = NodeReconnection::NoReconnection;
#endif
      break;
    case NlsEvent::Close:
    case NlsEvent::RecognitionCompleted:
      _workStatus = NodeCompleted;
      // In dialog mode the connection is not closed by these events.
      if (_request->getRequestParam()->_mode == TypeDialog) {
        closeFlag = false;
      } else {
        closeFlag = true;
      }
      break;
    case NlsEvent::TaskFailed:
      _workStatus = NodeFailed;
      closeFlag = true;
      break;
    case NlsEvent::TranscriptionCompleted:
      _workStatus = NodeCompleted;
      closeFlag = true;
      break;
    case NlsEvent::SynthesisCompleted:
      _workStatus = NodeCompleted;
      closeFlag = true;
      break;
    case NlsEvent::DialogResultGenerated:
      // Ends the task but leaves _workStatus untouched.
      closeFlag = true;
      break;
    case NlsEvent::WakeWordVerificationCompleted:
      _workStatus = NodeStarted;
      break;
    case NlsEvent::Binary:
      if (_isFirstBinaryFrame) {
        LOG_DEBUG(
            "Node(%p) get first binary frame, set work status to NodeStarted",
            this);
        _isFirstBinaryFrame = false;
        _workStatus = NodeStarted;
      }
      break;
    default:
      closeFlag = false;
      break;
  }

  if (frameEvent) delete frameEvent;
  frameEvent = NULL;

  if (closeFlag) {
    _isPreconnecting = false;
    // Short connections and failed nodes are torn down (or recycled into the
    // preconnected pool when that feature applies); healthy long connections
    // go through closeStatusConnectNode() instead.
    if (!_isLongConnection || _workStatus == NodeFailed) {
#ifdef ENABLE_PRECONNECTED_POOL
      if (_usePreconnection && _workStatus != NodeFailed) {
        // Preconnection is enabled and no error occurred, so record this node.
        closeStatusConnectNodeForConnectedPool();
        _isPreconnecting =
            true; /* marks that this node has interacted and is to be stored
                     into ConnectedPool */
      } else
#endif
      {
        closeConnectNode();
      }
    } else {
      closeStatusConnectNode();
    }

#ifdef ENABLE_PRECONNECTED_POOL
    if (_isPreconnecting) {
      // Preconnection is enabled and no error occurred, so record this node.
      // if (_request->getRequestParam()->_mode == TypeTts) {
      //   LOG_DEBUG(
      //       "Node(%p) reset NodeStatus to NodeHandshaked, will push "
      //       "prestarted node into ConnectedPool.",
      //       this);
      //   // Store the node into ConnectedPool
      //   if (NlsEventNetWork::_eventClient &&
      //       NlsEventNetWork::_eventClient->getPreconnectedPool()) {
      //     uint64_t push_prestarted_begin_ms =
      //         utility::TextUtils::GetTimestampMs();
      //     NlsEventNetWork::_eventClient->getPreconnectedPool()
      //         ->pushPrestartedNode(_request,
      //         _request->getRequestParam()->_mode,
      //                              _sslHandle == _nativeSslHandle);
      //     uint64_t push_prestarted_end_ms =
      //         utility::TextUtils::GetTimestampMs();
      //     if (push_prestarted_end_ms - push_prestarted_begin_ms > 50) {
      //       LOG_WARN("Node(%p) pushPrestartedNode excessive latency:%llums.",
      //                this, push_prestarted_end_ms -
      //                push_prestarted_begin_ms);
      //     }
      //   }
      //   LOG_DEBUG("Node(%p) reset NodeStatus to NodeHandshaked done.", this);
      // } else {
      //   LOG_DEBUG(
      //       "Node(%p) reset NodeStatus to NodeHandshaked, will push
      //       prestarted " "node into ConnectedPool when Started.", this);
      //   // The node will be stored into ConnectedPool on Started
      // }

      LOG_DEBUG(
          "Node(%p) reset NodeStatus to NodeHandshaked, will push "
          "prestarted node into ConnectedPool.",
          this);

      // Store the node into ConnectedPool.
      if (NlsEventNetWork::_eventClient &&
          NlsEventNetWork::_eventClient->getPreconnectedPool()) {
        uint64_t push_prestarted_begin_ms =
            utility::TextUtils::GetTimestampMs();
        // A node obtained from the preconnected pool whose SSL handle is not
        // the node's native one goes through the dedicated push method.
        if (_nodeProcess.connect_type == ConnectWithPreconnectedNodePool &&
            _sslHandle != _nativeSslHandle) {
          NlsEventNetWork::_eventClient->getPreconnectedPool()
              ->pushPrestartedNodeFromPreconnected(
                  _request, _request->getRequestParam()->_mode);
        } else {
          NlsEventNetWork::_eventClient->getPreconnectedPool()
              ->pushPrestartedNode(_request, _request->getRequestParam()->_mode,
                                   _sslHandle == _nativeSslHandle);
        }
        uint64_t push_prestarted_end_ms = utility::TextUtils::GetTimestampMs();
        if (push_prestarted_end_ms - push_prestarted_begin_ms > 50) {
          LOG_WARN("Node(%p) pushPrestartedNode excessive latency:%llums.",
                   this,
                   static_cast<unsigned long long>(push_prestarted_end_ms -
                                                   push_prestarted_begin_ms));
        }
      }

      LOG_DEBUG("Node(%p) reset NodeStatus to NodeHandshaked done.", this);
    }
#endif

    // Notify the Close event once the connection state has been updated.
    std::string tmp_buf;
    handlerEvent(genCloseMsg(&tmp_buf), CloseCode, NlsEvent::Close,
                 _enableOnMessage);
  }  // closeFlag

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_end, NULL);
  uint64_t time_consuming =
      timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  // Evaluates to 0 for CLOSE frames, where convertResult() was never called.
  uint64_t time_consuming_convertResult =
      timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
      timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
  uint64_t time_consuming_handlerFrame =
      timewait_d.tv_sec * 1000 + timewait_d.tv_usec / 1000 -
      timewait_c.tv_sec * 1000 - timewait_c.tv_usec / 1000;
  if (time_consuming > 50) {
    LOG_WARN(
        "Request(%p) Node(%p) parseFrame done with excessive time "
        "%llums, including convertResult:%llums and handlerFrame:%llums.",
        _request, this, static_cast<unsigned long long>(time_consuming),
        static_cast<unsigned long long>(time_consuming_convertResult),
        static_cast<unsigned long long>(time_consuming_handlerFrame));
  }
#endif
  return Success;
}