int ConnectNode::webSocketResponse()

in nlsCppSdk/transport/connectNode.cpp [1802:2012]


int ConnectNode::webSocketResponse() {
  int ret = 0;
  int read_len = 0;

  if (_releasingFlag) {
    LOG_WARN("Node(%p) is releasing!!! skipping ...", this);
    return -(InvalidStatusWhenReleasing);
  }

  uint8_t *frame = (uint8_t *)calloc(ReadBufferSize, sizeof(char));
  if (frame == NULL) {
    LOG_ERROR("%s %d calloc failed.", __func__, __LINE__);
    return 0;
  }

#ifdef ENABLE_NLS_DEBUG_2
  struct timeval timewait_start, timewait_a, timewait_b, timewait_end;
  gettimeofday(&timewait_start, NULL);
#endif

  // receive buffer from SSL into _readEvBuffer
  read_len = nlsReceive(frame, ReadBufferSize);
  if (read_len < 0) {
    LOG_ERROR("Request(%p Node(%p) nlsReceive failed, read_len:%d", _request,
              this, read_len);
    free(frame);
    return -(NlsReceiveFailed);
  } else if (read_len == 0) {
#ifdef ENABLE_NLS_DEBUG_2
    LOG_DEBUG("Request(%p) Node(%p) nlsReceive empty, read_len:%d", _request,
              this, read_len);
#endif
    free(frame);
    return 0;
#ifdef ENABLE_NLS_DEBUG_2
  } else {
    LOG_DEBUG("Request(%p Node(%p) nlsReceive %dbytes", _request, this,
              read_len);
#endif
  }

  const int maxTryAgain = 3;
  int tryAgain = maxTryAgain;
  bool eLoop = false;
  do {
    ret = 0;
    size_t frameSize = evbuffer_get_length(_readEvBuffer);
    if (frameSize == 0) {
      free(frame);
      frame = NULL;
      ret = 0;
      break;
    } else if (frameSize > ReadBufferSize) {
      uint8_t *tmp = (uint8_t *)realloc(frame, frameSize + 1);
      if (NULL == tmp) {
        LOG_ERROR("Node(%p) realloc failed.", this);
        free(frame);
        frame = NULL;
        ret = -(ReallocFailed);
        break;
      } else {
        frame = tmp;
        LOG_WARN("Node(%p) websocket frame realloc, new size:%d.", this,
                 frameSize + 1);
      }
#ifdef ENABLE_NLS_DEBUG_2
    } else {
      LOG_DEBUG("Node(%p) nlsReceive %dbytes in readEvBuffer.", this,
                frameSize);
#endif
    }

    size_t cur_data_size = frameSize;
    evbuffer_copyout(_readEvBuffer, frame, frameSize);

    WebSocketFrame wsFrame;
    memset(&wsFrame, 0x0, sizeof(struct WebSocketFrame));
    int recv_ret = _webSocket.receiveFullWebSocketFrame(frame, frameSize,
                                                        &_wsType, &wsFrame);
    if (recv_ret == Success) {
      // LOG_DEBUG("Request(%p) Node(%p) parse websocket frame, len:%zu, frame
      // size:%zu, _wsType.opCode:%d, wsFrame.type:%d.",
      //     _request, this, wsFrame.length, frameSize, _wsType.opCode,
      //     wsFrame.type);

      if (_releasingFlag) {
        LOG_WARN("Node(%p) is releasing!!! skipping ...", this);
        ret = -(InvalidStatusWhenReleasing);
        break;
      }

      if (_wsType.opCode == WebSocketHeaderType::PONG) {
        LOG_DEBUG("Node(%p) receive PONG.", this);
        // memset(&_wsType, 0x0, sizeof(struct WebSocketHeaderType));
        wsFrame.type = _wsType.opCode;
      }

#ifdef ENABLE_NLS_DEBUG_2
      gettimeofday(&timewait_a, NULL);
#endif
      /*
       * Will invoke callback in parseFrame.
       * If blocking in callback, will block in parseFrame
       */
      int result = parseFrame(&wsFrame);
#ifdef ENABLE_NLS_DEBUG_2
      gettimeofday(&timewait_b, NULL);
#endif
      if (result) {
        LOG_ERROR("Node(%p) parse WS frame failed:%d.", this, result);
        ret = result;
        break;
      }

      /* Should check node here */
      if (_instance == NULL) {
        /* Maybe user has released instance */
        ret = -(EventClientEmpty);
        break;
      } else {
        /* Maybe user has released request.*/
        NlsNodeManager *node_manager = _instance->getNodeManger();
        int status = NodeStatusInvalid;
        ret = node_manager->checkNodeExist(this, &status);
        if (ret != Success) {
          LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", this, ret);
          break;
        }
      }

      evbuffer_drain(_readEvBuffer, wsFrame.length + _wsType.headerSize);
      cur_data_size = cur_data_size - (wsFrame.length + _wsType.headerSize);

      ret = wsFrame.length + _wsType.headerSize;
      tryAgain = maxTryAgain;
    } else if (recv_ret == -(InvalidWsFrameHeaderSize) ||
               recv_ret == -(InvalidWsFrameHeaderBody)) {
      if (tryAgain-- > 0) {
        LOG_WARN(
            "Request(%p) Node(%p) the WS data is insufficient, and continues "
            "to be "
            "received",
            _request, this);

        usleep(5 * 1000);

        read_len = nlsReceive(frame, ReadBufferSize);
        if (read_len < 0) {
          LOG_ERROR("Request(%p) Node(%p) nlsReceive failed, read_len:%d",
                    _request, this, read_len);
          if (frame) free(frame);
          return -(NlsReceiveFailed);
        } else {
          // LOG_WARN("Request(%p) Node(%p) nlsReceive again ...", _request,
          // this);
        }
        continue;
      }
    } else if (recv_ret == -(InvalidWsFrameBody)) {
      LOG_DEBUG(
          "Request(%p) Node(%p) the WS data is insufficient, and continues to "
          "be "
          "received later! Read frame size:%dbytes, wsType: "
          "headerSize:%dbytes, fin:0x%x opCode:%d mask:0x%x N0:%d N:%d.",
          _request, this, frameSize, _wsType.headerSize, _wsType.fin,
          _wsType.opCode, _wsType.mask, _wsType.N0, _wsType.N);
      ret = 0;
    } else {
      LOG_ERROR("Request(%p) Node(%p) receive full WebSocket Frame failed:%d",
                _request, this, recv_ret);
    }

    /* 解析成功并还有剩余数据, 则尝试再解析 */
    if (ret > 0 && cur_data_size > 0) {
      LOG_DEBUG(
          "Request(%p) Node(%p) current data remainder size:%d, ret:%d, "
          "receive ws frame "
          "continue...",
          _request, this, cur_data_size, ret);
      eLoop = true;
    } else {
      eLoop = false;
    }
  } while (eLoop);

  if (frame) free(frame);
  frame = NULL;

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_end, NULL);
  uint64_t time_consuming_a =
      timewait_a.tv_sec * 1000 + timewait_a.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  uint64_t time_consuming_parseFrame =
      timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
      timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
  uint64_t time_consuming =
      timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  if (time_consuming > 50) {
    LOG_WARN(
        "Request(%p) Node(%p) webSocketResponse done with excessive time "
        "%llums, including recv:%llu parseFrame:%llums.",
        _request, this, time_consuming, time_consuming_a,
        time_consuming_parseFrame);
  } else {
    LOG_DEBUG("Request(%p) Node(%p) webSocketResponse done", _request, this);
  }
#endif
  return ret;
}