in packages/hub/src/utils/XetBlob.ts [314:429]
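// Stream-decode loop: read bytes from the network response and parse them as a sequence of xet chunks,
// each an 8-byte header followed by a (possibly LZ4-compressed) payload.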
fetchData: while (!done && totalBytesRead < maxBytes) {
	const result = await reader.read();
	listener?.({ event: "read" });
	done = result.done;
	log("read", result.value?.byteLength, "bytes", "total read", totalBytesRead, "toSkip", readBytesToSkip);
	if (!result.value) {
		log("no data in result, cancelled", result);
		continue;
	}
	totalFetchBytes += result.value.byteLength;
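	// Prepend any bytes left over from the previous read so a chunk split across network reads can be parsed whole.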
	if (leftoverBytes) {
		result.value = new Uint8Array([...leftoverBytes, ...result.value]);
		leftoverBytes = undefined;
	}
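	// Decode every complete chunk currently buffered; jump back to the outer loop whenever more data is needed.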
	while (totalBytesRead < maxBytes && result.value.byteLength) {
		if (result.value.byteLength < 8) {
			// We need 8 bytes to parse the chunk header
			leftoverBytes = result.value;
			continue fetchData;
		}
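		// Header layout: byte 0 version, bytes 1-3 compressed length (little-endian),
		// byte 4 compression scheme, bytes 5-7 uncompressed length (little-endian).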
		const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
		const chunkHeader: ChunkHeader = {
			version: header.getUint8(0),
			compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
			compression_scheme: header.getUint8(4),
			uncompressed_length: header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16),
		};
		log("chunk header", chunkHeader, "to skip", readBytesToSkip);
		if (chunkHeader.version !== 0) {
			throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
		}
		if (
			chunkHeader.compression_scheme !== CompressionScheme.None &&
			chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
			chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
		) {
			throw new Error(
				`Unsupported compression scheme ${
					compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
				}`
			);
		}
		if (result.value.byteLength < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
			// We need more data to read the full chunk
			leftoverBytes = result.value;
			continue fetchData;
		}
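		// Drop the header, then decompress the payload according to its compression scheme
		// (plain LZ4, byte-grouped LZ4, or no compression).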
		result.value = result.value.slice(CHUNK_HEADER_BYTES);
		let uncompressed =
			chunkHeader.compression_scheme === CompressionScheme.LZ4
				? lz4_decompress(result.value.slice(0, chunkHeader.compressed_length), chunkHeader.uncompressed_length)
				: chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
					? bg4_regoup_bytes(
							lz4_decompress(
								result.value.slice(0, chunkHeader.compressed_length),
								chunkHeader.uncompressed_length
							)
						)
					: result.value.slice(0, chunkHeader.compressed_length);
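		// Cache the decompressed chunk in its fetch-info range when the range's refCount indicates
		// it will still be needed beyond the current yield.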
		const range = ranges.find((range) => chunkIndex >= range.start && chunkIndex < range.end);
		const shouldYield = chunkIndex >= term.range.start && chunkIndex < term.range.end;
		const minRefCountToStore = shouldYield ? 2 : 1;
		let stored = false;
		// Assuming non-overlapping fetch_info ranges for the same hash
		if (range && range.refCount >= minRefCountToStore) {
			range.data ??= [];
			range.data.push(uncompressed);
			stored = true;
		}
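		// Only chunks inside this term's range are surfaced to the caller, after skipping the requested
		// leading bytes and capping the output at maxBytes; when the chunk was also cached above,
		// a copy is yielded rather than the cached buffer itself.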
		if (shouldYield) {
			if (readBytesToSkip) {
				const skipped = Math.min(readBytesToSkip, uncompressed.byteLength);
				uncompressed = uncompressed.slice(readBytesToSkip);
				readBytesToSkip -= skipped;
			}
			if (uncompressed.byteLength > maxBytes - totalBytesRead) {
				uncompressed = uncompressed.slice(0, maxBytes - totalBytesRead);
			}
			if (uncompressed.byteLength) {
				log(
					"yield",
					uncompressed.byteLength,
					"bytes",
					result.value.byteLength,
					"total read",
					totalBytesRead,
					stored
				);
				totalBytesRead += uncompressed.byteLength;
				yield stored ? uncompressed.slice() : uncompressed;
				listener?.({ event: "progress", progress: { read: totalBytesRead, total: maxBytes } });
			}
		}
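		// Advance past this chunk's payload and continue with the next chunk header.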
		chunkIndex++;
		result.value = result.value.slice(chunkHeader.compressed_length);
	}
}