in hf_xet_wasm/src/wasm_file_cleaner.rs [78:114]
/// Builds a cleaner whose chunking and hash generation run inside a task
/// started with `wasmtokio::task::spawn_blocking`.
///
/// Two unbounded channels connect the returned value to that task:
/// * raw input buffers (`Vec<u8>`) flow *to* the task through `input_tx`;
/// * the chunks produced from each buffer (`Vec<Chunk>`) flow *back* through
///   `chunks_rx`.
///
/// The task's result is whatever `ShaGeneration::finalize` returns, or an
/// internal `DataProcessingError` if the chunk receiver has gone away before
/// all chunks could be delivered.
///
/// # Parameters
/// * `session` - upload session shared between the stored `session` field and
///   the deduplication manager.
/// * `file_id` - identifier stored in `_file_id` and handed to `FileDeduper`.
/// * `sha256` - forwarded to `ShaGeneration::new`; presumably a precomputed
///   hash that lets hashing be skipped when present (TODO confirm).
fn new_with_cpu_task_in_worker_thread(
    session: Arc<FileUploadSession>,
    file_id: u64,
    sha256: Option<MerkleHash>,
) -> Self {
    let (input_tx, mut input_rx) = mpsc::unbounded_channel::<Vec<u8>>();
    let (chunks_tx, chunks_rx) = mpsc::unbounded_channel::<Vec<Chunk>>();

    let cpu_worker = wasmtokio::task::spawn_blocking(move || {
        futures::executor::block_on(async move {
            let mut chunker = Chunker::default();
            let mut sha_generation = ShaGeneration::new(sha256);

            // Runs until `recv` yields `None` (for a tokio-style channel:
            // once every sender is dropped and the queue is drained).
            while let Some(input) = input_rx.recv().await {
                let chunks = chunker.next_block(&input, false);
                chunks_tx.send(chunks).map_err(DataProcessingError::internal)?;
                // The hash is fed the raw input stream, so every byte of the
                // file is accounted for here, exactly once.
                sha_generation.update_with_bytes(&input);
            }

            // Flush whatever the chunker is still buffering. Its bytes were
            // already hashed in the loop above as part of their input
            // buffers, so they must NOT be fed to the hash a second time.
            if let Some(chunk) = chunker.finish() {
                chunks_tx.send(vec![chunk]).map_err(DataProcessingError::internal)?;
            }

            sha_generation.finalize()
        })
    });

    // NOTE(review): the trailing `0` is the initial value of the fourth tuple
    // field of `CPUTask::WorkerThread`; its meaning is defined by that variant.
    let cpu_task = CPUTask::WorkerThread((cpu_worker, input_tx, chunks_rx, 0));

    Self {
        _file_id: file_id,
        session: Arc::clone(&session),
        cpu_task,
        dedup_manager: FileDeduper::new(UploadSessionDataManager::new(session), file_id),
    }
}