export function createConnectTransport()

in packages/dubbo-web/src/dubbo-transport.ts [122:328]


export function createConnectTransport(
  options: ConnectTransportOptions
): Transport {
  assertFetchApi();
  const useBinaryFormat = options.useBinaryFormat ?? false;
  const fetch = options.fetch ?? globalThis.fetch;
  return {
    async unary<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      message: PartialMessage<I>
    ): Promise<UnaryResponse<I, O>> {
      const { normalize, serialize, parse } = createClientMethodSerializers(
        method,
        useBinaryFormat,
        options.jsonOptions,
        options.binaryOptions
      );
      return await runUnaryCall<I, O>({
        interceptors: options.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: false,
          service,
          method,
          url: createMethodUrl(options.baseUrl, service, method),
          init: {
            method: "POST",
            credentials: options.credentials ?? "same-origin",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader(
            method.kind,
            useBinaryFormat,
            timeoutMs,
            header
          ),
          message: normalize(message),
        },
        next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
          const useGet =
            options.useHttpGet === true &&
            method.idempotency === MethodIdempotency.NoSideEffects;
          let body: BodyInit | null = null;
          if (useGet) {
            req = transformConnectPostToGetRequest(
              req,
              serialize(req.message),
              useBinaryFormat
            );
          } else {
            body = serialize(req.message);
          }
          const response = await fetch(req.url, {
            ...req.init,
            headers: req.header,
            signal: req.signal,
            body,
          });
          const { isUnaryError, unaryError } = validateResponse(
            method.kind,
            response.status,
            response.headers
          );
          if (isUnaryError) {
            throw errorFromJson(
              (await response.json()) as JsonValue,
              appendHeaders(...trailerDemux(response.headers)),
              unaryError
            );
          }
          const [demuxedHeader, demuxedTrailer] = trailerDemux(
            response.headers
          );

          return <UnaryResponse<I, O>>{
            stream: false,
            service,
            method,
            header: demuxedHeader,
            message: useBinaryFormat
              ? parse(new Uint8Array(await response.arrayBuffer()))
              : method.O.fromJson(
                  (await response.json()) as JsonValue,
                  getJsonOptions(options.jsonOptions)
                ),
            trailer: demuxedTrailer,
          };
        },
      });
    },

    async stream<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      input: AsyncIterable<I>
    ): Promise<StreamResponse<I, O>> {
      const { serialize, parse } = createClientMethodSerializers(
        method,
        useBinaryFormat,
        options.jsonOptions,
        options.binaryOptions
      );

      async function* parseResponseBody(
        body: ReadableStream<Uint8Array>,
        trailerTarget: Headers
      ) {
        const reader = createEnvelopeReadableStream(body).getReader();
        let endStreamReceived = false;
        for (;;) {
          const result = await reader.read();
          if (result.done) {
            break;
          }
          const { flags, data } = result.value;
          if ((flags & endStreamFlag) === endStreamFlag) {
            endStreamReceived = true;
            const endStream = endStreamFromJson(data);
            if (endStream.error) {
              throw endStream.error;
            }
            endStream.metadata.forEach((value, key) =>
              trailerTarget.set(key, value)
            );
            continue;
          }
          yield parse(data);
        }
        if (!endStreamReceived) {
          throw "missing EndStreamResponse";
        }
      }

      async function createRequestBody(
        input: AsyncIterable<I>
      ): Promise<Uint8Array> {
        if (method.kind != MethodKind.ServerStreaming) {
          throw "The fetch API does not support streaming request bodies";
        }
        const r = await input[Symbol.asyncIterator]().next();
        if (r.done == true) {
          throw "missing request message";
        }
        return encodeEnvelope(0, serialize(r.value));
      }
      return await runStreamingCall<I, O>({
        interceptors: options.interceptors,
        timeoutMs,
        signal,
        req: {
          stream: true,
          service,
          method,
          url: createMethodUrl(options.baseUrl, service, method),
          init: {
            method: "POST",
            credentials: options.credentials ?? "same-origin",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader(
            method.kind,
            useBinaryFormat,
            timeoutMs,
            header
          ),
          message: input,
        },
        next: async (req) => {
          const fRes = await fetch(req.url, {
            ...req.init,
            headers: req.header,
            signal: req.signal,
            body: await createRequestBody(req.message),
          });
          validateResponse(method.kind, fRes.status, fRes.headers);
          if (fRes.body === null) {
            throw "missing response body";
          }
          const trailer = new Headers();
          const res: StreamResponse<I, O> = {
            ...req,
            header: fRes.headers,
            trailer,
            message: parseResponseBody(fRes.body, trailer),
          };
          return res;
        },
      });
    },
  };
}