in packages/dubbo/src/protocol/async-iterable.ts [862:937]
export function transformParseEnvelope<T>(
serialization: Serialization<T>,
endStreamFlag: number
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
* Creates an AsyncIterableTransform that takes enveloped messages as input,
* parses the envelope payload and outputs the result.
*
* Note that this overload will look for the given endStreamFlag, and raise
* an error if it is set.
*
* @private Internal code, does not follow semantic versioning.
*/
export function transformParseEnvelope<T>(
serialization: Serialization<T>,
endStreamFlag: number,
endSerialization: null
): AsyncIterableTransform<EnvelopedMessage, T>;
/**
* Creates an AsyncIterableTransform that takes an enveloped message as input,
* and outputs a ParsedEnvelopedMessage.
*
* For example, if the given endStreamFlag is set for a source envelope, its
* payload is parsed using the given endSerialization, and an object with
* { end: true, value: ... } is returned.
*
* If the endStreamFlag is not set, the payload is parsed using the given
* serialization, and an object with { end: false, value: ... } is returned.
*
* @private Internal code, does not follow semantic versioning.
*/
export function transformParseEnvelope<T, E>(
serialization: Serialization<T>,
endStreamFlag: number,
endSerialization: Serialization<E>
): AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>>;
/**
 * Shared implementation behind the transformParseEnvelope overloads.
 *
 * An envelope counts as "end of stream" when an endStreamFlag was supplied and
 * every bit of that flag is set in the envelope's flags.
 *
 * - With both an endStreamFlag and an endSerialization object, each envelope
 *   is turned into `{ value, end }`: end-of-stream envelopes are parsed with
 *   endSerialization (`end: true`), all others with serialization
 *   (`end: false`).
 * - Otherwise only parsed message payloads are yielded. An end-of-stream
 *   envelope raises a ConnectError with Code.InvalidArgument when
 *   endSerialization is exactly `null`, and is silently dropped in every
 *   other case.
 */
export function transformParseEnvelope<T, E>(
  serialization: Serialization<T>,
  endStreamFlag?: number,
  endSerialization?: null | Serialization<E>
):
  | AsyncIterableTransform<EnvelopedMessage, T>
  | AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>> {
  // Without a configured flag, no envelope is ever treated as end-of-stream.
  const marksEndOfStream = (envelopeFlags: number): boolean =>
    endStreamFlag !== undefined &&
    (envelopeFlags & endStreamFlag) === endStreamFlag;

  // Plain variant: the resulting transform only ever yields T.
  if (endStreamFlag === undefined || !endSerialization) {
    return async function* (
      source: AsyncIterable<EnvelopedMessage>
    ): AsyncIterable<T> {
      for await (const { flags: envelopeFlags, data: payload } of source) {
        if (!marksEndOfStream(envelopeFlags)) {
          yield serialization.parse(payload);
          continue;
        }
        // An explicit null means the caller does not accept an end-of-stream
        // envelope at all; an omitted endSerialization means "ignore it".
        if (endSerialization === null) {
          throw new ConnectError("unexpected end flag", Code.InvalidArgument);
        }
      }
    };
  }

  // Tagged variant: the resulting transform only ever yields
  // ParsedEnvelopedMessage<T, E>.
  const endParser = endSerialization;
  return async function* (
    source: AsyncIterable<EnvelopedMessage>
  ): AsyncIterable<ParsedEnvelopedMessage<T, E>> {
    for await (const { flags: envelopeFlags, data: payload } of source) {
      yield marksEndOfStream(envelopeFlags)
        ? { value: endParser.parse(payload), end: true }
        : { value: serialization.parse(payload), end: false };
    }
  };
}