in chime-sdk-signaling-cpp/src/websocket/libwebsockets_websocket.cc [64:179]
int LibwebsocketsWebsocket::Callback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in,
size_t len) {
auto* self = static_cast<LibwebsocketsWebsocket*>(user);
switch (reason) {
case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
const char* error = in ? static_cast<char*>(in) : "(null)";
self->HandleError(std::string("Error while trying to connect the websocket: ") + std::string(error));
// Retry connection attempt according to retry policy.
if (lws_retry_sul_schedule_retry_wsi(wsi, &(self->retry_connect_.sul), self->RetryConnect,
&(self->connection_retry_count_))) {
lwsl_err("Connection attempts exhausted.\n");
// Return non-zero to close the connection.
return -1;
}
return 0;
}
case LWS_CALLBACK_CLIENT_RECEIVE: {
lwsl_debug("Data received.\n");
lwsl_hexdump_debug(in, len);
const size_t remaining = lws_remaining_packet_payload(wsi);
auto* uint8_ptr = static_cast<uint8_t*>(in);
// Messages can be fragmented if the size exceeds max bytes
// Therefore, it needs to handle fragmented message.
self->received_data_buffer_.insert(self->received_data_buffer_.end(), uint8_ptr, uint8_ptr + len);
if (!remaining && lws_is_final_fragment(wsi)) {
self->observer_->OnWebsocketBinaryReceived(self->received_data_buffer_);
self->received_data_buffer_.clear();
}
break;
}
case LWS_CALLBACK_CLIENT_ESTABLISHED: {
lwsl_info("Handshake complete. Successfully upgraded to websocket.\n");
self->observer_->OnWebsocketConnected();
break;
}
case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
self->close_code_ = *(static_cast<uint16_t*>(in));
lwsl_info("Server initiated connection close. Close code: %hu\n", self->close_code_);
lwsl_hexdump_debug(in, len);
// By returning zero, Libwebsockets will echo the close back to the server, then close.
return 0;
}
case LWS_CALLBACK_CLIENT_CLOSED: {
std::string description;
if (self->close_code_ != 0) {
description = std::string("Websocket closed with status: ") + std::to_string(self->close_code_);
} else {
description = "Websocket closed.";
}
lwsl_info("%s", description.c_str());
WebsocketStatus status;
status.description = description;
self->observer_->OnWebsocketClosed(status);
break;
}
case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: {
lwsl_info("Appending additional headers.\n");
unsigned char** data_pointer = static_cast<unsigned char**>(in);
unsigned char* end_of_data = (*data_pointer) + len;
for (const auto& additional_header : self->configuration_.additional_headers) {
if (lws_add_http_header_by_name(wsi, reinterpret_cast<const unsigned char*>(additional_header.first.c_str()),
reinterpret_cast<const unsigned char*>(additional_header.second.c_str()),
additional_header.second.size(), data_pointer, end_of_data)) {
self->HandleError("Failed to add header: " + additional_header.first + " " + additional_header.second);
// Return non-zero to close the connection.
return -1;
}
}
break;
}
case LWS_CALLBACK_CLIENT_WRITEABLE: {
// TODO: Check if this can be also fragmented
if (!self->message_queue_.empty()) {
std::vector<uint8_t> data = self->message_queue_.front();
self->message_queue_.pop();
// Prepend space required by Libwebsockets.
std::vector<uint8_t> data_with_prefix(LWS_PRE + data.size(), 0);
std::copy(data.begin(), data.end(), data_with_prefix.begin() + LWS_PRE);
std::string data_string(data.begin(), data.end());
lwsl_debug("Writing message with length %lu and data %s \n", data.size(), data_string.c_str());
int bytes_written = lws_write(self->wsi_, &(data_with_prefix)[LWS_PRE], data.size(), LWS_WRITE_BINARY);
if (bytes_written == -1) {
self->HandleError("Fatal write error. Closing.");
lwsl_hexdump_debug(in, len);
// Return non-zero to close the connection.
return -1;
}
}
break;
}
case LWS_CALLBACK_COMPLETED_CLIENT_HTTP: {
lwsl_info("Headers complete.");
break;
}
default:
lwsl_debug("Callback reason, %d, not handled.", static_cast<int>(reason));
lwsl_hexdump_debug(in, len);
break;
}
// Required by Libwebsockets for internal postprocessing.
return lws_callback_http_dummy(wsi, reason, user, in, len);
}