export function createGrpcWebTransport()

in packages/dubbo-web/src/grpc-web-transport.ts [117:333]


/**
 * Create a Transport that performs RPCs through the fetch API.
 *
 * Request messages are wrapped in a single envelope and sent as the body of a
 * POST request. The response body is read as a sequence of envelopes: message
 * envelopes carry response payloads, and an envelope with the trailer flag
 * set carries the trailers.
 *
 * Defaults applied here:
 * - `options.useBinaryFormat` falls back to `true`.
 * - `options.fetch` falls back to `globalThis.fetch`.
 * - `options.credentials` falls back to `"same-origin"`.
 *
 * Protocol violations detected in this function are thrown as plain strings
 * (for example `"missing trailer"`).
 * NOTE(review): presumably `runUnaryCall` / `runStreamingCall` normalize these
 * into the project's error type - confirm against those helpers.
 *
 * @param options - Transport configuration (base URL, interceptors,
 *   serialization options, credentials, and an optional fetch override).
 * @returns A Transport with a `unary` and a `stream` implementation.
 */
export function createGrpcWebTransport(
  options: GrpcWebTransportOptions
): Transport {
  assertFetchApi();
  const useBinaryFormat = options.useBinaryFormat ?? true;
  const fetch = options.fetch ?? globalThis.fetch;
  return {
    /**
     * Perform a unary call: send one request message and expect exactly one
     * response message followed by exactly one trailer envelope.
     *
     * @throws "missing response body" if the fetch response has no body.
     * @throws "extra trailer" / "extra message" if an envelope kind repeats.
     * @throws "missing trailer" / "missing message" if either is absent.
     */
    async unary<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: Headers,
      message: PartialMessage<I>
    ): Promise<UnaryResponse<I, O>> {
      const { normalize, serialize, parse } = createClientMethodSerializers(
        method,
        useBinaryFormat,
        options.jsonOptions,
        options.binaryOptions
      );
      return await runUnaryCall<I, O>({
        interceptors: options.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: false,
          service,
          method,
          url: createMethodUrl(options.baseUrl, service, method),
          init: {
            method: "POST",
            credentials: options.credentials ?? "same-origin",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader(useBinaryFormat, timeoutMs, header),
          message: normalize(message),
        },
        next: async (req: UnaryRequest<I, O>): Promise<UnaryResponse<I, O>> => {
          const response = await fetch(req.url, {
            ...req.init,
            headers: req.header,
            signal: req.signal,
            body: encodeEnvelope(0, serialize(req.message)),
          });
          validateResponse(response.status, response.headers);
          if (!response.body) {
            throw "missing response body";
          }
          const reader = createEnvelopeReadableStream(
            response.body
          ).getReader();
          let trailer: Headers | undefined;
          // Named distinctly from the `message` parameter of `unary` so the
          // request payload is not shadowed by the response payload.
          let responseMessage: O | undefined;
          for (;;) {
            const r = await reader.read();
            if (r.done) {
              break;
            }
            const { flags, data } = r.value;
            // Test the trailer bit with a mask, exactly like the streaming
            // path does, so that a trailer envelope carrying additional flag
            // bits is not mistaken for a message.
            if ((flags & trailerFlag) === trailerFlag) {
              if (trailer !== undefined) {
                throw "extra trailer";
              }
              // Unary responses require exactly one response message, but in
              // case of an error, it is perfectly valid to have a response body
              // that only contains error trailers.
              trailer = trailerParse(data);
              continue;
            }
            if (responseMessage !== undefined) {
              throw "extra message";
            }
            responseMessage = parse(data);
          }
          if (trailer === undefined) {
            throw "missing trailer";
          }
          // Validate the trailer before complaining about a missing message,
          // so that an error-only response body surfaces the trailer's error.
          validateTrailer(trailer);
          if (responseMessage === undefined) {
            throw "missing message";
          }
          // `service` and `method` are included so the unary response carries
          // the same call identification the streaming response gets from
          // spreading the request. The assertion is retained because the
          // definition of UnaryResponse is not visible from this file section.
          return <UnaryResponse<I, O>>{
            stream: false,
            service,
            method,
            header: response.headers,
            message: responseMessage,
            trailer,
          };
        },
      });
    },

    /**
     * Perform a streaming call. Because the request body is produced by
     * `createRequestBody`, only server-streaming methods are accepted and
     * exactly one request message is sent.
     *
     * The returned response exposes the response messages as an async
     * iterable; the `trailer` Headers object is populated while that iterable
     * is consumed, once the trailer envelope has been read.
     */
    async stream<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      input: AsyncIterable<I>
    ): Promise<StreamResponse<I, O>> {
      const { serialize, parse } = createClientMethodSerializers(
        method,
        useBinaryFormat,
        options.jsonOptions,
        options.binaryOptions
      );

      /**
       * Yield every response message found in `body` and copy the fields of
       * the trailer envelope into `trailerTarget`.
       *
       * @param body - The raw response body.
       * @param foundStatus - Value reported by `validateResponse`; when true
       *   the body must be empty and nothing is yielded.
       * @param trailerTarget - Headers object that receives the trailer fields.
       */
      async function* parseResponseBody(
        body: ReadableStream<Uint8Array>,
        foundStatus: boolean,
        trailerTarget: Headers
      ) {
        const reader = createEnvelopeReadableStream(body).getReader();
        if (foundStatus) {
          // A grpc-status: 0 response header was present. This is a "trailers-only"
          // response (a response without a body and no trailers).
          //
          // The spec seems to disallow a trailers-only response for status 0 - we are
          // lenient and only verify that the body is empty.
          //
          // > [...] Trailers-Only is permitted for calls that produce an immediate error.
          // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
          if (!(await reader.read()).done) {
            throw "extra data for trailers-only";
          }
          return;
        }
        let trailerReceived = false;
        for (;;) {
          const result = await reader.read();
          if (result.done) {
            break;
          }
          const { flags, data } = result.value;
          if ((flags & trailerFlag) === trailerFlag) {
            if (trailerReceived) {
              throw "extra trailer";
            }
            trailerReceived = true;
            const trailer = trailerParse(data);
            validateTrailer(trailer);
            trailer.forEach((value, key) => trailerTarget.set(key, value));
            continue;
          }
          // Any message envelope after the trailer is a protocol violation.
          if (trailerReceived) {
            throw "extra message";
          }
          yield parse(data);
        }
        if (!trailerReceived) {
          throw "missing trailer";
        }
      }

      /**
       * Build the request body from the first message of `input`.
       *
       * @throws "The fetch API does not support streaming request bodies" for
       *   any method kind other than server-streaming.
       * @throws "missing request message" if `input` yields nothing.
       */
      async function createRequestBody(
        input: AsyncIterable<I>
      ): Promise<Uint8Array> {
        if (method.kind !== MethodKind.ServerStreaming) {
          throw "The fetch API does not support streaming request bodies";
        }
        // Only the first item is pulled from the iterable; anything after it
        // is never requested.
        const r = await input[Symbol.asyncIterator]().next();
        if (r.done === true) {
          throw "missing request message";
        }
        return encodeEnvelope(0, serialize(r.value));
      }

      return runStreamingCall<I, O>({
        interceptors: options.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: true,
          service,
          method,
          url: createMethodUrl(options.baseUrl, service, method),
          init: {
            method: "POST",
            credentials: options.credentials ?? "same-origin",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader(useBinaryFormat, timeoutMs, header),
          message: input,
        },
        next: async (req) => {
          const fRes = await fetch(req.url, {
            ...req.init,
            headers: req.header,
            signal: req.signal,
            body: await createRequestBody(req.message),
          });
          const { foundStatus } = validateResponse(fRes.status, fRes.headers);
          if (!fRes.body) {
            throw "missing response body";
          }
          // Created empty here and filled in by parseResponseBody as the
          // caller iterates over the response messages.
          const trailer = new Headers();
          const res: StreamResponse<I, O> = {
            ...req,
            header: fRes.headers,
            trailer,
            message: parseResponseBody(fRes.body, foundStatus, trailer),
          };
          return res;
        },
      });
    },
  };
}