in include/ylt/standalone/cinatra/coro_http_connection.hpp [643:761]
async_simple::coro::Lazy<websocket_result> read_websocket() {
auto [ec, ws_hd_size] = co_await async_read(head_buf_, SHORT_HEADER);
websocket_result result{ec};
if (ec) {
close();
co_return result;
}
while (true) {
const char *data_ptr = asio::buffer_cast<const char *>(head_buf_.data());
auto status = ws_.parse_header(data_ptr, ws_.len_bytes());
if (status == ws_header_status::complete) {
ws_.reset_len_bytes();
head_buf_.consume(head_buf_.size());
std::span<char> payload{};
auto payload_length = ws_.payload_length();
if (max_part_size_ != 0 && payload_length > max_part_size_) {
std::string close_reason = "message_too_big";
std::string close_msg = ws_.format_close_payload(
close_code::too_big, close_reason.data(), close_reason.size());
co_await write_websocket(close_msg, opcode::close);
close();
result.ec = std::error_code(asio::error::message_size,
asio::error::get_system_category());
break;
}
if (payload_length > 0) {
detail::resize(body_, payload_length);
auto [ec, read_sz] =
co_await async_read(asio::buffer(body_), payload_length);
if (ec) {
close();
result.ec = ec;
break;
}
payload = body_;
}
ws_frame_type type = ws_.parse_payload(payload);
switch (type) {
case cinatra::ws_frame_type::WS_ERROR_FRAME:
close();
result.ec = std::make_error_code(std::errc::protocol_error);
break;
case cinatra::ws_frame_type::WS_OPENING_FRAME:
continue;
case ws_frame_type::WS_INCOMPLETE_TEXT_FRAME:
case ws_frame_type::WS_INCOMPLETE_BINARY_FRAME:
result.eof = false;
result.data = {payload.data(), payload.size()};
break;
case cinatra::ws_frame_type::WS_TEXT_FRAME:
case cinatra::ws_frame_type::WS_BINARY_FRAME: {
#ifdef CINATRA_ENABLE_GZIP
if (!gzip_compress(payload, result)) {
break;
}
#endif
result.eof = true;
result.data = {payload.data(), payload.size()};
} break;
case cinatra::ws_frame_type::WS_CLOSE_FRAME: {
#ifdef CINATRA_ENABLE_GZIP
if (!gzip_compress(payload, result)) {
break;
}
#endif
close_frame close_frame =
ws_.parse_close_payload(payload.data(), payload.size());
result.eof = true;
result.data = {close_frame.message, close_frame.length};
std::string close_msg = ws_.format_close_payload(
close_code::normal, close_frame.message, close_frame.length);
co_await write_websocket(close_msg, opcode::close);
close();
} break;
case cinatra::ws_frame_type::WS_PING_FRAME: {
result.data = {payload.data(), payload.size()};
auto ec = co_await write_websocket(result.data, opcode::pong);
if (ec) {
close();
result.ec = ec;
}
} break;
case cinatra::ws_frame_type::WS_PONG_FRAME: {
result.data = {payload.data(), payload.size()};
auto ec = co_await write_websocket(result.data, opcode::ping);
result.ec = ec;
} break;
default:
break;
}
result.type = type;
co_return result;
}
else if (status == ws_header_status::incomplete) {
auto [ec, sz] = co_await async_read(head_buf_, ws_.left_header_len());
if (ec) {
close();
result.ec = ec;
break;
}
continue;
}
else {
close();
result.ec = std::make_error_code(std::errc::protocol_error);
co_return result;
}
}
co_return result;
}