export function getEventSource()

in src/js/builtins/EventSource.ts [25:498]


/**
 * Builds and returns the `EventSource` class (a Server-Sent Events client
 * implemented on top of `Bun.connect`).
 *
 * The class is created inside a factory so that it is only defined when
 * requested. The returned constructor also exposes the `CONNECTING` (0),
 * `OPEN` (1) and `CLOSED` (2) constants on its prototype.
 *
 * @returns The `EventSource` class.
 */
export function getEventSource() {
  type Socket = Awaited<ReturnType<typeof Bun.connect<EventSource>>>;

  class EventSource extends EventTarget {
    #url;
    // 0 = CONNECTING (request sent, waiting for headers), 1 = OPEN, 2 = CLOSED.
    #state;
    #onerror;
    #onmessage;
    #onopen;
    #is_tls = false;
    #socket: Socket | null = null;
    // Received text that could not be processed yet (partial headers / partial event).
    #data_buffer = "";
    // Tail of the HTTP request that the socket did not accept yet; flushed in `drain`.
    #send_buffer = "";
    #lastEventID = "";
    // Set to false by `close()`; once false no further connection is attempted.
    #reconnect = true;
    #content_length = 0; // 0 means chunked -1 means not informed aka no auto end
    #received_length = 0;
    #reconnection_time = 0;
    #reconnection_timer: Timer | null = null;

    /** Timer / nextTick trampoline that starts a connection attempt. */
    static #ConnectNextTick(self: EventSource) {
      self.#connect();
    }

    /**
     * Writes the HTTP GET request for `url` to `socket`.
     * Whatever the socket does not accept is kept in `#send_buffer` so that
     * the `drain` handler can finish sending it.
     */
    static #SendRequest(socket: Socket, url: URL) {
      const self = socket.data;
      const last_event_header = self.#lastEventID ? `Last-Event-ID: ${self.#lastEventID}\r\n` : "";
      // `Host` must name the target authority (it was previously hard-coded),
      // and `Accept` advertises the only media type this client understands.
      const request = `GET ${url.pathname}${url.search} HTTP/1.1\r\nHost: ${url.host}\r\nAccept: text/event-stream\r\nContent-type: text/event-stream\r\nContent-length: 0\r\n${last_event_header}\r\n`;
      const sended = socket.write(request);
      self.#send_buffer = sended !== request.length ? request.substring(sended) : "";
    }

    /**
     * Parses the response body text in `chunks`, starting at `offset`, and
     * dispatches one `MessageEvent` per complete event block found.
     *
     * Incomplete data is appended to `#data_buffer` and picked up on a later call.
     * A zero-sized chunk (chunked mode) marks the end of the stream.
     *
     * NOTE(review): follow-up buffers are processed with `offset = 2` and the
     * loop advances by `chunk_end_idx + 2`; this presumably matches a framing
     * where each chunk is preceded by CRLF — confirm against the servers this
     * client is expected to talk to before changing the framing logic.
     */
    static #ProcessChunk(self: EventSource, chunks: string, offset: number) {
      for (;;) {
        if (offset >= chunks.length) {
          return;
        }
        let chunk_end_idx = -1;
        let start_idx = chunks.indexOf("\r\n", offset);
        const chunk_start_idx = start_idx + 2;
        if (start_idx > 0) {
          if (self.#content_length === 0) {
            // chunked: the text before CRLF is the hexadecimal chunk size
            const chunk_size = parseInt(chunks.substring(offset, start_idx), 16);
            if (chunk_size === 0) {
              // no more chunks
              self.#state = 2;
              self.#socket?.end();
              return;
            }
            chunk_end_idx = chunk_start_idx + chunk_size;
          } else {
            //not chunked
            chunk_end_idx = chunks.length;
          }
        } else {
          // wait for the chunk if is chunked
          if (self.#data_buffer.length === 0) {
            self.#data_buffer += chunks.substring(offset);
            return;
          }
          chunk_end_idx = chunks.length;
        }

        // check for chunk end
        let chunk = chunks.substring(chunk_start_idx, chunk_end_idx);
        offset = chunk_end_idx + 2;
        let chunk_offset = 0;
        // wait for data end
        let event_idx = chunk.indexOf("\n\n");
        if (event_idx == -1) {
          // wait for more data
          self.#data_buffer += chunks.substring(chunk_start_idx);
          return;
        }

        // combine data
        if (self.#data_buffer.length) {
          self.#data_buffer += chunk;
          chunk = self.#data_buffer;
          self.#data_buffer = "";
        }

        let more_events = true;
        while (more_events) {
          const event_data = chunk.substring(chunk_offset, event_idx);

          let type;
          let data = "";
          let id;
          let event_line_idx = 0;
          let retry = -1;
          // Walk the event block line by line and collect the known fields.
          for (;;) {
            let idx = event_data.indexOf("\n", event_line_idx);
            if (idx === -1) {
              if (event_line_idx >= event_data.length) {
                break;
              }
              idx = event_data.length;
            }
            const line = event_data.substring(event_line_idx, idx);
            if (line.startsWith("data:")) {
              // multiple data lines are joined with "\n"
              if (data.length) {
                data += `\n${line.substring(5).trim()}`;
              } else {
                data = line.substring(5).trim();
              }
            } else if (line.startsWith("event:")) {
              type = line.substring(6).trim();
            } else if (line.startsWith("id:")) {
              id = line.substring(3).trim();
            } else if (line.startsWith("retry:")) {
              retry = parseInt(line.substring(6).trim(), 10);
              // NaN check: ignore a non-numeric retry value
              if (retry !== retry) {
                retry = -1;
              }
            }
            event_line_idx = idx + 1;
          }
          // The last event ID persists until the server sends a new `id:`
          // field; an event without one must not reset it.
          if (id !== undefined) {
            self.#lastEventID = id;
          }
          if (retry >= 0) {
            self.#reconnection_time = retry;
          }

          if (data || id || type) {
            self.dispatchEvent(
              new MessageEvent(type || "message", {
                data: data || "",
                origin: self.#url.origin,
                // @ts-ignore
                source: self,
                lastEventId: self.#lastEventID,
              }),
            );
          }

          // no more events
          if (chunk.length === event_idx + 2) {
            more_events = false;
            break;
          }

          const next_event_idx = chunk.indexOf("\n\n", event_idx + 1);
          if (next_event_idx === -1) {
            break;
          }
          // The next block starts at the blank-line separator; the empty
          // leading lines are skipped by the line parser above.
          chunk_offset = event_idx;
          event_idx = next_event_idx;
        }
      }
    }

    /** Socket callbacks passed to `Bun.connect`; `socket.data` is the owning EventSource. */
    static #Handlers = {
      /** Remembers the socket and, for plain TCP, sends the request right away. */
      open(socket: Socket) {
        const self = socket.data;
        self.#socket = socket;
        if (!self.#is_tls) {
          EventSource.#SendRequest(socket, self.#url);
        }
      },
      /** For TLS the request is sent only after a successful handshake. */
      handshake(socket: Socket, success: boolean, verifyError: Error) {
        const self = socket.data;
        if (success) {
          EventSource.#SendRequest(socket, self.#url);
        } else {
          self.#state = 2;
          self.dispatchEvent(new ErrorEvent("error", { error: verifyError }));
          socket.end();
        }
      },
      /**
       * State 0: accumulates and validates the response headers, then opens
       * the stream and processes whatever body text followed the headers.
       * State 1: feeds body data to `#ProcessChunk`.
       */
      data(socket: Socket, buffer: Buffer) {
        const self = socket.data;
        switch (self.#state) {
          case 0: {
            // Join with previously buffered header text BEFORE searching for
            // the header terminator, so the index refers to the full text and
            // a terminator split across two reads is still found.
            let text = buffer.toString();
            if (self.#data_buffer.length) {
              text = self.#data_buffer + text;
              self.#data_buffer = "";
            }
            const headers_idx = text.indexOf("\r\n\r\n");
            if (headers_idx === -1) {
              // wait headers
              self.#data_buffer = text;
              return;
            }

            const headers = text.substring(0, headers_idx);
            const status_idx = headers.indexOf("\r\n");

            if (status_idx === -1) {
              self.#state = 2;
              self.dispatchEvent(new ErrorEvent("error", { error: new Error("Invalid HTTP request") }));
              socket.end();
              return;
            }
            const status = headers.substring(0, status_idx);
            if (status !== "HTTP/1.1 200 OK") {
              self.#state = 2;
              self.dispatchEvent(new ErrorEvent("error", { error: new Error(status) }));
              socket.end();
              return;
            }

            // `start_idx` always points at the "\n" that ends the previous line.
            let start_idx = status_idx + 1;
            let mime_type_ok = false;
            let content_length = -1;
            for (;;) {
              let header_idx = headers.indexOf("\r\n", start_idx);
              if (header_idx === -1) {
                if (start_idx >= headers.length) {
                  // All headers consumed.
                  if (!mime_type_ok) {
                    // No text/event-stream mime type
                    self.#state = 2;
                    self.dispatchEvent(
                      new ErrorEvent("error", {
                        error: new Error(
                          `EventSource's response has no MIME type and "text/event-stream" is required. Aborting the connection.`,
                        ),
                      }),
                    );
                    socket.end();
                    return;
                  }
                  // MIME type accepted: open the stream regardless of the
                  // order in which the headers arrived (content_length stays
                  // -1 when no length/encoding header was present).
                  break;
                }

                header_idx = headers.length;
              }

              const header = headers.substring(start_idx + 1, header_idx);
              const header_name_idx = header.indexOf(":");
              const header_name = header.substring(0, header_name_idx);
              const is_content_type =
                header_name.localeCompare("content-type", undefined, { sensitivity: "accent" }) === 0;
              start_idx = header_idx + 1;

              if (is_content_type) {
                // Compare only the media type: ignore parameters such as
                // "; charset=utf-8", letter case and optional whitespace.
                const header_value = header.substring(header_name_idx + 1);
                const params_idx = header_value.indexOf(";");
                const mime_type = (params_idx === -1 ? header_value : header_value.substring(0, params_idx))
                  .trim()
                  .toLowerCase();
                if (mime_type === "text/event-stream") {
                  mime_type_ok = true;
                } else {
                  // wrong mime type
                  self.#state = 2;
                  self.dispatchEvent(
                    new ErrorEvent("error", {
                      error: new Error(
                        `EventSource's response has a MIME type that is not "text/event-stream". Aborting the connection.`,
                      ),
                    }),
                  );
                  socket.end();
                  return;
                }
              } else {
                const is_content_length =
                  header_name.localeCompare("content-length", undefined, { sensitivity: "accent" }) === 0;
                if (is_content_length) {
                  content_length = parseInt(header.substring(header_name_idx + 1).trim(), 10);
                  if (content_length !== content_length || content_length <= 0) {
                    self.#state = 2;
                    self.dispatchEvent(
                      new ErrorEvent("error", {
                        error: new Error(`EventSource's Content-Length is invalid. Aborting the connection.`),
                      }),
                    );
                    socket.end();
                    return;
                  }
                  if (mime_type_ok) {
                    break;
                  }
                } else {
                  const is_transfer_encoding =
                    header_name.localeCompare("transfer-encoding", undefined, { sensitivity: "accent" }) === 0;
                  if (is_transfer_encoding) {
                    if (header.substring(header_name_idx + 1).trim() !== "chunked") {
                      self.#state = 2;
                      self.dispatchEvent(
                        new ErrorEvent("error", {
                          error: new Error(`EventSource's Transfer-Encoding is invalid. Aborting the connection.`),
                        }),
                      );
                      socket.end();
                      return;
                    }
                    content_length = 0;
                    if (mime_type_ok) {
                      break;
                    }
                  }
                }
              }
            }

            self.#content_length = content_length;
            self.#state = 1;
            self.dispatchEvent(new Event("open"));
            const chunks = text.substring(headers_idx + 4);
            EventSource.#ProcessChunk(self, chunks, 0);
            if (self.#content_length > 0) {
              self.#received_length += chunks.length;
              if (self.#received_length >= self.#content_length) {
                self.#state = 2;
                socket.end();
              }
            }
            return;
          }
          case 1:
            EventSource.#ProcessChunk(self, buffer.toString(), 2);
            if (self.#content_length > 0) {
              self.#received_length += buffer.byteLength;
              if (self.#received_length >= self.#content_length) {
                self.#state = 2;
                socket.end();
              }
            }
            return;
          default:
            break;
        }
      },
      /** Socket is writable again: flush the unsent tail of the request. */
      drain(socket: Socket) {
        const self = socket.data;
        if (self.#state === 0) {
          // The pending request bytes live in #send_buffer (not in the
          // receive-side #data_buffer).
          const request = self.#send_buffer;
          if (request.length) {
            const sended = socket.write(request);
            if (sended !== request.length) {
              self.#send_buffer = request.substring(sended);
            } else {
              self.#send_buffer = "";
            }
          }
        }
      },
      close: EventSource.#Close,
      end(socket: Socket) {
        EventSource.#Close(socket).dispatchEvent(
          new ErrorEvent("error", { error: new Error("Connection closed by server") }),
        );
      },
      timeout(socket: Socket) {
        EventSource.#Close(socket).dispatchEvent(new ErrorEvent("error", { error: new Error("Timeout") }));
      },
      binaryType: "buffer",
    };

    /**
     * Resets the per-connection state, marks the source CLOSED and, unless
     * `close()` was called, schedules a reconnect after `#reconnection_time` ms.
     *
     * @returns The EventSource that owned `socket`.
     */
    static #Close(socket: Socket) {
      const self = socket.data;
      self.#socket = null;
      self.#received_length = 0;
      // Drop leftovers so they cannot leak into the next connection.
      self.#data_buffer = "";
      self.#send_buffer = "";
      self.#state = 2;
      if (self.#reconnect) {
        if (self.#reconnection_timer) {
          clearTimeout(self.#reconnection_timer);
        }
        self.#reconnection_timer = setTimeout(EventSource.#ConnectNextTick, self.#reconnection_time, self);
      }
      return self;
    }

    /**
     * @param url Absolute URL of the event stream; `https:` enables TLS.
     * @param options Currently unused.
     */
    constructor(url: string, options = undefined) {
      super();
      const uri = new URL(url);
      this.#is_tls = uri.protocol === "https:";
      this.#url = uri;
      this.#state = 2;
      // Connect on the next tick so listeners can be attached first.
      process.nextTick(EventSource.#ConnectNextTick, this);
    }

    // Not web standard
    ref() {
      this.#reconnection_timer?.ref();
      this.#socket?.ref();
    }

    // Not web standard
    unref() {
      this.#reconnection_timer?.unref();
      this.#socket?.unref();
    }

    /**
     * Starts a connection attempt. Does nothing when a connection is already
     * in progress/open or after `close()` was called.
     */
    #connect() {
      if (this.#state !== 2 || !this.#reconnect) return;
      const uri = this.#url;
      const is_tls = this.#is_tls;
      this.#state = 0;
      //@ts-ignore
      Bun.connect({
        data: this,
        socket: EventSource.#Handlers,
        hostname: uri.hostname,
        port: parseInt(uri.port || (is_tls ? "443" : "80"), 10),
        tls: is_tls
          ? {
              requestCert: true,
              rejectUnauthorized: false,
            }
          : false,
      }).catch(err => {
        // Back to CLOSED, otherwise the retry below would be rejected by the
        // state guard at the top of this method.
        this.#state = 2;
        super.dispatchEvent(new ErrorEvent("error", { error: err }));
        if (this.#reconnect) {
          if (this.#reconnection_timer) {
            clearTimeout(this.#reconnection_timer);
          }

          this.#reconnection_timer = setTimeout(EventSource.#ConnectNextTick, 1000, this);
        }
      });
    }

    get url() {
      return this.#url.href;
    }

    get readyState() {
      return this.#state;
    }

    /** Closes the connection permanently and cancels any pending reconnect. */
    close() {
      this.#reconnect = false;
      this.#state = 2;
      if (this.#reconnection_timer) {
        clearTimeout(this.#reconnection_timer);
        this.#reconnection_timer = null;
      }
      this.#socket?.unref();
      this.#socket?.end();
    }

    get onopen() {
      return this.#onopen;
    }
    get onerror() {
      return this.#onerror;
    }
    get onmessage() {
      return this.#onmessage;
    }

    set onopen(cb) {
      if (this.#onopen) {
        // Remove the previous handler from the event it was registered for.
        super.removeEventListener("open", this.#onopen);
      }
      super.addEventListener("open", cb);
      this.#onopen = cb;
    }

    set onerror(cb) {
      if (this.#onerror) {
        super.removeEventListener("error", this.#onerror);
      }
      super.addEventListener("error", cb);
      this.#onerror = cb;
    }

    set onmessage(cb) {
      if (this.#onmessage) {
        super.removeEventListener("message", this.#onmessage);
      }
      super.addEventListener("message", cb);
      this.#onmessage = cb;
    }
  }

  // readyState constants, exposed on the prototype.
  Object.defineProperty(EventSource.prototype, "CONNECTING", {
    enumerable: true,
    value: 0,
  });

  Object.defineProperty(EventSource.prototype, "OPEN", {
    enumerable: true,
    value: 1,
  });

  Object.defineProperty(EventSource.prototype, "CLOSED", {
    enumerable: true,
    value: 2,
  });

  return EventSource;
}