nlsCppSdk/transport/webSocketTcp.cpp (443 lines of code) (raw):

/* * Copyright 2021 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #if defined(_MSC_VER) #include <winsock2.h> #else #include <arpa/inet.h> #include <netdb.h> #include <stdio.h> #endif #include <stdlib.h> #include <fstream> #include <sstream> #include "nlog.h" #include "nlsGlobal.h" #include "text_utils.h" #include "utility.h" #include "webSocketTcp.h" namespace AlibabaNls { #define HTTP_STATUS_CODE "HTTP/1.1 " #define HTTP_STATUS_CODE_END " " #define HTTP_HEADER_END_STRING "\r\n\r\n" #define HTTP_CONTENT_LENGTH "Content-Length: " #define HTTP_CONTENT_LENGTH_END "\r\n" #define SEC_WS_VER "13" //#define OPU_DEBUG WebSocketTcp::WebSocketTcp() : _httpCode(0), _httpLength(0), _rStatus(WsHeadSize), _nodeHandle(NULL) { LOG_DEBUG("Create WebSocketTcp:%p.", this); } WebSocketTcp::~WebSocketTcp() { LOG_DEBUG("WsTcp(%p) Destroy WebSocketTcp done.", this); } int WebSocketTcp::parseUrlAddress(struct urlAddress& url, const char* address) { if (sscanf(address, "%10[^:/]://%256[^:/]:%d/%255s", url._type, url._host, &url._port, url._path) == 4) { if (strcmp(url._type, "wss") == 0 || strcmp(url._type, "https") == 0) { url._isSsl = true; } } else if (sscanf(address, "%10[^:/]://%256[^:/]/%255s", url._type, url._host, url._path) == 3) { if (strcmp(url._type, "wss") == 0 || strcmp(url._type, "https") == 0) { url._port = 443; url._isSsl = true; } else { url._port = 80; } } else if (sscanf(address, "%10[^:/]://%256[^:/]:%d", url._type, url._host, &url._port) == 3) { url._path[0] = '\0'; } else if (sscanf(address, "%10[^:/]://%256[^:/]", url._type, url._host) == 2) { if (strcmp(url._type, "wss") == 0 || strcmp(url._type, "https") == 0) { url._port = 443; url._isSsl = true; } else { url._port = 80; } url._path[0] = '\0'; } else { return -(ParseUrlFailed); } return Success; } bool WebSocketTcp::urlWithAccess(const char* address) { struct urlAddress url; if (WebSocketTcp::parseUrlAddress(url, address) == Success) { for (int i = 0; i < strnlen(url._path, PathSize); i++) { if (url._path[i] == '?') { return true; } } } return false; } int WebSocketTcp::requestPackage(urlAddress* url, char* buffer, std::string httpHeader) { char hostBuff[256] = {0}; if (url->_port == HttpPort) { _ssnprintf(hostBuff, 256, "Host: %s\r\n", url->_host); } else { _ssnprintf(hostBuff, 256, "Host: %s:%d\r\n", url->_host, url->_port); } int contentSize = 0; const int token_bytes = strnlen(url->_token, TokenSize); const int ws_key_bytes = strnlen(getSecWsKey(), 128) - 18; if (httpHeader.empty()) { contentSize = _ssnprintf( buffer, BufferSize, "GET /%s HTTP/1.1\r\n%s%s%s%s%s%s%s%s%s: %s\r\n%s", url->_path, hostBuff, "Upgrade: websocket\r\n", "Connection: Upgrade\r\n", getSecWsKey(), HTTP_CONTENT_LENGTH_END, "Sec-WebSocket-Version: ", SEC_WS_VER, HTTP_CONTENT_LENGTH_END, "X-NLS-Token", url->_token, HTTP_CONTENT_LENGTH_END); } else { contentSize = _ssnprintf( buffer, BufferSize, "GET /%s HTTP/1.1\r\n%s%s%s%s%s%s%s%s%s: %s\r\n%s%s", url->_path, hostBuff, "Upgrade: websocket\r\n", "Connection: Upgrade\r\n", getSecWsKey(), HTTP_CONTENT_LENGTH_END, "Sec-WebSocket-Version: ", SEC_WS_VER, HTTP_CONTENT_LENGTH_END, "X-NLS-Token", url->_token, httpHeader.c_str(), HTTP_CONTENT_LENGTH_END); } if (contentSize <= 0) { LOG_ERROR("WsTcp(%p) send http head to server failed.", this); return -(WsRequestPackageEmpty); } const int default_step = 4; const int token_step = token_bytes < default_step ? token_bytes : default_step; const int ws_key_step = ws_key_bytes < default_step ? ws_key_step : default_step; if (token_step > 0 && ws_key_step > 0) { std::string key_buf_str; std::string token_buf_str; key_buf_str = utility::TextUtils::securityDisposalForLog( buffer, &key_buf_str, "Sec-WebSocket-Key:", ws_key_step, 'X'); token_buf_str = utility::TextUtils::securityDisposalForLog( (char*)key_buf_str.c_str(), &token_buf_str, "X-NLS-Token:", token_step, 'Y'); LOG_DEBUG("WsTcp(%p) Http Request:\n%s", this, token_buf_str.c_str()); } return contentSize; } /* * HTTP/1.1 403 Forbidden * Date: Mon, 18 Feb 2019 08:38:46 GMT * Content-Length: 64 * Connection: keep-alive * Server: Tengine * * Meta:ACCESS_DENIED:The token '12345ffdsfdfvdfcdfcdc' is invalid!. */ /* * HTTP/1.1 101 Switching Protocols * Date: Mon, 18 Feb 2019 08:39:51 GMT * Connection: upgrade * Server: Tengine * upgrade: websocket * sec-websocket-accept: HSmrc0sMlYUkAGmm5OPpG2HaGWk= */ int WebSocketTcp::getTargetLen(std::string line, const char* begin, const char* end) { size_t seek = line.find(begin); size_t position = line.find(end, seek + strlen(begin)); if (position == std::string::npos) { return 0; } LOG_DEBUG("WsTcp(%p) Position: %d %d %s", this, position, strlen(begin), line.c_str() + position); std::string tmpCode( line.substr(seek + strlen(begin), position - strlen(begin))); return atoi(tmpCode.c_str()); } const char* WebSocketTcp::getFailedMsg() { return _errorMsg.c_str(); } const char* WebSocketTcp::getSecWsKey() { char buffer[128] = {0}; char tmp[64] = {0}; char key_array[10] = {'x', 'D', 'H', '3', 'L', 'M', 'J', '1', 'b', 'J'}; char key2[] = {"BhXDw=="}; char key1[] = {"EzLkh9G"}; char name3[] = {"Key"}; char name2[] = {"WebSocket"}; char name1[] = {"Sec"}; int i = 0; for (i = 0; i < 10; i++) { tmp[i] = key_array[i * 7 % 10]; } _ssnprintf(buffer, 128, "%s-%s-%s: %s%s%s", name1, name2, name3, tmp, key1, key2); _secWsKey = buffer; return _secWsKey.c_str(); } int WebSocketTcp::responsePackage(const char* content, size_t length) { LOG_DEBUG("WsTcp(%p) Http response:%s", this, content); if (!strstr(content, HTTP_HEADER_END_STRING)) { return length; } std::string tmpLine = content; if (_httpCode == 0) { _httpCode = getTargetLen(tmpLine, HTTP_STATUS_CODE, HTTP_STATUS_CODE_END); if (_httpCode == 0) { LOG_ERROR("WsTcp(%p) Got bad status connecting to %s", this, content); return -(HttpGotBadStatus); } } if (_httpCode == 101) { return 0; } else { if (_httpLength == 0) { _httpLength = getTargetLen(tmpLine, HTTP_CONTENT_LENGTH, HTTP_CONTENT_LENGTH_END); } size_t endLen = strlen(HTTP_HEADER_END_STRING); size_t position = tmpLine.find(HTTP_HEADER_END_STRING); if (position != std::string::npos) { if (_httpLength == 0) { LOG_ERROR("WsTcp(%p) Failed: %s", this, content); _errorMsg = content; return -(WsResponsePackageFailed); } else { if (_httpLength == (length - (position + endLen))) { const char* errMsg = (content + position + endLen); _errorMsg = errMsg; LOG_ERROR("WsTcp(%p) Position: %d %s", this, (int)(length - (position + endLen)), errMsg); return -(WsResponsePackageFailed); } } } } return -(WsResponsePackageFailed); } int WebSocketTcp::receiveFullWebSocketFrame(uint8_t* frame, size_t frameSize, WebSocketHeaderType* wsType, WebSocketFrame* resultDate) { int retCode = Success; // LOG_DEBUG("begin receiveFullWebSocketFrame."); switch (_rStatus) { case WsHeadSize: // LOG_DEBUG("WsHeadSize."); retCode = decodeHeaderSizeWebSocketFrame(frame, frameSize, wsType); // LOG_DEBUG("WsTcp(%p) Parse decodeHeaderSizeWebSocketFrame // wsType->opCode:%d.", this, wsType->opCode); if (retCode < 0) { LOG_ERROR("Node(%p) WsTcp(%p) Parse WsHeadSize Failed, retCode:%d.", getConnectNode(), this, retCode); return retCode; } _rStatus = WsHeadBody; case WsHeadBody: // LOG_DEBUG("WsHeadBody."); retCode = decodeHeaderBodyWebSocketFrame(frame, frameSize, wsType); // LOG_DEBUG("WsTcp(%p) Parse decodeHeaderBodyWebSocketFrame // wsType->opCode:%d.", this, wsType->opCode); if (retCode < 0) { if (wsType->opCode == WebSocketHeaderType::PONG) { LOG_DEBUG("This Node(%p) WsTcp(%p) is PONG, please ignore this warn.", getConnectNode(), this); retCode = Success; } else { LOG_WARN( "Node(%p) WsTcp(%p) Parse WsHeadBody Failed, retCode:%d, " "wsType->headerSize:%d, frameSize:%d, wsType->fin:0x%x, " "wsType->opCode:%d, wsType->mask:0x%x, wsType->N0:%d", getConnectNode(), this, retCode, wsType->headerSize, frameSize, wsType->fin, wsType->opCode, wsType->mask, wsType->N0); return retCode; } // } else { // LOG_DEBUG("Node(%p) WsTcp(%p) this WS frame has %dbytes.", // getConnectNode(), this, wsType->N); } _rStatus = WsContentBody; case WsContentBody: // LOG_DEBUG("WsContentBody."); retCode = decodeFrameBodyWebSocketFrame(frame, frameSize, wsType, resultDate); // LOG_DEBUG("WsTcp(%p) Parse decodeFrameBodyWebSocketFrame // wsType->opCode:%d with retCode:%d.", this, wsType->opCode, retCode); if (retCode < 0) { #ifdef ENABLE_NLS_DEBUG_2 LOG_WARN( "Node(%p) WsTcp(%p) Parse WsContentBody Failed, retCode:%d, " "wsType->headerSize:%d, frameSize:%d, wsType->fin:0x%x, " "wsType->opCode:%d, wsType->mask:0x%x, wsType->N0:%d, , " "wsType->N:%d", getConnectNode(), this, retCode, wsType->headerSize, frameSize, wsType->fin, wsType->opCode, wsType->mask, wsType->N0, wsType->N); #endif return retCode; } _rStatus = WsHeadSize; return Success; default: LOG_WARN("Node(%p) WsTcp(%p) Default None. _rStatus:%d.", getConnectNode(), this, _rStatus); break; } // LOG_DEBUG("receiveFullWebSocketFrame done."); return Success; } /** * @brief: 从Websocket帧中解析出HeaderSize * @return: 成功则返回收到的字节数, 失败则返回负值. */ int WebSocketTcp::decodeHeaderSizeWebSocketFrame(uint8_t* buffer, size_t length, WebSocketHeaderType* wsType) { if (length < 2) { return -(InvalidWsFrameHeaderSize); /* Need at least 2 */ } const uint8_t* data = buffer; // peek, but don't consume /* FIN: 0bit 表示是否为最后帧 */ wsType->fin = (data[0] & 0x80) == 0x80; /* Opcode: 4-7bit */ wsType->opCode = (WebSocketHeaderType::OpCodeType)(data[0] & 0x0f); /* Mask: 8bit */ wsType->mask = (data[1] & 0x80) == 0x80; /* N0(Payload length): 9-15bit 数据载荷的长度,单位是字节. * 为0~126:数据的长度为x字节. * 为126:后续2个字节代表一个16位的无符号整数,该无符号整数的值为数据的长度. * 为127:后续8个字节代表一个64位的无符号整数(最高位为0),该无符号整数的值为数据的长度. */ wsType->N0 = (data[1] & 0x7f); wsType->headerSize = 2 + (wsType->N0 == 126 ? 2 : 0) + (wsType->N0 == 127 ? 8 : 0) + (wsType->mask ? 4 : 0); if (wsType->headerSize == 2) { LOG_DEBUG( "WsTcp(%p) wsType->headerSize: %d, opCode: %d, fin: %d, N0: %d, mask: " "%d", this, wsType->headerSize, wsType->opCode, wsType->fin, wsType->N0, wsType->mask); } return Success; } /** * @brief: 从Websocket帧中解析出HeaderBody * @return: 成功则返回收到的字节数, 失败则返回负值. */ int WebSocketTcp::decodeHeaderBodyWebSocketFrame(uint8_t* buffer, size_t length, WebSocketHeaderType* wsType) { if (wsType->headerSize >= length) { return -(InvalidWsFrameHeaderBody); } if (wsType->N0 < 126) { wsType->N = wsType->N0; } else if (wsType->N0 == 126) { wsType->N = 0; wsType->N |= ((uint64_t)(*(buffer + 2))) << 8; wsType->N |= ((uint64_t)(*(buffer + 3))) << 0; } else if (wsType->N0 == 127) { wsType->N = 0; wsType->N |= ((uint64_t)(*(buffer + 2))) << 56; wsType->N |= ((uint64_t)(*(buffer + 3))) << 48; wsType->N |= ((uint64_t)(*(buffer + 4))) << 40; wsType->N |= ((uint64_t)(*(buffer + 5))) << 32; wsType->N |= ((uint64_t)(*(buffer + 6))) << 24; wsType->N |= ((uint64_t)(*(buffer + 7))) << 16; wsType->N |= ((uint64_t)(*(buffer + 8))) << 8; wsType->N |= ((uint64_t)(*(buffer + 9))) << 0; } if (wsType->mask) { wsType->masKingKey[0] = ((uint8_t)(*(buffer + 0))) << 0; wsType->masKingKey[1] = ((uint8_t)(*(buffer + 1))) << 0; wsType->masKingKey[2] = ((uint8_t)(*(buffer + 2))) << 0; wsType->masKingKey[3] = ((uint8_t)(*(buffer + 3))) << 0; } else { wsType->masKingKey[0] = 0; wsType->masKingKey[1] = 0; wsType->masKingKey[2] = 0; wsType->masKingKey[3] = 0; } // LOG_DEBUG("wsType->N: %d", wsType->N); return Success; } /** * @brief: 从Websocket帧中解析出FrameBody * @return: 成功则返回收到的字节数, 失败则返回负值. */ int WebSocketTcp::decodeFrameBodyWebSocketFrame(uint8_t* buffer, size_t length, WebSocketHeaderType* wsType, WebSocketFrame* receivedData) { if (wsType->opCode == WebSocketHeaderType::PONG) { return Success; } if ((wsType->N + wsType->headerSize) > length) { #ifdef ENABLE_NLS_DEBUG_3 LOG_ERROR("Size: wsType->N(%d) + wsType->headerSize(%u) length(%zu)", wsType->N, wsType->headerSize, length); #endif return -(InvalidWsFrameBody); } // LOG_DEBUG("Size: %d %d %d", wsType->N, wsType->headerSize, length); if (wsType->opCode == WebSocketHeaderType::TEXT_FRAME || wsType->opCode == WebSocketHeaderType::BINARY_FRAME || wsType->opCode == WebSocketHeaderType::CONTINUATION) { if (wsType->mask) { for (size_t i = 0; i != wsType->N; ++i) { *(buffer + i + wsType->headerSize) ^= wsType->masKingKey[i & 0x3]; } } if (receivedData->data == NULL) { receivedData->type = wsType->opCode; } receivedData->data = (buffer + wsType->headerSize); receivedData->length = (size_t)wsType->N; } else if (wsType->opCode == WebSocketHeaderType::PING) { return -(InvalidWsFrameBody); } else if (wsType->opCode == WebSocketHeaderType::CLOSE) { StatusCode code; code.frame[0] = *(buffer + 2); code.frame[1] = *(buffer + 3); int recode = ntohs(code.status); if (receivedData->data == NULL) { receivedData->type = wsType->opCode; receivedData->closeCode = recode; } receivedData->data = (buffer + wsType->headerSize + 2); receivedData->length = (size_t)wsType->N; } if (wsType->opCode == WebSocketHeaderType::TEXT_FRAME) { // pass } else { // LOG_DEBUG("Decoder Receive Data: %zu ", receivedData->length); }; LOG_DEBUG( "WsTcp(%p) Decoder received data opCode:%d dataType:%d dataLength:%d.", this, wsType->opCode, receivedData->type, receivedData->length); return Success; } int WebSocketTcp::binaryFrame(const uint8_t* buffer, size_t length, uint8_t** frame, size_t* frameSize) { return framePackage(WebSocketHeaderType::BINARY_FRAME, buffer, length, frame, frameSize); } int WebSocketTcp::textFrame(const uint8_t* buffer, size_t length, uint8_t** frame, size_t* frameSize) { return framePackage(WebSocketHeaderType::TEXT_FRAME, buffer, length, frame, frameSize); } int WebSocketTcp::pingFrame(uint8_t** frame, size_t* frameSize) { return framePackage(WebSocketHeaderType::PING, NULL, 0, frame, frameSize); } int WebSocketTcp::framePackage(WebSocketHeaderType::OpCodeType codeType, const uint8_t* buffer, size_t length, uint8_t** frame, size_t* frameSize) { bool useMask = true; const uint8_t masKingKey[4] = {0x12, 0x34, 0x56, 0x78}; const int headlen = 2 + (length >= 126 ? 2 : 0) + (length >= 65536 ? 6 : 0) + (useMask ? 4 : 0); uint8_t* header = (uint8_t*)calloc(headlen, sizeof(uint8_t)); // new uint8_t[headlen]; if (header == NULL) { LOG_ERROR("WsTcp(%p) calloc header failed.", this); return -(MallocFailed); } header[0] = 0x80 | codeType; if (length < 126) { header[1] = (length & 0xff) | (useMask ? 0x80 : 0); if (useMask) { header[2] = masKingKey[0]; header[3] = masKingKey[1]; header[4] = masKingKey[2]; header[5] = masKingKey[3]; } } else if (length < 65536) { header[1] = 126 | (useMask ? 0x80 : 0); header[2] = (length >> 8) & 0xff; header[3] = (length >> 0) & 0xff; if (useMask) { header[4] = masKingKey[0]; header[5] = masKingKey[1]; header[6] = masKingKey[2]; header[7] = masKingKey[3]; } } else { // TODO: run coverage testing here header[1] = 127 | (useMask ? 0x80 : 0); header[2] = ((uint64_t)length >> 56) & 0xff; header[3] = ((uint64_t)length >> 48) & 0xff; header[4] = ((uint64_t)length >> 40) & 0xff; header[5] = ((uint64_t)length >> 32) & 0xff; header[6] = ((uint64_t)length >> 24) & 0xff; header[7] = ((uint64_t)length >> 16) & 0xff; header[8] = ((uint64_t)length >> 8) & 0xff; header[9] = ((uint64_t)length >> 0) & 0xff; if (useMask) { header[10] = masKingKey[0]; header[11] = masKingKey[1]; header[12] = masKingKey[2]; header[13] = masKingKey[3]; } } // N.B. - tmpBuffer will keep growing until it can be transmitted over the // socket: uint8_t * tmpBuffer = new uint8_t[headlen + length]; // memset(tmpBuffer, 0, sizeof(uint8_t) * (headlen + length)); // memcpy(tmpBuffer, header, headlen); // memcpy(tmpBuffer + headlen, (uint8_t*)buffer, length); *frameSize = (headlen + length); *frame = (uint8_t*)calloc(*frameSize, sizeof(uint8_t)); if (*frame == NULL) { LOG_ERROR("WsTcp(%p) calloc frame failed.", this); free(header); return -(MallocFailed); } memcpy(*frame, header, headlen); if (buffer && length > 0) { memcpy(*frame + headlen, (uint8_t*)buffer, length); } #ifdef OPU_DEBUG std::ofstream ofs; ofs.open("./out.opus", std::ios::out | std::ios::app | std::ios::binary); if (ofs.is_open() && buffer) { ofs.write((const char*)buffer, length); ofs.flush(); ofs.close(); } #endif // LOG_DEBUG("framePackage Receive Data: %d ", *frameSize); if (useMask) { for (size_t i = 0; i != length; ++i) { *(*frame + headlen + length - length + i) ^= masKingKey[i & 0x3]; } } free(header); header = NULL; return Success; } } // namespace AlibabaNls