fn normalize_store()

in glean-core/src/event_database/mod.rs [382:545]


    fn normalize_store(
        &self,
        glean: &Glean,
        store_name: &str,
        store: &mut Vec<StoredEvent>,
        glean_start_time: DateTime<FixedOffset>,
    ) {
        let is_glean_restarted =
            |event: &RecordedEvent| event.category == "glean" && event.name == "restarted";
        let glean_restarted_meta = |store_name: &str| CommonMetricData {
            name: "restarted".into(),
            category: "glean".into(),
            send_in_pings: vec![store_name.into()],
            lifetime: Lifetime::Ping,
            ..Default::default()
        };
        // Step 1
        store.sort_by(|a, b| {
            a.execution_counter
                .cmp(&b.execution_counter)
                .then_with(|| a.event.timestamp.cmp(&b.event.timestamp))
                .then_with(|| {
                    if is_glean_restarted(&a.event) {
                        Ordering::Less
                    } else {
                        Ordering::Greater
                    }
                })
        });
        // Step 2
        // Find the index of the first and final non-`glean.restarted` events.
        // Remove events before the first and after the final.
        let final_event = match store
            .iter()
            .rposition(|event| !is_glean_restarted(&event.event))
        {
            Some(idx) => idx + 1,
            _ => 0,
        };
        store.drain(final_event..);
        let first_event = store
            .iter()
            .position(|event| !is_glean_restarted(&event.event))
            .unwrap_or(store.len());
        store.drain(..first_event);
        if store.is_empty() {
            // There was nothing but `glean.restarted` events. Job's done!
            return;
        }
        // Step 3
        // It is allowed that there might not be any `glean.restarted` event, nor
        // `execution_counter` extra values. (This should always be the case for the
        // "events" ping, for instance).
        // Other inconsistencies are evidence of errors, and so are logged.
        let mut cur_ec = 0;
        // The offset within a group of events with the same `execution_counter`.
        let mut intra_group_offset = store[0].event.timestamp;
        // The offset between this group and ping_info.start_date.
        let mut inter_group_offset = 0;
        let mut highest_ts = 0;
        for event in store.iter_mut() {
            let execution_counter = event.execution_counter.take().unwrap_or(0);
            if is_glean_restarted(&event.event) {
                // We've entered the next "event group".
                // We need a new epoch based on glean.startup.date - ping_info.start_date
                cur_ec = execution_counter;
                let glean_startup_date = event
                    .event
                    .extra
                    .as_mut()
                    .and_then(|extra| {
                        extra.remove("glean.startup.date").and_then(|date_str| {
                            DateTime::parse_from_str(&date_str, TimeUnit::Minute.format_pattern())
                                .map_err(|_| {
                                    record_error(
                                        glean,
                                        &glean_restarted_meta(store_name).into(),
                                        ErrorType::InvalidState,
                                        format!("Unparseable glean.startup.date '{}'", date_str),
                                        None,
                                    );
                                })
                                .ok()
                        })
                    })
                    .unwrap_or(glean_start_time);
                if event
                    .event
                    .extra
                    .as_ref()
                    .is_some_and(|extra| extra.is_empty())
                {
                    // Small optimization to save us sending empty dicts.
                    event.event.extra = None;
                }
                let ping_start = DatetimeMetric::new(
                    CommonMetricData {
                        name: format!("{}#start", store_name),
                        category: "".into(),
                        send_in_pings: vec![INTERNAL_STORAGE.into()],
                        lifetime: Lifetime::User,
                        ..Default::default()
                    },
                    TimeUnit::Minute,
                );
                let ping_start = ping_start
                    .get_value(glean, INTERNAL_STORAGE)
                    .unwrap_or(glean_start_time);
                let time_from_ping_start_to_glean_restarted =
                    (glean_startup_date - ping_start).num_milliseconds();
                intra_group_offset = event.event.timestamp;
                inter_group_offset =
                    u64::try_from(time_from_ping_start_to_glean_restarted).unwrap_or(0);
                if inter_group_offset < highest_ts {
                    record_error(
                        glean,
                        &glean_restarted_meta(store_name).into(),
                        ErrorType::InvalidValue,
                        format!("Time between restart and ping start {} indicates client clock weirdness.", time_from_ping_start_to_glean_restarted),
                        None,
                    );
                    // The client's clock went backwards enough that this event group's
                    // glean.restarted looks like it happened _before_ the final event of the previous group.
                    // Or, it went ahead enough to overflow u64.
                    // Adjust things so this group starts 1ms after the previous one.
                    inter_group_offset = highest_ts + 1;
                }
            } else if cur_ec == 0 {
                // bug 1811872 - cur_ec might need initialization.
                cur_ec = execution_counter;
            }
            event.event.timestamp = event.event.timestamp - intra_group_offset + inter_group_offset;
            if execution_counter != cur_ec {
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent execution counter {} (expected {})",
                        execution_counter, cur_ec
                    ),
                    None,
                );
                // Let's fix cur_ec up and hope this isn't a sign something big is broken.
                cur_ec = execution_counter;
            }
            if highest_ts > event.event.timestamp {
                // Even though we sorted everything, something in the
                // execution_counter or glean.startup.date math went awry.
                record_error(
                    glean,
                    &glean_restarted_meta(store_name).into(),
                    ErrorType::InvalidState,
                    format!(
                        "Inconsistent previous highest timestamp {} (expected <= {})",
                        highest_ts, event.event.timestamp
                    ),
                    None,
                );
                // Let the highest_ts regress to event.timestamp to hope this minimizes weirdness.
            }
            highest_ts = event.event.timestamp
        }
    }