in packages/bun-uws/src/WebSocketContext.h [60:239]
static bool handleFragment(char *data, size_t length, unsigned int remainingBytes, int opCode, bool fin, WebSocketState<isServer> *webSocketState, void *s) {
/* WebSocketData and WebSocketContextData */
WebSocketContextData<SSL, USERDATA> *webSocketContextData = (WebSocketContextData<SSL, USERDATA> *) us_socket_context_ext(SSL, us_socket_context(SSL, (us_socket_t *) s));
WebSocketData *webSocketData = (WebSocketData *) us_socket_ext(SSL, (us_socket_t *) s);
/* Is this a non-control frame? */
if (opCode < 3) {
/* Did we get everything in one go? */
if (!remainingBytes && fin && !webSocketData->fragmentBuffer.length()) {
/* Handle compressed frame */
if (webSocketData->compressionStatus == WebSocketData::CompressionStatus::COMPRESSED_FRAME) {
webSocketData->compressionStatus = WebSocketData::CompressionStatus::ENABLED;
LoopData *loopData = (LoopData *) us_loop_ext(us_socket_context_loop(SSL, us_socket_context(SSL, (us_socket_t *) s)));
/* Decompress using shared or dedicated decompressor */
std::optional<std::string_view> inflatedFrame;
if (webSocketData->inflationStream) {
inflatedFrame = webSocketData->inflationStream->inflate(loopData->zlibContext, {data, length}, webSocketContextData->maxPayloadLength, false);
} else {
inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {data, length}, webSocketContextData->maxPayloadLength, true);
}
if (!inflatedFrame.has_value()) {
forceClose(webSocketState, s, ERR_TOO_BIG_MESSAGE_INFLATION);
return true;
} else {
data = (char *) inflatedFrame->data();
length = inflatedFrame->length();
}
}
/* Check text messages for Utf-8 validity */
if (opCode == 1 && !protocol::isValidUtf8((unsigned char *) data, length)) {
forceClose(webSocketState, s, ERR_INVALID_TEXT);
return true;
}
/* Emit message event & break if we are closed or shut down when returning */
if (webSocketContextData->messageHandler) {
webSocketContextData->messageHandler((WebSocket<SSL, isServer, USERDATA> *) s, std::string_view(data, length), (OpCode) opCode);
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
} else {
/* Allocate fragment buffer up front first time */
if (!webSocketData->fragmentBuffer.length()) {
webSocketData->fragmentBuffer.reserve(length + remainingBytes);
}
/* Fragments forming a big message are not caught until appending them */
if (refusePayloadLength(length + webSocketData->fragmentBuffer.length(), webSocketState, s)) {
forceClose(webSocketState, s, ERR_TOO_BIG_MESSAGE);
return true;
}
webSocketData->fragmentBuffer.append(data, length);
/* Are we done now? */
// todo: what if we don't have any remaining bytes yet we are not fin? forceclose!
if (!remainingBytes && fin) {
/* Handle compression */
if (webSocketData->compressionStatus == WebSocketData::CompressionStatus::COMPRESSED_FRAME) {
webSocketData->compressionStatus = WebSocketData::CompressionStatus::ENABLED;
/* 9 bytes of padding for libdeflate, 4 for zlib */
webSocketData->fragmentBuffer.append("123456789");
LoopData *loopData = (LoopData *) us_loop_ext(
us_socket_context_loop(SSL,
us_socket_context(SSL, (us_socket_t *) s)
)
);
/* Decompress using shared or dedicated decompressor */
std::optional<std::string_view> inflatedFrame;
if (webSocketData->inflationStream) {
inflatedFrame = webSocketData->inflationStream->inflate(loopData->zlibContext, {webSocketData->fragmentBuffer.data(), webSocketData->fragmentBuffer.length() - 9}, webSocketContextData->maxPayloadLength, false);
} else {
inflatedFrame = loopData->inflationStream->inflate(loopData->zlibContext, {webSocketData->fragmentBuffer.data(), webSocketData->fragmentBuffer.length() - 9}, webSocketContextData->maxPayloadLength, true);
}
if (!inflatedFrame.has_value()) {
forceClose(webSocketState, s, ERR_TOO_BIG_MESSAGE_INFLATION);
return true;
} else {
data = (char *) inflatedFrame->data();
length = inflatedFrame->length();
}
} else {
// reset length and data ptrs
length = webSocketData->fragmentBuffer.length();
data = webSocketData->fragmentBuffer.data();
}
/* Check text messages for Utf-8 validity */
if (opCode == 1 && !protocol::isValidUtf8((unsigned char *) data, length)) {
forceClose(webSocketState, s, ERR_INVALID_TEXT);
return true;
}
/* Emit message and check for shutdown or close */
if (webSocketContextData->messageHandler) {
webSocketContextData->messageHandler((WebSocket<SSL, isServer, USERDATA> *) s, std::string_view(data, length), (OpCode) opCode);
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
/* If we shutdown or closed, this will be taken care of elsewhere */
webSocketData->fragmentBuffer.clear();
}
}
} else {
/* Control frames need the websocket to send pings, pongs and close */
WebSocket<SSL, isServer, USERDATA> *webSocket = (WebSocket<SSL, isServer, USERDATA> *) s;
if (!remainingBytes && fin && !webSocketData->controlTipLength) {
if (opCode == CLOSE) {
auto closeFrame = protocol::parseClosePayload(data, length);
webSocket->end(closeFrame.code, std::string_view(closeFrame.message, closeFrame.length));
return true;
} else {
if (opCode == PING) {
webSocket->send(std::string_view(data, length), (OpCode) OpCode::PONG);
if (webSocketContextData->pingHandler) {
webSocketContextData->pingHandler(webSocket, {data, length});
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
} else if (opCode == PONG) {
if (webSocketContextData->pongHandler) {
webSocketContextData->pongHandler(webSocket, {data, length});
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
}
}
} else {
/* Here we never mind any size optimizations as we are in the worst possible path */
webSocketData->fragmentBuffer.append(data, length);
webSocketData->controlTipLength += (unsigned int) length;
if (!remainingBytes && fin) {
char *controlBuffer = (char *) webSocketData->fragmentBuffer.data() + webSocketData->fragmentBuffer.length() - webSocketData->controlTipLength;
if (opCode == CLOSE) {
protocol::CloseFrame closeFrame = protocol::parseClosePayload(controlBuffer, webSocketData->controlTipLength);
webSocket->end(closeFrame.code, std::string_view(closeFrame.message, closeFrame.length));
return true;
} else {
if (opCode == PING) {
webSocket->send(std::string_view(controlBuffer, webSocketData->controlTipLength), (OpCode) OpCode::PONG);
if (webSocketContextData->pingHandler) {
webSocketContextData->pingHandler(webSocket, std::string_view(controlBuffer, webSocketData->controlTipLength));
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
} else if (opCode == PONG) {
if (webSocketContextData->pongHandler) {
webSocketContextData->pongHandler(webSocket, std::string_view(controlBuffer, webSocketData->controlTipLength));
if (us_socket_is_closed(SSL, (us_socket_t *) s) || webSocketData->isShuttingDown) {
return true;
}
}
}
}
/* Same here, we do not care for any particular smart allocation scheme */
webSocketData->fragmentBuffer.resize((unsigned int) webSocketData->fragmentBuffer.length() - webSocketData->controlTipLength);
webSocketData->controlTipLength = 0;
}
}
}
return false;
}