fn get_upload_task_internal()

in glean-core/src/upload/mod.rs [604:691]


    fn get_upload_task_internal(&self, glean: &Glean, log_ping: bool) -> PingUploadTask {
        // Helper to decide whether to return PingUploadTask::Wait or PingUploadTask::Done.
        //
        // We want to limit the amount of PingUploadTask::Wait returned in a row,
        // in case we reach MAX_WAIT_ATTEMPTS we want to actually return PingUploadTask::Done.
        let wait_or_done = |time: u64| {
            self.wait_attempt_count.fetch_add(1, Ordering::SeqCst);
            if self.wait_attempt_count() > self.policy.max_wait_attempts() {
                PingUploadTask::done()
            } else {
                PingUploadTask::Wait { time }
            }
        };

        if !self.processed_pending_pings() {
            log::info!(
                "Tried getting an upload task, but processing is ongoing. Will come back later."
            );
            return wait_or_done(WAIT_TIME_FOR_PING_PROCESSING);
        }

        // This is a no-op in case there are no cached pings.
        self.enqueue_cached_pings(glean);

        if self.recoverable_failure_count() >= self.policy.max_recoverable_failures() {
            log::warn!(
                "Reached maximum recoverable failures for the current uploading window. You are done."
            );
            return PingUploadTask::done();
        }

        let mut queue = self
            .queue
            .write()
            .expect("Can't write to pending pings queue.");
        match queue.front() {
            Some(request) => {
                if let Some(rate_limiter) = &self.rate_limiter {
                    let mut rate_limiter = rate_limiter
                        .write()
                        .expect("Can't write to the rate limiter.");
                    if let RateLimiterState::Throttled(remaining) = rate_limiter.get_state() {
                        log::info!(
                            "Tried getting an upload task, but we are throttled at the moment."
                        );
                        return wait_or_done(remaining);
                    }
                }

                log::info!(
                    "New upload task with id {} (path: {})",
                    request.document_id,
                    request.path
                );

                if log_ping {
                    if let Some(body) = request.pretty_body() {
                        chunked_log_info(&request.path, &body);
                    } else {
                        chunked_log_info(&request.path, "<invalid ping payload>");
                    }
                }

                {
                    // Synchronous timer starts.
                    // We're in the uploader thread anyway.
                    // But also: No data is stored on disk.
                    let mut in_flight = self.in_flight.write().unwrap();
                    let success_id = self.upload_metrics.send_success.start_sync();
                    let failure_id = self.upload_metrics.send_failure.start_sync();
                    in_flight.insert(request.document_id.clone(), (success_id, failure_id));
                }

                let mut request = queue.pop_front().unwrap();

                // Adding the `Date` header just before actual upload happens.
                request
                    .headers
                    .insert("Date".to_string(), create_date_header_value(Utc::now()));

                PingUploadTask::Upload { request }
            }
            None => {
                log::info!("No more pings to upload! You are done.");
                PingUploadTask::done()
            }
        }
    }