fetchData: while()

in packages/hub/src/utils/XetBlob.ts [314:429]


				fetchData: while (!done && totalBytesRead < maxBytes) {
					const result = await reader.read();
					listener?.({ event: "read" });

					done = result.done;

					log("read", result.value?.byteLength, "bytes", "total read", totalBytesRead, "toSkip", readBytesToSkip);

					if (!result.value) {
						log("no data in result, cancelled", result);
						continue;
					}

					totalFetchBytes += result.value.byteLength;

					if (leftoverBytes) {
						result.value = new Uint8Array([...leftoverBytes, ...result.value]);
						leftoverBytes = undefined;
					}

					while (totalBytesRead < maxBytes && result.value.byteLength) {
						if (result.value.byteLength < 8) {
							// We need 8 bytes to parse the chunk header
							leftoverBytes = result.value;
							continue fetchData;
						}

						const header = new DataView(result.value.buffer, result.value.byteOffset, CHUNK_HEADER_BYTES);
						const chunkHeader: ChunkHeader = {
							version: header.getUint8(0),
							compressed_length: header.getUint8(1) | (header.getUint8(2) << 8) | (header.getUint8(3) << 16),
							compression_scheme: header.getUint8(4),
							uncompressed_length: header.getUint8(5) | (header.getUint8(6) << 8) | (header.getUint8(7) << 16),
						};

						log("chunk header", chunkHeader, "to skip", readBytesToSkip);

						if (chunkHeader.version !== 0) {
							throw new Error(`Unsupported chunk version ${chunkHeader.version}`);
						}

						if (
							chunkHeader.compression_scheme !== CompressionScheme.None &&
							chunkHeader.compression_scheme !== CompressionScheme.LZ4 &&
							chunkHeader.compression_scheme !== CompressionScheme.ByteGroupingLZ4
						) {
							throw new Error(
								`Unsupported compression scheme ${
									compressionSchemeLabels[chunkHeader.compression_scheme] ?? chunkHeader.compression_scheme
								}`
							);
						}

						if (result.value.byteLength < chunkHeader.compressed_length + CHUNK_HEADER_BYTES) {
							// We need more data to read the full chunk
							leftoverBytes = result.value;
							continue fetchData;
						}

						result.value = result.value.slice(CHUNK_HEADER_BYTES);

						let uncompressed =
							chunkHeader.compression_scheme === CompressionScheme.LZ4
								? lz4_decompress(result.value.slice(0, chunkHeader.compressed_length), chunkHeader.uncompressed_length)
								: chunkHeader.compression_scheme === CompressionScheme.ByteGroupingLZ4
								  ? bg4_regoup_bytes(
											lz4_decompress(
												result.value.slice(0, chunkHeader.compressed_length),
												chunkHeader.uncompressed_length
											)
								    )
								  : result.value.slice(0, chunkHeader.compressed_length);

						const range = ranges.find((range) => chunkIndex >= range.start && chunkIndex < range.end);
						const shouldYield = chunkIndex >= term.range.start && chunkIndex < term.range.end;
						const minRefCountToStore = shouldYield ? 2 : 1;
						let stored = false;

						// Assuming non-overlapping fetch_info ranges for the same hash
						if (range && range.refCount >= minRefCountToStore) {
							range.data ??= [];
							range.data.push(uncompressed);
							stored = true;
						}

						if (shouldYield) {
							if (readBytesToSkip) {
								const skipped = Math.min(readBytesToSkip, uncompressed.byteLength);
								uncompressed = uncompressed.slice(readBytesToSkip);
								readBytesToSkip -= skipped;
							}

							if (uncompressed.byteLength > maxBytes - totalBytesRead) {
								uncompressed = uncompressed.slice(0, maxBytes - totalBytesRead);
							}

							if (uncompressed.byteLength) {
								log(
									"yield",
									uncompressed.byteLength,
									"bytes",
									result.value.byteLength,
									"total read",
									totalBytesRead,
									stored
								);
								totalBytesRead += uncompressed.byteLength;
								yield stored ? uncompressed.slice() : uncompressed;
								listener?.({ event: "progress", progress: { read: totalBytesRead, total: maxBytes } });
							}
						}

						chunkIndex++;
						result.value = result.value.slice(chunkHeader.compressed_length);
					}
				}