export function createTransport()

in packages/dubbo/src/protocol-triple/transport.ts [61:307]


/**
 * Builds a Transport object from the supplied options.
 *
 * The returned object has two methods:
 * - `unary`: sends one message and resolves with one response message.
 * - `stream`: sends a stream of enveloped messages and resolves with a
 *   response whose `message` is an async iterable of parsed messages.
 *
 * Both methods prepare the request (URL, headers, message) and delegate to
 * `runUnaryCall` / `runStreamingCall` together with `opt.interceptors`; the
 * actual HTTP exchange is performed through `opt.httpClient`.
 *
 * @param opt - Transport options (base URL, HTTP client, serialization and
 *   compression settings, size limits, interceptors).
 * @returns The transport with `unary` and `stream` methods.
 */
export function createTransport(opt: CommonTransportOptions): Transport {
  return {
    /**
     * Performs a unary call.
     *
     * @param service - Service the method belongs to.
     * @param method - Method descriptor; `method.I` is used to normalize the
     *   request message.
     * @param signal - Optional abort signal, forwarded to `runUnaryCall`.
     * @param timeoutMs - Optional timeout, forwarded to `runUnaryCall` and to
     *   the request header builder.
     * @param header - Optional user supplied request headers.
     * @param message - Request message or a partial initializer for it.
     * @returns The response with header, parsed message and trailer.
     * @throws The error produced by `errorFromJsonBytes` when the response is
     *   flagged as a unary error by `validateResponseWithCompression`.
     */
    async unary<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      message: PartialMessage<I>
    ): Promise<UnaryResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        method.kind,
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );
      // Accept either a ready-made message instance or a plain initializer.
      const requestMessage =
        message instanceof method.I ? message : new method.I(message);

      // Final step of the call chain: performs the actual HTTP exchange.
      const sendUnary = async (
        req: UnaryRequest<I, O>
      ): Promise<UnaryResponse<I, O>> => {
        let payload = codecs.getI(opt.useBinaryFormat).serialize(req.message);

        // Compress only when a send compression is configured and the payload
        // is larger than the configured minimum; otherwise make sure the
        // encoding header is not present on the request.
        const compressor = opt.sendCompression;
        if (compressor && payload.byteLength > opt.compressMinBytes) {
          payload = await compressor.compress(payload);
          req.header.set(headerUnaryEncoding, compressor.name);
        } else {
          req.header.delete(headerUnaryEncoding);
        }

        // GET is used only when enabled in the options AND the method is
        // declared as having no side effects. In that case the request is
        // rewritten by transformConnectPostToGetRequest and no body is sent.
        const viaGet =
          opt.useHttpGet === true &&
          method.idempotency === MethodIdempotency.NoSideEffects;
        const outgoing = viaGet
          ? transformConnectPostToGetRequest(
              req,
              payload,
              opt.useBinaryFormat
            )
          : req;
        const body: AsyncIterable<Uint8Array> | undefined = viaGet
          ? undefined
          : createAsyncIterable([payload]);

        const httpResponse = await opt.httpClient({
          url: outgoing.url,
          method: outgoing.init.method ?? "POST",
          header: outgoing.header,
          signal: outgoing.signal,
          body,
        });

        const { compression, isUnaryError, unaryError } =
          validateResponseWithCompression(
            method.kind,
            opt.acceptCompression,
            httpResponse.status,
            httpResponse.header
          );
        const [responseHeader, responseTrailer] = trailerDemux(
          httpResponse.header
        );

        // Drain the response body into a single byte array; the sink receives
        // the read limit and the content-length header value.
        const sink = sinkAllBytes(
          opt.readMaxBytes,
          httpResponse.header.get(headerUnaryContentLength)
        );
        const rawBytes = await pipeTo(httpResponse.body, sink, {
          propagateDownStreamError: false,
        });
        const responseBytes = compression
          ? await compression.decompress(rawBytes, opt.readMaxBytes)
          : rawBytes;

        // Error responses carry the error in the (already decompressed) body.
        if (isUnaryError) {
          throw errorFromJsonBytes(
            responseBytes,
            appendHeaders(responseHeader, responseTrailer),
            unaryError
          );
        }

        const parsed = codecs.getO(opt.useBinaryFormat).parse(responseBytes);
        return {
          stream: false,
          service,
          method,
          header: responseHeader,
          message: parsed,
          trailer: responseTrailer,
        } as UnaryResponse<I, O>;
      };

      return await runUnaryCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: false,
          service,
          method,
          url,
          init: {},
          header: requestHeader,
          message: requestMessage,
        },
        next: sendUnary,
      });
    },

    /**
     * Performs a streaming call.
     *
     * @param service - Service the method belongs to.
     * @param method - Method descriptor; `method.I` is used to normalize the
     *   request messages.
     * @param signal - Optional abort signal, forwarded to `runStreamingCall`.
     * @param timeoutMs - Optional timeout, forwarded to `runStreamingCall` and
     *   to the request header builder.
     * @param header - Optional user supplied request headers.
     * @param input - Async iterable of request messages.
     * @returns The response; its `message` iterable yields parsed messages and
     *   its `trailer` is filled from the end-of-stream envelope's metadata.
     *   While iterating `message`, a ConnectError with
     *   `Code.InvalidArgument` is thrown if the end-of-stream envelope is
     *   missing, duplicated, or followed by further messages, and an error
     *   carried by the end-of-stream envelope is rethrown as is.
     */
    async stream<
      I extends Message<I> = AnyMessage,
      O extends Message<O> = AnyMessage
    >(
      service: ServiceType,
      method: MethodInfo<I, O>,
      signal: AbortSignal | undefined,
      timeoutMs: number | undefined,
      header: HeadersInit | undefined,
      input: AsyncIterable<I>
    ): Promise<StreamResponse<I, O>> {
      const codecs = createMethodSerializationLookup(
        method,
        opt.binaryOptions,
        opt.jsonOptions,
        opt
      );
      const endStreamCodec = createEndStreamSerialization(opt.jsonOptions);
      const url = createMethodUrl(opt.baseUrl, service, method);
      const requestHeader = requestHeaderWithCompression(
        method.kind,
        opt.useBinaryFormat,
        timeoutMs,
        header,
        opt.acceptCompression,
        opt.sendCompression
      );

      // Final step of the call chain: performs the actual HTTP exchange.
      const sendStream = async (req: StreamRequest<I, O>) => {
        const httpResponse = await opt.httpClient({
          url: req.url,
          method: "POST",
          header: req.header,
          signal: req.signal,
          // Request pipeline: normalize -> serialize into envelopes ->
          // compress envelopes -> join envelopes into the outgoing bytes.
          body: pipe(
            req.message,
            transformNormalizeMessage(method.I),
            transformSerializeEnvelope(codecs.getI(opt.useBinaryFormat)),
            transformCompressEnvelope(
              opt.sendCompression,
              opt.compressMinBytes
            ),
            transformJoinEnvelopes(),
            { propagateDownStreamError: true }
          ),
        });
        const { compression } = validateResponseWithCompression(
          method.kind,
          opt.acceptCompression,
          httpResponse.status,
          httpResponse.header
        );

        // The generator below refers back to `response` in order to fill
        // `response.trailer` once the end-of-stream envelope has been read.
        const response: StreamResponse<I, O> = {
          ...req,
          header: httpResponse.header,
          trailer: new Headers(),
          // Response pipeline: split bytes into envelopes -> decompress ->
          // parse -> enforce end-of-stream rules and collect trailers.
          message: pipe(
            httpResponse.body,
            transformSplitEnvelope(opt.readMaxBytes),
            transformDecompressEnvelope(compression ?? null, opt.readMaxBytes),
            transformParseEnvelope(
              codecs.getO(opt.useBinaryFormat),
              endStreamFlag,
              endStreamCodec
            ),
            async function* (envelopes) {
              let sawEndStream = false;
              for await (const envelope of envelopes) {
                if (!envelope.end) {
                  // Regular message: not allowed once the stream has ended.
                  if (sawEndStream) {
                    throw new ConnectError(
                      "protocol error: received extra message after EndStreamResponse",
                      Code.InvalidArgument
                    );
                  }
                  yield envelope.value;
                  continue;
                }
                // End-of-stream envelope: exactly one is allowed.
                if (sawEndStream) {
                  throw new ConnectError(
                    "protocol error: received extra EndStreamResponse",
                    Code.InvalidArgument
                  );
                }
                sawEndStream = true;
                if (envelope.value.error) {
                  throw envelope.value.error;
                }
                envelope.value.metadata.forEach((value, key) => {
                  response.trailer.set(key, value);
                });
              }
              // The body ended without ever delivering an end-of-stream
              // envelope.
              if (!sawEndStream) {
                throw new ConnectError(
                  "protocol error: missing EndStreamResponse",
                  Code.InvalidArgument
                );
              }
            },
            { propagateDownStreamError: true }
          ),
        };
        return response;
      };

      return runStreamingCall<I, O>({
        interceptors: opt.interceptors,
        signal,
        timeoutMs,
        req: {
          stream: true,
          service,
          method,
          url,
          init: {
            method: "POST",
            redirect: "error",
            mode: "cors",
          },
          header: requestHeader,
          message: pipe(input, transformNormalizeMessage(method.I), {
            propagateDownStreamError: true,
          }),
        },
        next: sendStream,
      });
    },
  };
}