in components/sync15/src/clients_engine/engine.rs [52:192]
fn sync(
&mut self,
inbound: Vec<IncomingBso>,
should_refresh_client: bool,
) -> Result<Vec<OutgoingBso>> {
self.interruptee.err_if_interrupted()?;
let outgoing_commands = self.command_processor.fetch_outgoing_commands()?;
let mut has_own_client_record = false;
let mut changes = Vec::new();
for bso in inbound {
self.interruptee.err_if_interrupted()?;
let content = bso.into_content();
let client: ClientRecord = match content.kind {
IncomingKind::Malformed => {
log::debug!("Error unpacking record");
continue;
}
IncomingKind::Tombstone => {
log::debug!("Record has been deleted; skipping...");
continue;
}
IncomingKind::Content(client) => client,
};
if client.id == self.command_processor.settings().fxa_device_id {
log::debug!("Found my record on the server");
// If we see our own client record, apply any incoming commands,
// remove them from the list, and reupload the record. Any
// commands that we don't understand also go back in the list.
// https://github.com/mozilla/application-services/issues/1800
// tracks if that's the right thing to do.
has_own_client_record = true;
let mut current_client_record = self.current_client_record();
for c in &client.commands {
let status = match c.as_command() {
Some(command) => self.command_processor.apply_incoming_command(command)?,
None => CommandStatus::Unsupported,
};
match status {
CommandStatus::Applied => {}
CommandStatus::Ignored => {
log::debug!("Ignored command {:?}", c);
}
CommandStatus::Unsupported => {
log::warn!("Don't know how to apply command {:?}", c);
current_client_record.commands.push(c.clone());
}
}
}
// The clients collection has a hard limit on the payload size,
// after which the server starts rejecting our records. Large
// command lists can cause us to exceed this, so we truncate
// the list.
shrink_to_fit(
&mut current_client_record.commands,
self.memcache_max_record_payload_size(),
)?;
// Add the new client record to our map of recently synced
// clients, so that downstream consumers like synced tabs can
// access them.
self.note_recent_client(¤t_client_record);
// We periodically upload our own client record, even if it
// doesn't change, to keep it fresh.
if should_refresh_client || client != current_client_record {
log::debug!("Will update our client record on the server");
let envelope = OutgoingEnvelope {
id: content.envelope.id,
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
}
} else {
// Add the other client to our map of recently synced clients.
self.note_recent_client(&client);
// Bail if we don't have any outgoing commands to write into
// the other client's record.
if outgoing_commands.is_empty() {
continue;
}
// Determine if we have new commands, that aren't already in the
// client's command list.
let current_commands: HashSet<Command> = client
.commands
.iter()
.filter_map(|c| c.as_command())
.collect();
let mut new_outgoing_commands = outgoing_commands
.difference(¤t_commands)
.cloned()
.collect::<Vec<_>>();
// Sort, to ensure deterministic ordering for tests.
new_outgoing_commands.sort();
let mut new_client = client.clone();
new_client
.commands
.extend(new_outgoing_commands.into_iter().map(CommandRecord::from));
if new_client.commands.len() == client.commands.len() {
continue;
}
// Hooray, we added new commands! Make sure the record still
// fits in the maximum record size, or the server will reject
// our upload.
shrink_to_fit(
&mut new_client.commands,
self.memcache_max_record_payload_size(),
)?;
let envelope = OutgoingEnvelope {
id: content.envelope.id,
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, new_client)?);
}
}
// Upload a record for our own client, if we didn't replace it already.
if !has_own_client_record {
let current_client_record = self.current_client_record();
self.note_recent_client(¤t_client_record);
let envelope = OutgoingEnvelope {
id: Guid::new(¤t_client_record.id),
ttl: Some(CLIENTS_TTL),
..Default::default()
};
changes.push(OutgoingBso::from_content(envelope, current_client_record)?);
}
Ok(changes)
}