in packages/dubbo/src/protocol/async-iterable.ts [893:937]
export function transformParseEnvelope<T, E>(
serialization: Serialization<T>,
endStreamFlag: number,
endSerialization: Serialization<E>
): AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>>;
export function transformParseEnvelope<T, E>(
serialization: Serialization<T>,
endStreamFlag?: number,
endSerialization?: null | Serialization<E>
):
| AsyncIterableTransform<EnvelopedMessage, T>
| AsyncIterableTransform<EnvelopedMessage, ParsedEnvelopedMessage<T, E>> {
// code path always yields ParsedEnvelopedMessage<T, E>
if (endSerialization && endStreamFlag !== undefined) {
return async function* (
iterable: AsyncIterable<EnvelopedMessage>
): AsyncIterable<ParsedEnvelopedMessage<T, E>> {
for await (const { flags, data } of iterable) {
if ((flags & endStreamFlag) === endStreamFlag) {
yield { value: endSerialization.parse(data), end: true };
} else {
yield { value: serialization.parse(data), end: false };
}
}
};
}
// code path always yields T
return async function* (
iterable: AsyncIterable<EnvelopedMessage>
): AsyncIterable<T> {
for await (const { flags, data } of iterable) {
if (
endStreamFlag !== undefined &&
(flags & endStreamFlag) === endStreamFlag
) {
if (endSerialization === null) {
throw new ConnectError("unexpected end flag", Code.InvalidArgument);
}
// skips end-of-stream envelope
continue;
}
yield serialization.parse(data);
}
};
}