in packages/dubbo/src/protocol/envelope.ts [64:99]
/**
 * Produces the next enveloped message for the stream consumer.
 *
 * Each envelope starts with a 5-byte prefix: byte 0 holds the flags and
 * bytes 1..4 hold the payload length as a big-endian unsigned 32-bit
 * integer. Chunks are read from `reader` and handed to `append` until
 * `buffer` contains the prefix plus the full payload, or until the source
 * reports that it is done.
 *
 * Outcomes:
 * - a complete envelope is buffered: `{ flags, data }` is enqueued and the
 *   consumed bytes are dropped from `buffer`;
 * - the source ends with an empty buffer: the controller is closed;
 * - the source ends in the middle of a prefix or in the middle of a
 *   payload: the controller is errored with `Code.DataLoss`.
 *
 * NOTE(review): `buffer`, `reader` and `append` are declared outside this
 * method; this assumes `append` concatenates the chunk onto `buffer`.
 */
async pull(controller): Promise<void> {
  let header: { length: number; flags: number } | undefined = undefined;
  for (;;) {
    if (header === undefined && buffer.byteLength >= 5) {
      let length = 0;
      for (let i = 1; i < 5; i++) {
        // Multiply instead of `<< 8`: the shift operator works on signed
        // 32-bit integers, so a length with the high bit set would turn
        // negative and corrupt the size checks and slicing below.
        length = length * 256 + buffer[i];
      }
      header = { flags: buffer[0], length };
    }
    if (header !== undefined && buffer.byteLength >= header.length + 5) {
      break;
    }
    const result = await reader.read();
    if (result.done) {
      break;
    }
    append(result.value);
  }
  if (header === undefined) {
    if (buffer.byteLength === 0) {
      // Clean end of stream on an envelope boundary.
      controller.close();
      return;
    }
    // The source ended with fewer than 5 bytes of a prefix buffered.
    controller.error(
      new ConnectError("premature end of stream", Code.DataLoss)
    );
    return;
  }
  if (buffer.byteLength < header.length + 5) {
    // The source ended before the announced payload arrived. Report the
    // loss instead of enqueueing a silently truncated message.
    controller.error(
      new ConnectError("premature end of stream", Code.DataLoss)
    );
    return;
  }
  const data = buffer.subarray(5, 5 + header.length);
  // Keep any bytes after this envelope for the next pull.
  buffer = buffer.subarray(5 + header.length);
  controller.enqueue({
    flags: header.flags,
    data,
  });
},