in components/sync15/src/client/state.rs [297:523]
fn advance(&mut self, from: SetupState) -> error::Result<SetupState> {
match from {
// Fetch `info/configuration`, which carries the server's current limits.
// A 404 just means the server doesn't advertise any, so fall back to the
// defaults.
Initial => {
let config = match self.client.fetch_info_configuration()? {
Sync15ClientResponse::Success { record, .. } => record,
Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
InfoConfiguration::default()
}
other => return Err(other.create_storage_error()),
};
Ok(InitialWithConfig { config })
}
// XXX - we could consider combining these Initial* states, because we don't
// attempt to fill in "missing" global state - *any* 404 in them means
// `FreshStart`. In other words, each of these states either returns an
// `Err()`, moves to `FreshStartRequired`, or advances to a specific next
// state.
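// We have `info/configuration`; now fetch `info/collections`, which maps
// each collection to its last-modified time.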
InitialWithConfig { config } => {
match self.client.fetch_info_collections()? {
Sync15ClientResponse::Success {
record: collections,
..
} => Ok(InitialWithInfo {
config,
collections,
}),
// If the server doesn't have an `info/collections`, start over
// and upload fresh `meta/global` and `crypto/keys`.
Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
Ok(FreshStartRequired { config })
}
other => Err(other.create_storage_error()),
}
}
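// We have `info/collections`; fetch `meta/global` and reconcile the
// remote declined-engine list with local prefs and any user changes.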
InitialWithInfo {
config,
collections,
} => {
match self.client.fetch_meta_global()? {
Sync15ClientResponse::Success {
record: mut global,
last_modified: mut global_timestamp,
..
} => {
// If the server has a newer storage version, we can't
// sync until our client is updated.
if global.storage_version > STORAGE_VERSION {
return Err(ErrorKind::ClientUpgradeRequired.into());
}
// If the server has an older storage version, wipe and
// reupload.
if global.storage_version < STORAGE_VERSION {
Ok(FreshStartRequired { config })
} else {
log::info!("Have info/collections and meta/global. Computing new engine states");
let initial_global_declined: HashSet<String> =
global.declined.iter().cloned().collect();
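// Merge our local declined list, any user-requested changes, and the
// declined list from the server into one set of engine states.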
let result = compute_engine_states(EngineStateInput {
local_declined: self.pgs.get_declined().iter().cloned().collect(),
user_changes: self.engine_updates.cloned().unwrap_or_default(),
remote: Some(RemoteEngineState {
declined: initial_global_declined.clone(),
info_collections: collections.keys().cloned().collect(),
}),
});
// Persist the new declined.
self.pgs
.set_declined(result.declined.iter().cloned().collect());
// If the declined engines differ from remote, fix that.
let fixed_declined = if result.declined != initial_global_declined {
global.declined = result.declined.iter().cloned().collect();
log::info!(
"Uploading new declined {:?} to meta/global with timestamp {:?}",
global.declined,
global_timestamp,
);
true
} else {
false
};
// If any sync IDs are missing, we need to fix those up as well.
let fixed_ids = if fixup_meta_global(&mut global) {
log::info!(
"Uploading corrected meta/global with timestamp {:?}",
global_timestamp,
);
true
} else {
false
};
if fixed_declined || fixed_ids {
global_timestamp =
self.client.put_meta_global(global_timestamp, &global)?;
log::debug!("new global_timestamp: {:?}", global_timestamp);
}
// Update the set of changes needed.
if self.changes_needed.is_some() {
// Should never happen (we prevent state machine
// loops elsewhere) but if it did, the info is stale
// anyway.
log::warn!("Already have a set of changes needed, Overwriting...");
}
self.changes_needed = Some(result.changes_needed);
Ok(InitialWithMetaGlobal {
config,
collections,
global,
global_timestamp,
})
}
}
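// If the server doesn't have a `meta/global`, start over and upload
// fresh `meta/global` and `crypto/keys`.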
Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
Ok(FreshStartRequired { config })
}
other => Err(other.create_storage_error()),
}
}
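// We have `meta/global`; `crypto/keys` is the last piece of global state
// we need before we're ready to sync.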
InitialWithMetaGlobal {
config,
collections,
global,
global_timestamp,
} => {
// Now try to fetch `crypto/keys` - if we end up fresh-starting, we'll
// reuse the declined list we just computed.
match self.client.fetch_crypto_keys()? {
Sync15ClientResponse::Success {
record,
last_modified,
..
} => {
// Note that `crypto/keys` is itself a BSO, so the JSON body also
// carries the timestamp. If the two aren't identical, something has
// gone badly wrong and we should die.
assert_eq!(last_modified, record.envelope.modified);
let state = GlobalState {
config,
collections,
global,
global_timestamp,
keys: record.payload,
keys_timestamp: last_modified,
};
Ok(Ready { state })
}
// If the server doesn't have a `crypto/keys`, start over
// and reupload our `meta/global` and `crypto/keys`.
Sync15ClientResponse::Error(ErrorResponse::NotFound { .. }) => {
Ok(FreshStartRequired { config })
}
other => Err(other.create_storage_error()),
}
}
// We've got old state that's likely still valid.
// We keep things simple here - if there's any evidence of a new or missing
// `meta/global` or `crypto/keys`, or if the user changed which engines are
// enabled, we just restart from scratch.
WithPreviousState { old_state } => match self.client.fetch_info_collections()? {
Sync15ClientResponse::Success {
record: collections,
..
} => Ok(
if self.engine_updates.is_none()
&& is_same_timestamp(old_state.global_timestamp, &collections, "meta")
&& is_same_timestamp(old_state.keys_timestamp, &collections, "crypto")
{
Ready {
state: GlobalState {
collections,
..old_state
},
}
} else {
InitialWithConfig {
config: old_state.config,
}
},
),
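// Any other response means we can't validate our cached state, so
// restart the full flow with the config we already have.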
_ => Ok(InitialWithConfig {
config: old_state.config,
}),
},
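// Nothing to do - we already have everything we need to sync.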
Ready { state } => Ok(Ready { state }),
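// Something was missing or out of date on the server: wipe it, upload a
// fresh `meta/global` and `crypto/keys`, then restart from
// `InitialWithConfig` to re-fetch what we just wrote.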
FreshStartRequired { config } => {
// Wipe the server.
log::info!("Fresh start: wiping remote");
self.client.wipe_all_remote()?;
// Upload a fresh `meta/global`...
log::info!("Uploading meta/global");
let computed = compute_engine_states(EngineStateInput {
local_declined: self.pgs.get_declined().iter().cloned().collect(),
user_changes: self.engine_updates.cloned().unwrap_or_default(),
remote: None,
});
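// Persist the newly computed declined list and record the engine
// changes that still need to be applied.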
self.pgs
.set_declined(computed.declined.iter().cloned().collect());
self.changes_needed = Some(computed.changes_needed);
let new_global = new_global(self.pgs);
self.client
.put_meta_global(ServerTimestamp::default(), &new_global)?;
// ...And a fresh `crypto/keys`.
let new_keys = CollectionKeys::new_random()?.to_encrypted_payload(self.root_key)?;
let bso = OutgoingEncryptedBso::new(Guid::new("keys").into(), new_keys);
self.client
.put_crypto_keys(ServerTimestamp::default(), &bso)?;
// TODO(lina): Can we pass along server timestamps from the PUTs
// above, and avoid re-fetching the `m/g` and `c/k` we just
// uploaded?
// OTOH(mark): restarting the state machine keeps things simple, and
// fresh starts should be rare.
Ok(InitialWithConfig { config })
}
}
}