in components/places/src/bookmark_sync/engine.rs [132:464]
fn update_local_items_in_places(
db: &PlacesDb,
scope: &SqlInterruptScope,
now: Timestamp,
ops: &CompletionOps<'_>,
) -> Result<()> {
// Build a table of new and updated items.
log::debug!("Staging apply remote item ops");
sql_support::each_sized_chunk(
&ops.apply_remote_items,
sql_support::default_max_variable_number() / 3,
|chunk, _| -> Result<()> {
// CTEs in `WITH` clauses aren't indexed, so this query needs a
// full table scan on `ops`. But that's okay; a separate temp
// table for ops would also need a full scan. Note that we need
// both the local _and_ remote GUIDs here, because we haven't
// changed the local GUIDs yet.
let sql = format!(
"WITH ops(mergedGuid, localGuid, remoteGuid, remoteType,
level) AS (
VALUES {ops}
)
INSERT INTO itemsToApply(mergedGuid, localId, remoteId,
remoteGuid, newLevel, newKind,
localDateAdded, remoteDateAdded,
lastModified, oldTitle, newTitle,
oldPlaceId, newPlaceId,
newKeyword)
SELECT n.mergedGuid, b.id, v.id,
v.guid, n.level, n.remoteType,
b.dateAdded, v.dateAdded,
MAX(v.dateAdded, {now}), b.title, v.title,
b.fk, v.placeId,
v.keyword
FROM ops n
JOIN moz_bookmarks_synced v ON v.guid = n.remoteGuid
LEFT JOIN moz_bookmarks b ON b.guid = n.localGuid",
ops = sql_support::repeat_display(chunk.len(), ",", |index, f| {
let op = &chunk[index];
write!(
f,
"(?, ?, ?, {}, {})",
SyncedBookmarkKind::from(op.remote_node().kind) as u8,
op.level
)
}),
now = now,
);
// We can't avoid allocating here, since we're binding four
// parameters per descendant. Rust's `SliceConcatExt::concat`
// is semantically equivalent, but requires a second allocation,
// which we _can_ avoid by writing this out.
let mut params = Vec::with_capacity(chunk.len() * 3);
for op in chunk.iter() {
scope.err_if_interrupted()?;
let merged_guid = op.merged_node.guid.as_str();
params.push(Some(merged_guid));
let local_guid = op
.merged_node
.merge_state
.local_node()
.map(|node| node.guid.as_str());
params.push(local_guid);
let remote_guid = op.remote_node().guid.as_str();
params.push(Some(remote_guid));
}
db.execute(&sql, rusqlite::params_from_iter(params))?;
Ok(())
},
)?;
log::debug!("Staging change GUID ops");
sql_support::each_sized_chunk(
&ops.change_guids,
sql_support::default_max_variable_number() / 2,
|chunk, _| -> Result<()> {
let sql = format!(
"INSERT INTO changeGuidOps(localGuid, mergedGuid,
syncStatus, level, lastModified)
VALUES {}",
sql_support::repeat_display(chunk.len(), ",", |index, f| {
let op = &chunk[index];
// If only the local GUID changed, the item was deduped, so we
// can mark it as syncing. Otherwise, we changed an invalid
// GUID locally or remotely, so we leave its original sync
// status in place until we've uploaded it.
let sync_status = if op.merged_node.remote_guid_changed() {
None
} else {
Some(SyncStatus::Normal as u8)
};
write!(
f,
"(?, ?, {}, {}, {})",
NullableFragment(sync_status),
op.level,
now
)
}),
);
let mut params = Vec::with_capacity(chunk.len() * 2);
for op in chunk.iter() {
scope.err_if_interrupted()?;
let local_guid = op.local_node().guid.as_str();
params.push(local_guid);
let merged_guid = op.merged_node.guid.as_str();
params.push(merged_guid);
}
db.execute(&sql, rusqlite::params_from_iter(params))?;
Ok(())
},
)?;
log::debug!("Staging apply new local structure ops");
sql_support::each_sized_chunk(
&ops.apply_new_local_structure,
sql_support::default_max_variable_number() / 2,
|chunk, _| -> Result<()> {
let sql = format!(
"INSERT INTO applyNewLocalStructureOps(
mergedGuid, mergedParentGuid, position, level,
lastModified
)
VALUES {}",
sql_support::repeat_display(chunk.len(), ",", |index, f| {
let op = &chunk[index];
write!(f, "(?, ?, {}, {}, {})", op.position, op.level, now)
}),
);
let mut params = Vec::with_capacity(chunk.len() * 2);
for op in chunk.iter() {
scope.err_if_interrupted()?;
let merged_guid = op.merged_node.guid.as_str();
params.push(merged_guid);
let merged_parent_guid = op.merged_parent_node.guid.as_str();
params.push(merged_parent_guid);
}
db.execute(&sql, rusqlite::params_from_iter(params))?;
Ok(())
},
)?;
log::debug!("Removing tombstones for revived items");
sql_support::each_chunk_mapped(
&ops.delete_local_tombstones,
|op| op.guid().as_str(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"DELETE FROM moz_bookmarks_deleted
WHERE guid IN ({})",
sql_support::repeat_sql_vars(chunk.len())
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Inserting new tombstones for non-syncable and invalid items");
sql_support::each_chunk_mapped(
&ops.insert_local_tombstones,
|op| op.remote_node().guid.as_str().to_owned(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"INSERT INTO moz_bookmarks_deleted(guid, dateRemoved)
VALUES {}",
sql_support::repeat_display(chunk.len(), ",", |_, f| write!(f, "(?, {})", now)),
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Flag frecencies for removed bookmark URLs as stale");
sql_support::each_chunk_mapped(
&ops.delete_local_items,
|op| op.local_node().guid.as_str().to_owned(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"REPLACE INTO moz_places_stale_frecencies(
place_id, stale_at
)
SELECT b.fk, {now}
FROM moz_bookmarks b
WHERE b.guid IN ({vars})
AND b.fk NOT NULL",
now = now,
vars = sql_support::repeat_sql_vars(chunk.len())
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Removing deleted items from Places");
sql_support::each_chunk_mapped(
&ops.delete_local_items,
|op| op.local_node().guid.as_str().to_owned(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"DELETE FROM moz_bookmarks
WHERE guid IN ({})",
sql_support::repeat_sql_vars(chunk.len())
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Changing GUIDs");
scope.err_if_interrupted()?;
db.execute_batch("DELETE FROM changeGuidOps")?;
log::debug!("Applying remote items");
apply_remote_items(db, scope, now)?;
// Fires the `applyNewLocalStructure` trigger.
log::debug!("Applying new local structure");
scope.err_if_interrupted()?;
db.execute_batch("DELETE FROM applyNewLocalStructureOps")?;
// Similar to the check in apply_remote_items, however we do a post check
// to see if dogear was unable to fix up the issue
let orphaned_count: i64 = db.query_row(
"WITH RECURSIVE orphans(id) AS (
SELECT b.id
FROM moz_bookmarks b
WHERE b.parent IS NOT NULL
AND NOT EXISTS (
SELECT 1 FROM moz_bookmarks p WHERE p.id = b.parent
)
UNION
SELECT c.id
FROM moz_bookmarks c
JOIN orphans o ON c.parent = o.id
)
SELECT COUNT(*) FROM orphans;",
[],
|row| row.get(0),
)?;
if orphaned_count > 0 {
log::warn!("Found {} orphaned bookmarks after sync", orphaned_count);
error_support::report_error!(
"places-sync-bookmarks-orphaned",
"found local orphaned bookmarks after we applied new local structure ops: {}",
orphaned_count,
);
}
log::debug!("Resetting change counters for items that shouldn't be uploaded");
sql_support::each_chunk_mapped(
&ops.set_local_merged,
|op| op.merged_node.guid.as_str(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"UPDATE moz_bookmarks SET
syncChangeCounter = 0
WHERE guid IN ({})",
sql_support::repeat_sql_vars(chunk.len()),
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Bumping change counters for items that should be uploaded");
sql_support::each_chunk_mapped(
&ops.set_local_unmerged,
|op| op.merged_node.guid.as_str(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"UPDATE moz_bookmarks SET
syncChangeCounter = 1
WHERE guid IN ({})",
sql_support::repeat_sql_vars(chunk.len()),
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
log::debug!("Flagging applied remote items as merged");
sql_support::each_chunk_mapped(
&ops.set_remote_merged,
|op| op.guid().as_str(),
|chunk, _| -> Result<()> {
scope.err_if_interrupted()?;
db.execute(
&format!(
"UPDATE moz_bookmarks_synced SET
needsMerge = 0
WHERE guid IN ({})",
sql_support::repeat_sql_vars(chunk.len()),
),
rusqlite::params_from_iter(chunk),
)?;
Ok(())
},
)?;
Ok(())
}