in progress_tracking/src/aggregator.rs [83:139]
/// Drains the pending progress state and returns it as a snapshot,
/// updating the rolling speed estimate along the way.
///
/// Cumulative byte counters are copied back into `self.pending` so a
/// caller polling before the next incoming update still sees running
/// totals. Completion rates are derived from a fixed-size ring buffer
/// of `(time, bytes)` samples when speed sampling is enabled.
fn get_state(&mut self) -> ProgressUpdate {
    // Swap out the pending update, leaving a fresh default in its place.
    let mut snapshot = std::mem::take(&mut self.pending);

    // Restore the cumulative counters so subsequent calls still report
    // the running totals even if no new update lands in between.
    self.pending.total_bytes = snapshot.total_bytes;
    self.pending.total_bytes_completed = snapshot.total_bytes_completed;
    self.pending.total_transfer_bytes = snapshot.total_transfer_bytes;
    self.pending.total_transfer_bytes_completed = snapshot.total_transfer_bytes_completed;

    // Speed estimation is enabled when the sample window is non-zero.
    if self.speed_sample_size != 0 {
        let now = Instant::now();
        // Cursor into the ring: while filling, tick_index stays 0 so this
        // points at the first (oldest) sample; once full, it tracks the
        // slot holding the oldest sample, which is also the next to be
        // overwritten.
        let oldest_slot = self.tick_index % self.speed_sample_size;

        if !self.speed_window_samples.is_empty() {
            let oldest = &self.speed_window_samples[oldest_slot];
            // Floor the elapsed time to avoid division by (near-)zero on
            // back-to-back calls; saturating_duration_since guards against
            // non-monotonic-looking clocks.
            let elapsed = now
                .saturating_duration_since(oldest.sample_time)
                .as_secs_f64()
                .max(0.001);
            // saturating_sub: counters should only grow, but never panic
            // or wrap if they somehow regress.
            let bytes_delta = snapshot
                .total_bytes_completed
                .saturating_sub(oldest.total_bytes_completed);
            let transfer_delta = snapshot
                .total_transfer_bytes_completed
                .saturating_sub(oldest.total_transfer_bytes_completed);
            snapshot.total_bytes_completion_rate = Some(bytes_delta as f64 / elapsed);
            snapshot.total_transfer_bytes_completion_rate =
                Some(transfer_delta as f64 / elapsed);
        }

        // Record the current totals as the newest sample in the ring.
        let sample = SpeedWindowSample {
            sample_time: now,
            total_bytes_completed: snapshot.total_bytes_completed,
            total_transfer_bytes_completed: snapshot.total_transfer_bytes_completed,
        };
        if self.speed_window_samples.len() == self.speed_sample_size {
            // Ring is full: overwrite the oldest slot and advance the cursor.
            self.speed_window_samples[oldest_slot] = sample;
            self.tick_index += 1;
        } else {
            // Still filling: append until the ring reaches capacity.
            self.speed_window_samples.push(sample);
        }
    }

    // Reserve ~4/3 of the last batch size up front to limit regrowth
    // as the next round of item updates accumulates.
    self.pending.item_updates = Vec::with_capacity((4 * snapshot.item_updates.len()) / 3);
    // Reset the item lookup table for the next accumulation round.
    self.item_lookup.clear();
    snapshot
}