export function localStoreApplyRemoteEventToLocalCache()

in packages/firestore/src/local/local_store_impl.ts [463:595]


/**
 * Applies the contents of a `RemoteEvent` to the local cache within a single
 * 'readwrite-primary' persistence transaction named 'Apply remote event'.
 *
 * Inside the transaction this function:
 *  - updates the matching keys and the target data of every changed target
 *    that is present in `targetDataByTarget` (other targets are skipped),
 *  - notifies the reference delegate about resolved limbo documents,
 *  - feeds the document updates into a remote document change buffer,
 *  - records the event's snapshot version in the target metadata, unless that
 *    version equals `SnapshotVersion.min()`.
 *
 * The in-memory `targetDataByTarget` map is only replaced once the
 * transaction's promise has resolved.
 *
 * @param localStore - The local store; cast to `LocalStoreImpl` via
 * `debugCast`.
 * @param remoteEvent - The event whose target changes and document updates
 * are applied.
 * @returns A promise resolving with the changed documents reported by
 * `populateDocumentChangeBuffer`, after `applyLocalViewToDocuments` has been
 * invoked on them.
 */
export function localStoreApplyRemoteEventToLocalCache(
  localStore: LocalStore,
  remoteEvent: RemoteEvent
): Promise<DocumentMap> {
  const store = debugCast(localStore, LocalStoreImpl);
  const eventVersion = remoteEvent.snapshotVersion;
  let pendingTargetMap = store.targetDataByTarget;

  const transactionResult = store.persistence.runTransaction(
    'Apply remote event',
    'readwrite-primary',
    txn => {
      // Removals are tracked so that they are reported by
      // `getNewDocumentChanges()`.
      const changeBuffer = store.remoteDocuments.newChangeBuffer({
        trackRemovals: true
      });

      // The transaction body may run more than once, so start over from the
      // map currently held by the store.
      pendingTargetMap = store.targetDataByTarget;

      const operations: Array<PersistencePromise<void>> = [];

      remoteEvent.targetChanges.forEach((change, targetId) => {
        const previousData = pendingTargetMap.get(targetId);
        if (!previousData) {
          // Targets without an entry in the map are left untouched, so key
          // assignments are only written together with their target data.
          return;
        }

        // Drop the removed keys first, then register the added ones.
        const rewriteMatchingKeys = store.targetCache
          .removeMatchingKeys(txn, change.removedDocuments, targetId)
          .next(() =>
            store.targetCache.addMatchingKeys(
              txn,
              change.addedDocuments,
              targetId
            )
          );
        operations.push(rewriteMatchingKeys);

        const resequenced = previousData.withSequenceNumber(
          txn.currentSequenceNumber
        );
        let updatedData = resequenced;
        if (remoteEvent.targetMismatches.has(targetId)) {
          // A mismatched target loses its resume token and its last
          // limbo-free snapshot version.
          const withoutToken = resequenced.withResumeToken(
            ByteString.EMPTY_BYTE_STRING,
            SnapshotVersion.min()
          );
          updatedData = withoutToken.withLastLimboFreeSnapshotVersion(
            SnapshotVersion.min()
          );
        } else if (change.resumeToken.approximateByteSize() > 0) {
          // A non-empty resume token is stored with the event's version.
          updatedData = resequenced.withResumeToken(
            change.resumeToken,
            eventVersion
          );
        }

        pendingTargetMap = pendingTargetMap.insert(targetId, updatedData);

        // Writing the target data is delegated to the decision made by
        // `shouldPersistTargetData()`.
        if (shouldPersistTargetData(previousData, updatedData, change)) {
          operations.push(store.targetCache.updateTargetData(txn, updatedData));
        }
      });

      let changedDocs = mutableDocumentMap();
      remoteEvent.documentUpdates.forEach(key => {
        if (!remoteEvent.resolvedLimboDocuments.has(key)) {
          return;
        }
        operations.push(
          store.persistence.referenceDelegate.updateLimboDocument(txn, key)
        );
      });

      // Every update only touches its "own" document, which makes it safe to
      // hand over all remote documents in one call.
      const fillBuffer = populateDocumentChangeBuffer(
        txn,
        changeBuffer,
        remoteEvent.documentUpdates
      ).next(result => {
        changedDocs = result;
      });
      operations.push(fillBuffer);

      // HACK: A minimum snapshot version is only tolerated so that remote
      // events can be synthesized when a permission denied error occurs while
      // resolving a locally cached document that is in limbo. Such events do
      // not touch the stored remote snapshot version.
      const hasRealVersion = !eventVersion.isEqual(SnapshotVersion.min());
      if (hasRealVersion) {
        operations.push(
          store.targetCache
            .getLastRemoteSnapshotVersion(txn)
            .next(lastRemoteSnapshotVersion => {
              debugAssert(
                eventVersion.compareTo(lastRemoteSnapshotVersion) >= 0,
                'Watch stream reverted to previous snapshot?? ' +
                  eventVersion +
                  ' < ' +
                  lastRemoteSnapshotVersion
              );
              return store.targetCache.setTargetsMetadata(
                txn,
                txn.currentSequenceNumber,
                eventVersion
              );
            })
        );
      }

      // Once all operations have finished: apply the buffer, run the local
      // view over the changed documents and hand them back.
      const allOperations = PersistencePromise.waitFor(operations);
      const bufferApplied = allOperations.next(() => changeBuffer.apply(txn));
      const localViewApplied = bufferApplied.next(() =>
        store.localDocuments.applyLocalViewToDocuments(txn, changedDocs)
      );
      return localViewApplied.next(() => changedDocs);
    }
  );

  return transactionResult.then(changedDocs => {
    // Publish the new target map only after the transaction went through.
    store.targetDataByTarget = pendingTargetMap;
    return changedDocs;
  });
}