fn sync()

in components/sync15/src/clients_engine/engine.rs [52:192]


    fn sync(
        &mut self,
        inbound: Vec<IncomingBso>,
        should_refresh_client: bool,
    ) -> Result<Vec<OutgoingBso>> {
        self.interruptee.err_if_interrupted()?;
        let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;

        let mut has_own_client_record = false;
        let mut changes = Vec::new();

        for bso in inbound {
            self.interruptee.err_if_interrupted()?;

            let content = bso.into_content();

            let client: ClientRecord = match content.kind {
                IncomingKind::Malformed => {
                    log::debug!("Error unpacking record");
                    continue;
                }
                IncomingKind::Tombstone => {
                    log::debug!("Record has been deleted; skipping...");
                    continue;
                }
                IncomingKind::Content(client) => client,
            };

            if client.id == self.command_processor.settings().fxa_device_id {
                log::debug!("Found my record on the server");
                // If we see our own client record, apply any incoming commands,
                // remove them from the list, and reupload the record. Any
                // commands that we don't understand also go back in the list.
                // https://github.com/mozilla/application-services/issues/1800
                // tracks if that's the right thing to do.
                has_own_client_record = true;
                let mut current_client_record = self.current_client_record();
                for c in &client.commands {
                    let status = match c.as_command() {
                        Some(command) => self.command_processor.apply_incoming_command(command)?,
                        None => CommandStatus::Unsupported,
                    };
                    match status {
                        CommandStatus::Applied => {}
                        CommandStatus::Ignored => {
                            log::debug!("Ignored command {:?}", c);
                        }
                        CommandStatus::Unsupported => {
                            log::warn!("Don't know how to apply command {:?}", c);
                            current_client_record.commands.push(c.clone());
                        }
                    }
                }

                // The clients collection has a hard limit on the payload size,
                // after which the server starts rejecting our records. Large
                // command lists can cause us to exceed this, so we truncate
                // the list.
                shrink_to_fit(
                    &mut current_client_record.commands,
                    self.memcache_max_record_payload_size(),
                )?;

                // Add the new client record to our map of recently synced
                // clients, so that downstream consumers like synced tabs can
                // access them.
                self.note_recent_client(&current_client_record);

                // We periodically upload our own client record, even if it
                // doesn't change, to keep it fresh.
                if should_refresh_client || client != current_client_record {
                    log::debug!("Will update our client record on the server");
                    let envelope = OutgoingEnvelope {
                        id: content.envelope.id,
                        ttl: Some(CLIENTS_TTL),
                        ..Default::default()
                    };
                    changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
                }
            } else {
                // Add the other client to our map of recently synced clients.
                self.note_recent_client(&client);

                // Bail if we don't have any outgoing commands to write into
                // the other client's record.
                if outgoing_commands.is_empty() {
                    continue;
                }

                // Determine if we have new commands, that aren't already in the
                // client's command list.
                let current_commands: HashSet<Command> = client
                    .commands
                    .iter()
                    .filter_map(|c| c.as_command())
                    .collect();
                let mut new_outgoing_commands = outgoing_commands
                    .difference(&current_commands)
                    .cloned()
                    .collect::<Vec<_>>();
                // Sort, to ensure deterministic ordering for tests.
                new_outgoing_commands.sort();
                let mut new_client = client.clone();
                new_client
                    .commands
                    .extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
                if new_client.commands.len() == client.commands.len() {
                    continue;
                }

                // Hooray, we added new commands! Make sure the record still
                // fits in the maximum record size, or the server will reject
                // our upload.
                shrink_to_fit(
                    &mut new_client.commands,
                    self.memcache_max_record_payload_size(),
                )?;

                let envelope = OutgoingEnvelope {
                    id: content.envelope.id,
                    ttl: Some(CLIENTS_TTL),
                    ..Default::default()
                };
                changes.push(OutgoingBso::from_content(envelope, new_client)?);
            }
        }

        // Upload a record for our own client, if we didn't replace it already.
        if !has_own_client_record {
            let current_client_record = self.current_client_record();
            self.note_recent_client(&current_client_record);
            let envelope = OutgoingEnvelope {
                id: Guid::new(&current_client_record.id),
                ttl: Some(CLIENTS_TTL),
                ..Default::default()
            };
            changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
        }

        Ok(changes)
    }