in nlsCppSdk/transport/connectNode.cpp [1802:2012]
/**
 * @brief Receive pending data and parse the WebSocket frames held in
 *        _readEvBuffer, handing each complete frame to parseFrame().
 *
 * Flow:
 *   1. Bail out early if the node is being released.
 *   2. Call nlsReceive() once; a negative result is fatal, zero means there
 *      is nothing to do.
 *   3. Loop: copy the current content of _readEvBuffer into a scratch buffer
 *      (grown with realloc when the content exceeds ReadBufferSize), try to
 *      decode one frame, dispatch it through parseFrame(), verify the node is
 *      still registered, then drain the consumed bytes. The loop repeats
 *      while a frame was consumed and copied data remains, or while a
 *      truncated header is being retried (at most maxTryAgain times in a row).
 *
 * @return
 *   - negative error code: -(InvalidStatusWhenReleasing), -(NlsReceiveFailed),
 *     -(ReallocFailed), -(EventClientEmpty), or whatever parseFrame() /
 *     checkNodeExist() reported;
 *   - 0 when nothing was received, when only an incomplete frame is buffered,
 *     or when the frame decoder reported an unrecognised error (logged only);
 *   - a positive byte count (header + payload) when the last pass consumed a
 *     frame and no copied data was left over.
 *
 * NOTE(review): a failed calloc() also returns 0, which callers cannot tell
 * apart from "no data". Kept as-is for compatibility.
 */
int ConnectNode::webSocketResponse() {
  int ret = 0;
  int read_len = 0;

  if (_releasingFlag) {
    LOG_WARN("Node(%p) is releasing!!! skipping ...", this);
    return -(InvalidStatusWhenReleasing);
  }

  uint8_t *frame = (uint8_t *)calloc(ReadBufferSize, sizeof(char));
  if (frame == NULL) {
    LOG_ERROR("%s %d calloc failed.", __func__, __LINE__);
    return 0;
  }

#ifdef ENABLE_NLS_DEBUG_2
  struct timeval timewait_start, timewait_a, timewait_b, timewait_end;
  gettimeofday(&timewait_start, NULL);
  // Seed the intermediate stamps so the statistics below never read
  // uninitialised memory when no frame reaches parseFrame().
  timewait_a = timewait_start;
  timewait_b = timewait_start;
#endif

  // receive buffer from SSL into _readEvBuffer
  read_len = nlsReceive(frame, ReadBufferSize);
  if (read_len < 0) {
    LOG_ERROR("Request(%p Node(%p) nlsReceive failed, read_len:%d", _request,
              this, read_len);
    free(frame);
    return -(NlsReceiveFailed);
  } else if (read_len == 0) {
#ifdef ENABLE_NLS_DEBUG_2
    LOG_DEBUG("Request(%p) Node(%p) nlsReceive empty, read_len:%d", _request,
              this, read_len);
#endif
    free(frame);
    return 0;
#ifdef ENABLE_NLS_DEBUG_2
  } else {
    LOG_DEBUG("Request(%p Node(%p) nlsReceive %dbytes", _request, this,
              read_len);
#endif
  }

  const int maxTryAgain = 3;
  int tryAgain = maxTryAgain;
  bool eLoop = false;

  do {
    ret = 0;

    size_t frameSize = evbuffer_get_length(_readEvBuffer);
    if (frameSize == 0) {
      free(frame);
      frame = NULL;
      ret = 0;
      break;
    } else if (frameSize > ReadBufferSize) {
      // Buffered data no longer fits into the scratch buffer: grow it.
      uint8_t *tmp = (uint8_t *)realloc(frame, frameSize + 1);
      if (NULL == tmp) {
        LOG_ERROR("Node(%p) realloc failed.", this);
        free(frame);
        frame = NULL;
        ret = -(ReallocFailed);
        break;
      } else {
        frame = tmp;
        // frameSize is size_t, so it must be printed with %zu.
        LOG_WARN("Node(%p) websocket frame realloc, new size:%zu.", this,
                 frameSize + 1);
      }
#ifdef ENABLE_NLS_DEBUG_2
    } else {
      LOG_DEBUG("Node(%p) nlsReceive %zubytes in readEvBuffer.", this,
                frameSize);
#endif
    }

    size_t cur_data_size = frameSize;
    // Copy without draining: bytes are removed from _readEvBuffer only after
    // a frame has been fully processed.
    evbuffer_copyout(_readEvBuffer, frame, frameSize);

    WebSocketFrame wsFrame;
    memset(&wsFrame, 0x0, sizeof(struct WebSocketFrame));
    int recv_ret = _webSocket.receiveFullWebSocketFrame(frame, frameSize,
                                                        &_wsType, &wsFrame);
    if (recv_ret == Success) {
      // LOG_DEBUG("Request(%p) Node(%p) parse websocket frame, len:%zu, frame
      // size:%zu, _wsType.opCode:%d, wsFrame.type:%d.",
      //     _request, this, wsFrame.length, frameSize, _wsType.opCode,
      //     wsFrame.type);
      if (_releasingFlag) {
        LOG_WARN("Node(%p) is releasing!!! skipping ...", this);
        ret = -(InvalidStatusWhenReleasing);
        break;
      }

      if (_wsType.opCode == WebSocketHeaderType::PONG) {
        LOG_DEBUG("Node(%p) receive PONG.", this);
        // memset(&_wsType, 0x0, sizeof(struct WebSocketHeaderType));
        wsFrame.type = _wsType.opCode;
      }

#ifdef ENABLE_NLS_DEBUG_2
      gettimeofday(&timewait_a, NULL);
#endif
      /*
       * Will invoke callback in parseFrame.
       * If blocking in callback, will block in parseFrame
       */
      int result = parseFrame(&wsFrame);
#ifdef ENABLE_NLS_DEBUG_2
      gettimeofday(&timewait_b, NULL);
#endif
      if (result) {
        LOG_ERROR("Node(%p) parse WS frame failed:%d.", this, result);
        ret = result;
        break;
      }

      /* Should check node here */
      if (_instance == NULL) {
        /* Maybe user has released instance */
        ret = -(EventClientEmpty);
        break;
      } else {
        /* Maybe user has released request.*/
        NlsNodeManager *node_manager = _instance->getNodeManger();
        int status = NodeStatusInvalid;
        ret = node_manager->checkNodeExist(this, &status);
        if (ret != Success) {
          LOG_ERROR("Node(%p) checkNodeExist failed, result:%d.", this, ret);
          break;
        }
      }

      // The frame has been handled: drop header + payload from the buffer.
      evbuffer_drain(_readEvBuffer, wsFrame.length + _wsType.headerSize);
      cur_data_size = cur_data_size - (wsFrame.length + _wsType.headerSize);
      ret = wsFrame.length + _wsType.headerSize;
      tryAgain = maxTryAgain;
    } else if (recv_ret == -(InvalidWsFrameHeaderSize) ||
               recv_ret == -(InvalidWsFrameHeaderBody)) {
      if (tryAgain-- > 0) {
        LOG_WARN(
            "Request(%p) Node(%p) the WS data is insufficient, and continues "
            "to be "
            "received",
            _request, this);
        usleep(5 * 1000);
        read_len = nlsReceive(frame, ReadBufferSize);
        if (read_len < 0) {
          LOG_ERROR("Request(%p) Node(%p) nlsReceive failed, read_len:%d",
                    _request, this, read_len);
          free(frame);
          return -(NlsReceiveFailed);
        } else {
          // LOG_WARN("Request(%p) Node(%p) nlsReceive again ...", _request,
          // this);
        }
        // `continue` in a do-while jumps to the loop condition, so the flag
        // has to be raised here; otherwise the retry is skipped whenever
        // eLoop is still false (e.g. on the very first pass).
        eLoop = true;
        continue;
      }
      // Retries exhausted: fall through with ret == 0 so the loop ends and
      // the partial header stays buffered.
    } else if (recv_ret == -(InvalidWsFrameBody)) {
      LOG_DEBUG(
          "Request(%p) Node(%p) the WS data is insufficient, and continues to "
          "be "
          "received later! Read frame size:%zubytes, wsType: "
          "headerSize:%dbytes, fin:0x%x opCode:%d mask:0x%x N0:%d N:%d.",
          _request, this, frameSize, _wsType.headerSize, _wsType.fin,
          _wsType.opCode, _wsType.mask, _wsType.N0, _wsType.N);
      ret = 0;
    } else {
      // NOTE(review): the decoder error is only logged; ret stays 0 and the
      // data is left in _readEvBuffer.
      LOG_ERROR("Request(%p) Node(%p) receive full WebSocket Frame failed:%d",
                _request, this, recv_ret);
    }

    /* A frame was parsed and data remains: try to parse the next one. */
    if (ret > 0 && cur_data_size > 0) {
      LOG_DEBUG(
          "Request(%p) Node(%p) current data remainder size:%zu, ret:%d, "
          "receive ws frame "
          "continue...",
          _request, this, cur_data_size, ret);
      eLoop = true;
    } else {
      eLoop = false;
    }
  } while (eLoop);

  free(frame);  // free(NULL) is a no-op
  frame = NULL;

#ifdef ENABLE_NLS_DEBUG_2
  gettimeofday(&timewait_end, NULL);
  uint64_t time_consuming_a =
      timewait_a.tv_sec * 1000 + timewait_a.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  uint64_t time_consuming_parseFrame =
      timewait_b.tv_sec * 1000 + timewait_b.tv_usec / 1000 -
      timewait_a.tv_sec * 1000 - timewait_a.tv_usec / 1000;
  uint64_t time_consuming =
      timewait_end.tv_sec * 1000 + timewait_end.tv_usec / 1000 -
      timewait_start.tv_sec * 1000 - timewait_start.tv_usec / 1000;
  if (time_consuming > 50) {
    LOG_WARN(
        "Request(%p) Node(%p) webSocketResponse done with excessive time "
        "%llums, including recv:%llu parseFrame:%llums.",
        _request, this, static_cast<unsigned long long>(time_consuming),
        static_cast<unsigned long long>(time_consuming_a),
        static_cast<unsigned long long>(time_consuming_parseFrame));
  } else {
    LOG_DEBUG("Request(%p) Node(%p) webSocketResponse done", _request, this);
  }
#endif

  return ret;
}