in progress_tracking/src/upload_tracking.rs [324:429]
fn register_xorb_upload_progress(
&mut self,
xorb_hash: MerkleHash,
new_byte_progress: u64,
check_ordering: bool,
) -> ProgressUpdate {
// Should have already been registered.
debug_assert!(self.xorbs.contains_key(&xorb_hash));
// Mark as completed, return the list of files to mark as completed.
let entry = self.xorbs.entry(xorb_hash).or_default();
// If this update could arrive out of order, check to see if it's needed and ignore if not.
if !check_ordering && entry.is_completed {
// Return an empty update
return ProgressUpdate {
item_updates: vec![],
total_bytes: self.total_bytes,
total_bytes_increment: 0,
total_bytes_completed: self.total_bytes_completed,
total_bytes_completion_increment: 0,
total_transfer_bytes: self.total_upload_bytes,
total_transfer_bytes_increment: 0,
total_transfer_bytes_completed: self.total_upload_bytes_completed,
total_transfer_bytes_completion_increment: 0,
..Default::default()
};
}
// Should not be completed when this is called.
debug_assert!(!entry.is_completed);
// Is the update reasonable?
debug_assert_le!(entry.completed_bytes + new_byte_progress, entry.xorb_size);
entry.completed_bytes += new_byte_progress;
let new_completion_ratio = (entry.completed_bytes as f64) / (entry.xorb_size as f64);
// Mark all the relevant files as completed
let mut item_updates = Vec::with_capacity(entry.file_indices.len());
let mut file_bytes_processed = 0;
// For each file that depends on this xorb, update a proportion of that remove the relevant
// part from `remaining_xorbs_parts` and add to `completed_bytes`.
for &file_id in entry.file_indices.iter() {
let file_entry = &mut self.files[file_id];
// Should be registered there.
debug_assert!(file_entry.remaining_xorbs_parts.contains_key(&xorb_hash));
// Update
let incremental_update = 'update: {
let Some(xorb_part) = file_entry.remaining_xorbs_parts.get_mut(&xorb_hash) else {
break 'update 0;
};
debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
// Use floor so as to not inproperly report completion when there is still some to go.
let new_completion_bytes = ((xorb_part.n_bytes as f64) * new_completion_ratio).floor() as u64;
// Make sure this is an update
debug_assert_ge!(new_completion_bytes, xorb_part.completed_bytes);
let incremental_update = new_completion_bytes.saturating_sub(xorb_part.completed_bytes);
xorb_part.completed_bytes += incremental_update;
debug_assert_le!(xorb_part.completed_bytes, xorb_part.n_bytes);
incremental_update
};
if incremental_update != 0 {
file_entry.completed_bytes += incremental_update;
let progress_update = ItemProgressUpdate {
item_name: file_entry.name.clone(),
total_bytes: file_entry.total_bytes,
bytes_completed: file_entry.completed_bytes,
bytes_completion_increment: incremental_update,
};
file_bytes_processed += incremental_update;
item_updates.push(progress_update);
}
}
self.total_upload_bytes_completed += new_byte_progress;
debug_assert_le!(self.total_upload_bytes_completed, self.total_upload_bytes);
self.total_bytes_completed += file_bytes_processed;
debug_assert_le!(self.total_bytes_completed, self.total_bytes);
ProgressUpdate {
item_updates,
total_bytes: self.total_bytes,
total_bytes_increment: 0,
total_bytes_completed: self.total_bytes_completed,
total_bytes_completion_increment: file_bytes_processed,
total_transfer_bytes: self.total_upload_bytes,
total_transfer_bytes_increment: 0,
total_transfer_bytes_completed: self.total_upload_bytes_completed,
total_transfer_bytes_completion_increment: new_byte_progress,
..Default::default()
}
}