in glean-core/src/upload/mod.rs [604:691]
/// Decides what the uploader should do next and returns it as a `PingUploadTask`.
///
/// The checks run in this order:
///
/// 1. If pending pings have not been processed yet, ask the caller to wait.
/// 2. Enqueue any cached pings.
/// 3. If the recoverable failure count has reached the policy maximum, return "done".
/// 4. If the queue is empty, return "done".
/// 5. If a rate limiter is configured and reports a throttled state, ask the caller to wait
///    for the remaining time it reported.
/// 6. Otherwise record the request's timer ids in `in_flight`, pop the request off the queue,
///    stamp it with a `Date` header and return it as an upload task.
///
/// Every "wait" answer goes through a helper that bumps the wait attempt counter and
/// turns the answer into "done" once the counter exceeds the policy's maximum.
///
/// # Arguments
///
/// * `glean` - Forwarded to `enqueue_cached_pings`.
/// * `log_ping` - When `true`, the request body (or a placeholder if no pretty body is
///   available) is logged through `chunked_log_info`.
///
/// # Panics
///
/// Panics if any of the `write()` calls on the queue, the rate limiter or the
/// in-flight map returns an error.
fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
    // Produces either `PingUploadTask::Wait` or `PingUploadTask::Done`.
    //
    // Each call counts as one more wait attempt. As long as the counter stays within
    // the policy's limit the caller is told to wait; past that limit we give up
    // and report that we are done instead of waiting forever.
    let wait_or_done = |delay: u64| {
        self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
        let within_budget = self.wait_attempt_count() <= self.policy.max_wait_attempts();
        if within_budget {
            PingUploadTask::Wait { time: delay }
        } else {
            PingUploadTask::done()
        }
    };

    // Nothing can be handed out while pending pings are still being processed.
    if !self.processed_pending_pings() {
        log::info!(
            "Tried getting an upload task, but processing is ongoing. Will come back later."
        );
        return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
    }

    // Does nothing when there are no cached pings.
    self.enqueue_cached_pings(glean);

    // Too many recoverable failures in this uploading window: stop here.
    if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
        log::warn!(
            "Reached maximum recoverable failures for the current uploading window. You are done."
        );
        return PingUploadTask::done();
    }

    let mut queue = self
        .queue
        .write()
        .expect("Can't write to pending pings queue.");

    // Peek at the head of the queue; an empty queue means there is nothing left to do.
    let next = match queue.front() {
        Some(front) => front,
        None => {
            log::info!("No more pings to upload! You are done.");
            return PingUploadTask::done();
        }
    };

    // Consult the rate limiter (if any) before committing to an upload.
    if let Some(limiter_lock) = &self.rate_limiter {
        let mut limiter = limiter_lock
            .write()
            .expect("Can't write to the rate limiter.");
        if let RateLimiterState::Throttled(remaining) = limiter.get_state() {
            log::info!("Tried getting an upload task, but we are throttled at the moment.");
            return wait_or_done(remaining);
        }
    }

    log::info!(
        "New upload task with id {} (path: {})",
        next.document_id,
        next.path
    );

    if log_ping {
        match next.pretty_body() {
            Some(body) => chunked_log_info(&next.path, &body),
            None => chunked_log_info(&next.path, "<invalid ping payload>"),
        }
    }

    {
        // The timers are started synchronously: we are in the uploader thread anyway,
        // and no data is stored on disk.
        let mut in_flight = self.in_flight.write().unwrap();
        let success_id = self.upload_metrics.send_success.start_sync();
        let failure_id = self.upload_metrics.send_failure.start_sync();
        let timer_ids = (success_id, failure_id);
        in_flight.insert(next.document_id.clone(), timer_ids);
    }

    // The head was observed above while holding the write lock, so popping cannot fail.
    let mut outgoing = queue.pop_front().unwrap();

    // The `Date` header is added only now, right before the actual upload happens.
    outgoing
        .headers
        .insert("Date".to_string(), create_date_header_value(Utc::now()));

    PingUploadTask::Upload { request: outgoing }
}