int LibwebsocketsWebsocket::Callback()

in chime-sdk-signaling-cpp/src/websocket/libwebsockets_websocket.cc [64:179]


int LibwebsocketsWebsocket::Callback(struct lws* wsi, enum lws_callback_reasons reason, void* user, void* in,
                                     size_t len) {
  auto* self = static_cast<LibwebsocketsWebsocket*>(user);

  switch (reason) {
    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR: {
      const char* error = in ? static_cast<char*>(in) : "(null)";
      self->HandleError(std::string("Error while trying to connect the websocket: ") + std::string(error));

      // Retry connection attempt according to retry policy.
      if (lws_retry_sul_schedule_retry_wsi(wsi, &(self->retry_connect_.sul), self->RetryConnect,
                                           &(self->connection_retry_count_))) {
        lwsl_err("Connection attempts exhausted.\n");
        // Return non-zero to close the connection.
        return -1;
      }
      return 0;
    }

    case LWS_CALLBACK_CLIENT_RECEIVE: {
      lwsl_debug("Data received.\n");
      lwsl_hexdump_debug(in, len);
      const size_t remaining = lws_remaining_packet_payload(wsi);
      auto* uint8_ptr = static_cast<uint8_t*>(in);
      // Messages can be fragmented if the size exceeds max bytes
      // Therefore, it needs to handle fragmented message.
      self->received_data_buffer_.insert(self->received_data_buffer_.end(), uint8_ptr, uint8_ptr + len);
      if (!remaining && lws_is_final_fragment(wsi)) {
        self->observer_->OnWebsocketBinaryReceived(self->received_data_buffer_);
        self->received_data_buffer_.clear();
      }
      break;
    }

    case LWS_CALLBACK_CLIENT_ESTABLISHED: {
      lwsl_info("Handshake complete. Successfully upgraded to websocket.\n");
      self->observer_->OnWebsocketConnected();
      break;
    }

    case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE: {
      self->close_code_ = *(static_cast<uint16_t*>(in));
      lwsl_info("Server initiated connection close. Close code: %hu\n", self->close_code_);
      lwsl_hexdump_debug(in, len);
      // By returning zero, Libwebsockets will echo the close back to the server, then close.
      return 0;
    }

    case LWS_CALLBACK_CLIENT_CLOSED: {
      std::string description;
      if (self->close_code_ != 0) {
        description = std::string("Websocket closed with status: ") + std::to_string(self->close_code_);
      } else {
        description = "Websocket closed.";
      }
      lwsl_info("%s", description.c_str());

      WebsocketStatus status;
      status.description = description;
      self->observer_->OnWebsocketClosed(status);
      break;
    }

    case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER: {
      lwsl_info("Appending additional headers.\n");
      unsigned char** data_pointer = static_cast<unsigned char**>(in);
      unsigned char* end_of_data = (*data_pointer) + len;
      for (const auto& additional_header : self->configuration_.additional_headers) {
        if (lws_add_http_header_by_name(wsi, reinterpret_cast<const unsigned char*>(additional_header.first.c_str()),
                                        reinterpret_cast<const unsigned char*>(additional_header.second.c_str()),
                                        additional_header.second.size(), data_pointer, end_of_data)) {
          self->HandleError("Failed to add header: " + additional_header.first + " " + additional_header.second);

          // Return non-zero to close the connection.
          return -1;
        }
      }
      break;
    }
    case LWS_CALLBACK_CLIENT_WRITEABLE: {
      // TODO: Check if this can be also fragmented
      if (!self->message_queue_.empty()) {
        std::vector<uint8_t> data = self->message_queue_.front();
        self->message_queue_.pop();

        // Prepend space required by Libwebsockets.
        std::vector<uint8_t> data_with_prefix(LWS_PRE + data.size(), 0);
        std::copy(data.begin(), data.end(), data_with_prefix.begin() + LWS_PRE);

        std::string data_string(data.begin(), data.end());
        lwsl_debug("Writing message with length %lu and data %s \n", data.size(), data_string.c_str());
        int bytes_written = lws_write(self->wsi_, &(data_with_prefix)[LWS_PRE], data.size(), LWS_WRITE_BINARY);
        if (bytes_written == -1) {
          self->HandleError("Fatal write error. Closing.");
          lwsl_hexdump_debug(in, len);

          // Return non-zero to close the connection.
          return -1;
        }
      }
      break;
    }
    case LWS_CALLBACK_COMPLETED_CLIENT_HTTP: {
      lwsl_info("Headers complete.");
      break;
    }

    default:
      lwsl_debug("Callback reason, %d, not handled.", static_cast<int>(reason));
      lwsl_hexdump_debug(in, len);
      break;
  }

  // Required by Libwebsockets for internal postprocessing.
  return lws_callback_http_dummy(wsi, reason, user, in, len);
}