in packages/firestore/src/local/local_store_impl.ts [463:595]
/**
 * Applies the contents of `remoteEvent` to the local cache inside a single
 * 'Apply remote event' transaction.
 *
 * Within the transaction this:
 *  - updates the matching keys and target data of every target in
 *    `remoteEvent.targetChanges` that the store currently tracks,
 *  - notifies the reference delegate about resolved limbo documents,
 *  - feeds `remoteEvent.documentUpdates` into a remote document change buffer,
 *  - records the event's snapshot version in the target cache (unless it is
 *    `SnapshotVersion.min()`).
 *
 * The in-memory `targetDataByTarget` map of the store is only replaced once
 * the transaction has resolved.
 *
 * @param localStore - The store to update; cast to `LocalStoreImpl`.
 * @param remoteEvent - The remote event to apply.
 * @returns A promise resolving to the document map produced by
 * `populateDocumentChangeBuffer` for this event.
 */
export function localStoreApplyRemoteEventToLocalCache(
  localStore: LocalStore,
  remoteEvent: RemoteEvent
): Promise<DocumentMap> {
  const impl = debugCast(localStore, LocalStoreImpl);
  const snapshotVersion = remoteEvent.snapshotVersion;
  let updatedTargets = impl.targetDataByTarget;

  return impl.persistence
    .runTransaction('Apply remote event', 'readwrite-primary', txn => {
      // `trackRemovals` makes sure document removals show up in
      // `getNewDocumentChanges()`.
      const changeBuffer = impl.remoteDocuments.newChangeBuffer({
        trackRemovals: true
      });

      // This callback may be re-run, so every attempt starts again from the
      // target data currently held by the store.
      updatedTargets = impl.targetDataByTarget;

      const pending: Array<PersistencePromise<void>> = [];

      remoteEvent.targetChanges.forEach((targetChange, targetId) => {
        const previousData = updatedTargets.get(targetId);
        if (!previousData) {
          // Not a target we hold data for: nothing to update.
          return;
        }

        // The remote keys are only touched for targets that are still active,
        // so the updated target data can be persisted together with the
        // updated key assignment.
        pending.push(
          impl.targetCache
            .removeMatchingKeys(txn, targetChange.removedDocuments, targetId)
            .next(() =>
              impl.targetCache.addMatchingKeys(
                txn,
                targetChange.addedDocuments,
                targetId
              )
            )
        );

        const sequenced = previousData.withSequenceNumber(
          txn.currentSequenceNumber
        );
        let nextData = sequenced;
        if (remoteEvent.targetMismatches.has(targetId)) {
          // Mismatched target: drop the resume token and reset both snapshot
          // versions to the minimum.
          const tokenCleared = sequenced.withResumeToken(
            ByteString.EMPTY_BYTE_STRING,
            SnapshotVersion.min()
          );
          nextData = tokenCleared.withLastLimboFreeSnapshotVersion(
            SnapshotVersion.min()
          );
        } else if (targetChange.resumeToken.approximateByteSize() > 0) {
          // A non-empty resume token is recorded at the event's version.
          nextData = sequenced.withResumeToken(
            targetChange.resumeToken,
            snapshotVersion
          );
        }

        updatedTargets = updatedTargets.insert(targetId, nextData);

        // Persist the target data only when `shouldPersistTargetData` asks for
        // it (there are target changes, or sufficient time has passed since
        // the last update).
        if (shouldPersistTargetData(previousData, nextData, targetChange)) {
          pending.push(impl.targetCache.updateTargetData(txn, nextData));
        }
      });

      let changedDocuments = mutableDocumentMap();

      remoteEvent.documentUpdates.forEach(key => {
        if (!remoteEvent.resolvedLimboDocuments.has(key)) {
          return;
        }
        pending.push(
          impl.persistence.referenceDelegate.updateLimboDocument(txn, key)
        );
      });

      // Every update only affects its "own" document, so it is safe to fetch
      // all of the remote documents up front with a single call.
      pending.push(
        populateDocumentChangeBuffer(
          txn,
          changeBuffer,
          remoteEvent.documentUpdates
        ).next(populated => {
          changedDocuments = populated;
        })
      );

      // HACK: A null (minimum) snapshot version is only allowed so that remote
      // events can be synthesized when a permission denied error is received
      // while resolving the state of a locally cached document that is in
      // limbo. Such events must not update the stored remote version.
      const hasMinVersion = snapshotVersion.isEqual(SnapshotVersion.min());
      if (!hasMinVersion) {
        pending.push(
          impl.targetCache
            .getLastRemoteSnapshotVersion(txn)
            .next(lastRemoteSnapshotVersion => {
              debugAssert(
                snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0,
                'Watch stream reverted to previous snapshot?? ' +
                  snapshotVersion +
                  ' < ' +
                  lastRemoteSnapshotVersion
              );
              return impl.targetCache.setTargetsMetadata(
                txn,
                txn.currentSequenceNumber,
                snapshotVersion
              );
            })
        );
      }

      // Once all queued work has finished: flush the buffered document
      // changes, apply the local view to the changed documents, and hand the
      // changed documents back to the caller.
      return PersistencePromise.waitFor(pending)
        .next(() => changeBuffer.apply(txn))
        .next(() =>
          impl.localDocuments.applyLocalViewToDocuments(txn, changedDocuments)
        )
        .next(() => changedDocuments);
    })
    .then(result => {
      // Publish the new target data only after the transaction has resolved.
      impl.targetDataByTarget = updatedTargets;
      return result;
    });
}