export function transformParseEnvelope()

in packages/dubbo/src/protocol/async-iterable.ts [862:937]


export function transformParseEnvelope<T>(
  serialization: Serialization<T>,
  endStreamFlag: number
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
 * Builds an AsyncIterableTransform that consumes enveloped messages and emits
 * the payload of each envelope, parsed with the given serialization.
 *
 * Passing `null` as endSerialization selects strict handling of the
 * endStreamFlag: an envelope that has the flag set is not parsed, and the
 * transform raises a ConnectError with Code.InvalidArgument instead.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function transformParseEnvelope<T>(
  serialization: Serialization<T>,
  endStreamFlag: number,
  endSerialization: null
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
 * Builds an AsyncIterableTransform that consumes enveloped messages and emits
 * one ParsedEnvelopedMessage per input envelope.
 *
 * An envelope with the endStreamFlag set has its payload parsed with the
 * given endSerialization and is emitted as { value: ..., end: true }.
 *
 * Every other envelope has its payload parsed with the given serialization
 * and is emitted as { value: ..., end: false }.
 *
 * @private Internal code, does not follow semantic versioning.
 */
export function transformParseEnvelope<T, E>(
  serialization: Serialization<T>,
  endStreamFlag: number,
  endSerialization: Serialization<E>
): AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>>;
export function transformParseEnvelope<T, E>(
  serialization: Serialization<T>,
  endStreamFlag?: number,
  endSerialization?: null | Serialization<E>
):
  | AsyncIterableTransform<EnvelopedMessage, T>
  | AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>> {
  // Without both an end-stream flag and an end serialization, the transform
  // emits bare T values.
  if (endStreamFlag === undefined || !endSerialization) {
    // An envelope counts as end-of-stream only if a flag was configured and
    // every bit of that flag is present in the envelope flags.
    const hasEndFlag = (envelopeFlags: number): boolean =>
      endStreamFlag !== undefined &&
      (envelopeFlags & endStreamFlag) === endStreamFlag;
    // An explicit null asks for an error on end-of-stream envelopes; leaving
    // the argument out means such envelopes are dropped silently.
    const rejectEndFlag = endSerialization === null;

    return async function* (
      source: AsyncIterable<EnvelopedMessage>
    ): AsyncIterable<T> {
      for await (const envelope of source) {
        if (!hasEndFlag(envelope.flags)) {
          yield serialization.parse(envelope.data);
        } else if (rejectEndFlag) {
          throw new ConnectError("unexpected end flag", Code.InvalidArgument);
        }
        // Otherwise the end-of-stream envelope is skipped.
      }
    };
  }

  // Both were provided: every envelope becomes a ParsedEnvelopedMessage<T, E>.
  const endFlag = endStreamFlag;
  const endParser = endSerialization;
  return async function* (
    source: AsyncIterable<EnvelopedMessage>
  ): AsyncIterable<ParsedEnvelopedMessage<T, E>> {
    for await (const envelope of source) {
      if ((envelope.flags & endFlag) !== endFlag) {
        yield { value: serialization.parse(envelope.data), end: false };
        continue;
      }
      yield { value: endParser.parse(envelope.data), end: true };
    }
  };
}