packages/relay-runtime/store/experimental-live-resolvers/LiveResolverStore.js (626 lines of code) (raw):

/** * Copyright (c) Meta Platforms, Inc. and affiliates. * * This source code is licensed under the MIT license found in the * LICENSE file in the root directory of this source tree. * * @flow strict-local * @format */ // flowlint ambiguous-object-type:error 'use strict'; import type {DataID, Disposable} from '../../util/RelayRuntimeTypes'; import type {Availability} from '../DataChecker'; import type {GetDataID} from '../RelayResponseNormalizer'; import type { CheckOptions, DataIDSet, LogFunction, MutableRecordSource, OperationAvailability, OperationDescriptor, OperationLoader, RecordSource, RequestDescriptor, Scheduler, SingularReaderSelector, Snapshot, Store, StoreSubscriptions, } from '../RelayStoreTypes'; import type {ResolverCache} from '../ResolverCache'; const { INTERNAL_ACTOR_IDENTIFIER_DO_NOT_USE, assertInternalActorIndentifier, } = require('../../multi-actor-environment/ActorIdentifier'); const deepFreeze = require('../../util/deepFreeze'); const RelayFeatureFlags = require('../../util/RelayFeatureFlags'); const resolveImmediate = require('../../util/resolveImmediate'); const DataChecker = require('../DataChecker'); const defaultGetDataID = require('../defaultGetDataID'); const RelayModernRecord = require('../RelayModernRecord'); const RelayOptimisticRecordSource = require('../RelayOptimisticRecordSource'); const RelayReader = require('../RelayReader'); const RelayReferenceMarker = require('../RelayReferenceMarker'); const RelayStoreReactFlightUtils = require('../RelayStoreReactFlightUtils'); const RelayStoreSubscriptions = require('../RelayStoreSubscriptions'); const RelayStoreUtils = require('../RelayStoreUtils'); const {ROOT_ID, ROOT_TYPE} = require('../RelayStoreUtils'); const {LiveResolverCache} = require('./LiveResolverCache'); const invariant = require('invariant'); export type LiveState<T> = {| read(): T, subscribe(cb: () => void): () => void, |}; // HACK // The type of Store is defined using an opaque type that only RelayModernStore // can create. For now, we just lie via any/FlowFixMe and pretend we really have // the opaque version, but in reality it's our local verison. opaque type InvalidationState = {| dataIDs: $ReadOnlyArray<DataID>, invalidations: Map<DataID, ?number>, |}; type InvalidationSubscription = {| callback: () => void, invalidationState: InvalidationState, |}; const DEFAULT_RELEASE_BUFFER_SIZE = 10; /** * @public * * Experimental fork of RelayModernStore */ class LiveResolverStore implements Store { _currentWriteEpoch: number; _gcHoldCounter: number; _gcReleaseBufferSize: number; _gcRun: ?Generator<void, void, void>; _gcScheduler: Scheduler; _getDataID: GetDataID; _globalInvalidationEpoch: ?number; _invalidationSubscriptions: Set<InvalidationSubscription>; _invalidatedRecordIDs: DataIDSet; __log: ?LogFunction; _queryCacheExpirationTime: ?number; _operationLoader: ?OperationLoader; _optimisticSource: ?MutableRecordSource; _recordSource: MutableRecordSource; _resolverCache: ResolverCache; _releaseBuffer: Array<string>; _roots: Map< string, {| operation: OperationDescriptor, refCount: number, epoch: ?number, fetchTime: ?number, |}, >; _shouldScheduleGC: boolean; _storeSubscriptions: StoreSubscriptions; _updatedRecordIDs: DataIDSet; _shouldProcessClientComponents: ?boolean; constructor( source: MutableRecordSource, options?: {| gcScheduler?: ?Scheduler, log?: ?LogFunction, operationLoader?: ?OperationLoader, getDataID?: ?GetDataID, gcReleaseBufferSize?: ?number, queryCacheExpirationTime?: ?number, shouldProcessClientComponents?: ?boolean, |}, ) { // Prevent mutation of a record from outside the store. if (__DEV__) { const storeIDs = source.getRecordIDs(); for (let ii = 0; ii < storeIDs.length; ii++) { const record = source.get(storeIDs[ii]); if (record) { RelayModernRecord.freeze(record); } } } this._currentWriteEpoch = 0; this._gcHoldCounter = 0; this._gcReleaseBufferSize = options?.gcReleaseBufferSize ?? DEFAULT_RELEASE_BUFFER_SIZE; this._gcRun = null; this._gcScheduler = options?.gcScheduler ?? resolveImmediate; this._getDataID = options?.getDataID ?? defaultGetDataID; this._globalInvalidationEpoch = null; this._invalidationSubscriptions = new Set(); this._invalidatedRecordIDs = new Set(); this.__log = options?.log ?? null; this._queryCacheExpirationTime = options?.queryCacheExpirationTime; this._operationLoader = options?.operationLoader ?? null; this._optimisticSource = null; this._recordSource = source; this._releaseBuffer = []; this._roots = new Map(); this._shouldScheduleGC = false; this._resolverCache = new LiveResolverCache( () => this._getMutableRecordSource(), this, ); this._storeSubscriptions = new RelayStoreSubscriptions( options?.log, this._resolverCache, ); this._updatedRecordIDs = new Set(); this._shouldProcessClientComponents = options?.shouldProcessClientComponents; initializeRecordSource(this._recordSource); } getSource(): RecordSource { return this._optimisticSource ?? this._recordSource; } _getMutableRecordSource(): MutableRecordSource { return this._optimisticSource ?? this._recordSource; } check( operation: OperationDescriptor, options?: CheckOptions, ): OperationAvailability { const selector = operation.root; const source = this._getMutableRecordSource(); const globalInvalidationEpoch = this._globalInvalidationEpoch; const rootEntry = this._roots.get(operation.request.identifier); const operationLastWrittenAt = rootEntry != null ? rootEntry.epoch : null; // Check if store has been globally invalidated if (globalInvalidationEpoch != null) { // If so, check if the operation we're checking was last written // before or after invalidation occurred. if ( operationLastWrittenAt == null || operationLastWrittenAt <= globalInvalidationEpoch ) { // If the operation was written /before/ global invalidation occurred, // or if this operation has never been written to the store before, // we will consider the data for this operation to be stale // (i.e. not resolvable from the store). return {status: 'stale'}; } } const handlers = options?.handlers ?? []; const getSourceForActor = options?.getSourceForActor ?? (actorIdentifier => { assertInternalActorIndentifier(actorIdentifier); return source; }); const getTargetForActor = options?.getTargetForActor ?? (actorIdentifier => { assertInternalActorIndentifier(actorIdentifier); return source; }); const operationAvailability = DataChecker.check( getSourceForActor, getTargetForActor, options?.defaultActorIdentifier ?? INTERNAL_ACTOR_IDENTIFIER_DO_NOT_USE, selector, handlers, this._operationLoader, this._getDataID, this._shouldProcessClientComponents, ); return getAvailabilityStatus( operationAvailability, operationLastWrittenAt, rootEntry?.fetchTime, this._queryCacheExpirationTime, ); } retain(operation: OperationDescriptor): Disposable { const id = operation.request.identifier; let disposed = false; const dispose = () => { // Ensure each retain can only dispose once if (disposed) { return; } disposed = true; // For Flow: guard against the entry somehow not existing const rootEntry = this._roots.get(id); if (rootEntry == null) { return; } // Decrement the ref count: if it becomes zero it is eligible // for release. rootEntry.refCount--; if (rootEntry.refCount === 0) { const {_queryCacheExpirationTime} = this; const rootEntryIsStale = rootEntry.fetchTime != null && _queryCacheExpirationTime != null && rootEntry.fetchTime <= Date.now() - _queryCacheExpirationTime; if (rootEntryIsStale) { this._roots.delete(id); this.scheduleGC(); } else { this._releaseBuffer.push(id); // If the release buffer is now over-full, remove the least-recently // added entry and schedule a GC. Note that all items in the release // buffer have a refCount of 0. if (this._releaseBuffer.length > this._gcReleaseBufferSize) { const _id = this._releaseBuffer.shift(); this._roots.delete(_id); this.scheduleGC(); } } } }; const rootEntry = this._roots.get(id); if (rootEntry != null) { if (rootEntry.refCount === 0) { // This entry should be in the release buffer, but it no longer belongs // there since it's retained. Remove it to maintain the invariant that // all release buffer entries have a refCount of 0. this._releaseBuffer = this._releaseBuffer.filter(_id => _id !== id); } // If we've previously retained this operation, increment the refCount rootEntry.refCount += 1; } else { // Otherwise create a new entry for the operation this._roots.set(id, { operation, refCount: 1, epoch: null, fetchTime: null, }); } return {dispose}; } lookup(selector: SingularReaderSelector): Snapshot { const source = this.getSource(); const snapshot = RelayReader.read(source, selector, this._resolverCache); if (__DEV__) { deepFreeze(snapshot); } return snapshot; } // This method will return a list of updated owners from the subscriptions notify( sourceOperation?: OperationDescriptor, invalidateStore?: boolean, ): $ReadOnlyArray<RequestDescriptor> { const log = this.__log; if (log != null) { log({ name: 'store.notify.start', sourceOperation, }); } // Increment the current write when notifying after executing // a set of changes to the store. this._currentWriteEpoch++; if (invalidateStore === true) { this._globalInvalidationEpoch = this._currentWriteEpoch; } if (RelayFeatureFlags.ENABLE_RELAY_RESOLVERS) { // When a record is updated, we need to also handle records that depend on it, // specifically Relay Resolver result records containing results based on the // updated records. This both adds to updatedRecordIDs and invalidates any // cached data as needed. this._resolverCache.invalidateDataIDs(this._updatedRecordIDs); } const source = this.getSource(); const updatedOwners = []; this._storeSubscriptions.updateSubscriptions( source, this._updatedRecordIDs, updatedOwners, sourceOperation, ); this._invalidationSubscriptions.forEach(subscription => { this._updateInvalidationSubscription( subscription, invalidateStore === true, ); }); if (log != null) { log({ name: 'store.notify.complete', sourceOperation, updatedRecordIDs: this._updatedRecordIDs, invalidatedRecordIDs: this._invalidatedRecordIDs, }); } this._updatedRecordIDs.clear(); this._invalidatedRecordIDs.clear(); // If a source operation was provided (indicating the operation // that produced this update to the store), record the current epoch // at which this operation was written. if (sourceOperation != null) { // We only track the epoch at which the operation was written if // it was previously retained, to keep the size of our operation // epoch map bounded. If a query wasn't retained, we assume it can // may be deleted at any moment and thus is not relevant for us to track // for the purposes of invalidation. const id = sourceOperation.request.identifier; const rootEntry = this._roots.get(id); if (rootEntry != null) { rootEntry.epoch = this._currentWriteEpoch; rootEntry.fetchTime = Date.now(); } else if ( sourceOperation.request.node.params.operationKind === 'query' && this._gcReleaseBufferSize > 0 && this._releaseBuffer.length < this._gcReleaseBufferSize ) { // The operation isn't retained but there is space in the release buffer: // temporarily track this operation in case the data can be reused soon. const temporaryRootEntry = { operation: sourceOperation, refCount: 0, epoch: this._currentWriteEpoch, fetchTime: Date.now(), }; this._releaseBuffer.push(id); this._roots.set(id, temporaryRootEntry); } } return updatedOwners; } publish(source: RecordSource, idsMarkedForInvalidation?: DataIDSet): void { const target = this._getMutableRecordSource(); updateTargetFromSource( target, source, // We increment the current epoch at the end of the set of updates, // in notify(). Here, we pass what will be the incremented value of // the epoch to use to write to invalidated records. this._currentWriteEpoch + 1, idsMarkedForInvalidation, this._updatedRecordIDs, this._invalidatedRecordIDs, ); // NOTE: log *after* processing the source so that even if a bad log function // mutates the source, it doesn't affect Relay processing of it. const log = this.__log; if (log != null) { log({ name: 'store.publish', source, optimistic: target === this._optimisticSource, }); } } subscribe( snapshot: Snapshot, callback: (snapshot: Snapshot) => void, ): Disposable { return this._storeSubscriptions.subscribe(snapshot, callback); } holdGC(): Disposable { if (this._gcRun) { this._gcRun = null; this._shouldScheduleGC = true; } this._gcHoldCounter++; const dispose = () => { if (this._gcHoldCounter > 0) { this._gcHoldCounter--; if (this._gcHoldCounter === 0 && this._shouldScheduleGC) { this.scheduleGC(); this._shouldScheduleGC = false; } } }; return {dispose}; } toJSON(): mixed { return 'LiveResolverStore()'; } getEpoch(): number { return this._currentWriteEpoch; } // Internal API __getUpdatedRecordIDs(): DataIDSet { return this._updatedRecordIDs; } // HACK // This should return an InvalidationState, but that's an opaque type that only the // real RelayModernStore can create. For now we just use any. // $FlowFixMe lookupInvalidationState(dataIDs: $ReadOnlyArray<DataID>): any { const invalidations = new Map(); dataIDs.forEach(dataID => { const record = this.getSource().get(dataID); invalidations.set( dataID, RelayModernRecord.getInvalidationEpoch(record) ?? null, ); }); invalidations.set('global', this._globalInvalidationEpoch); const invalidationState: InvalidationState = { dataIDs, invalidations, }; return invalidationState; } // HACK // This should accept an InvalidationState, but that's an opaque type that only the // real RelayModernStore can create. For now we just use any. // $FlowFixMe checkInvalidationState(prevInvalidationState: InvalidationState): boolean { const latestInvalidationState = this.lookupInvalidationState( prevInvalidationState.dataIDs, ); const currentInvalidations = latestInvalidationState.invalidations; const prevInvalidations = prevInvalidationState.invalidations; // Check if global invalidation has changed if ( currentInvalidations.get('global') !== prevInvalidations.get('global') ) { return true; } // Check if the invalidation state for any of the ids has changed. for (const dataID of prevInvalidationState.dataIDs) { if (currentInvalidations.get(dataID) !== prevInvalidations.get(dataID)) { return true; } } return false; } // HACK // This should accept an InvalidationState, but that's an opaque type that only the // real RelayModernStore can create. For now we just use any. subscribeToInvalidationState( // $FlowFixMe invalidationState: InvalidationState, callback: () => void, ): Disposable { const subscription = {callback, invalidationState}; const dispose = () => { this._invalidationSubscriptions.delete(subscription); }; this._invalidationSubscriptions.add(subscription); return {dispose}; } _updateInvalidationSubscription( subscription: InvalidationSubscription, invalidatedStore: boolean, ) { const {callback, invalidationState} = subscription; const {dataIDs} = invalidationState; const isSubscribedToInvalidatedIDs = invalidatedStore || dataIDs.some(dataID => this._invalidatedRecordIDs.has(dataID)); if (!isSubscribedToInvalidatedIDs) { return; } callback(); } snapshot(): void { invariant( this._optimisticSource == null, 'LiveResolverStore: Unexpected call to snapshot() while a previous ' + 'snapshot exists.', ); const log = this.__log; if (log != null) { log({ name: 'store.snapshot', }); } this._storeSubscriptions.snapshotSubscriptions(this.getSource()); if (this._gcRun) { this._gcRun = null; this._shouldScheduleGC = true; } this._optimisticSource = RelayOptimisticRecordSource.create( this.getSource(), ); } restore(): void { invariant( this._optimisticSource != null, 'LiveResolverStore: Unexpected call to restore(), expected a snapshot ' + 'to exist (make sure to call snapshot()).', ); const log = this.__log; if (log != null) { log({ name: 'store.restore', }); } this._optimisticSource = null; if (this._shouldScheduleGC) { this.scheduleGC(); } this._storeSubscriptions.restoreSubscriptions(); } scheduleGC() { if (this._gcHoldCounter > 0) { this._shouldScheduleGC = true; return; } if (this._gcRun) { return; } this._gcRun = this._collect(); this._gcScheduler(this._gcStep); } /** * Run a full GC synchronously. */ __gc(): void { // Don't run GC while there are optimistic updates applied if (this._optimisticSource != null) { return; } const gcRun = this._collect(); while (!gcRun.next().done) {} } _gcStep = () => { if (this._gcRun) { if (this._gcRun.next().done) { this._gcRun = null; } else { this._gcScheduler(this._gcStep); } } }; *_collect(): Generator<void, void, void> { /* eslint-disable no-labels */ top: while (true) { const startEpoch = this._currentWriteEpoch; const references = new Set(); // Mark all records that are traversable from a root for (const {operation} of this._roots.values()) { const selector = operation.root; RelayReferenceMarker.mark( this._recordSource, selector, references, this._operationLoader, this._shouldProcessClientComponents, ); // Yield for other work after each operation yield; // If the store was updated, restart if (startEpoch !== this._currentWriteEpoch) { continue top; } } const log = this.__log; if (log != null) { log({ name: 'store.gc', references, }); } // Sweep records without references if (references.size === 0) { // Short-circuit if *nothing* is referenced this._recordSource.clear(); } else { // Evict any unreferenced nodes const storeIDs = this._recordSource.getRecordIDs(); for (let ii = 0; ii < storeIDs.length; ii++) { const dataID = storeIDs[ii]; if (!references.has(dataID)) { this._recordSource.remove(dataID); } } } return; } } } function initializeRecordSource(target: MutableRecordSource) { if (!target.has(ROOT_ID)) { const rootRecord = RelayModernRecord.create(ROOT_ID, ROOT_TYPE); target.set(ROOT_ID, rootRecord); } } /** * Updates the target with information from source, also updating a mapping of * which records in the target were changed as a result. * Additionally, will mark records as invalidated at the current write epoch * given the set of record ids marked as stale in this update. */ function updateTargetFromSource( target: MutableRecordSource, source: RecordSource, currentWriteEpoch: number, idsMarkedForInvalidation: ?DataIDSet, updatedRecordIDs: DataIDSet, invalidatedRecordIDs: DataIDSet, ): void { // First, update any records that were marked for invalidation. // For each provided dataID that was invalidated, we write the // INVALIDATED_AT_KEY on the record, indicating // the epoch at which the record was invalidated. if (idsMarkedForInvalidation) { idsMarkedForInvalidation.forEach(dataID => { const targetRecord = target.get(dataID); const sourceRecord = source.get(dataID); // If record was deleted during the update (and also invalidated), // we don't need to count it as an invalidated id if (sourceRecord === null) { return; } let nextRecord; if (targetRecord != null) { // If the target record exists, use it to set the epoch // at which it was invalidated. This record will be updated with // any changes from source in the section below // where we update the target records based on the source. nextRecord = RelayModernRecord.clone(targetRecord); } else { // If the target record doesn't exist, it means that a new record // in the source was created (and also invalidated), so we use that // record to set the epoch at which it was invalidated. This record // will be updated with any changes from source in the section below // where we update the target records based on the source. nextRecord = sourceRecord != null ? RelayModernRecord.clone(sourceRecord) : null; } if (!nextRecord) { return; } RelayModernRecord.setValue( nextRecord, RelayStoreUtils.INVALIDATED_AT_KEY, currentWriteEpoch, ); invalidatedRecordIDs.add(dataID); target.set(dataID, nextRecord); }); } // Update the target based on the changes present in source const dataIDs = source.getRecordIDs(); for (let ii = 0; ii < dataIDs.length; ii++) { const dataID = dataIDs[ii]; const sourceRecord = source.get(dataID); const targetRecord = target.get(dataID); // Prevent mutation of a record from outside the store. if (__DEV__) { if (sourceRecord) { RelayModernRecord.freeze(sourceRecord); } } if (sourceRecord && targetRecord) { // ReactFlightClientResponses are lazy and only materialize when readRoot // is called when we read the field, so if the record is a Flight field // we always use the new record's data regardless of whether // it actually changed. Let React take care of reconciliation instead. const nextRecord = RelayModernRecord.getType(targetRecord) === RelayStoreReactFlightUtils.REACT_FLIGHT_TYPE_NAME ? sourceRecord : RelayModernRecord.update(targetRecord, sourceRecord); if (nextRecord !== targetRecord) { // Prevent mutation of a record from outside the store. if (__DEV__) { RelayModernRecord.freeze(nextRecord); } updatedRecordIDs.add(dataID); target.set(dataID, nextRecord); } } else if (sourceRecord === null) { target.delete(dataID); if (targetRecord !== null) { updatedRecordIDs.add(dataID); } } else if (sourceRecord) { target.set(dataID, sourceRecord); updatedRecordIDs.add(dataID); } // don't add explicit undefined } } /** * Returns an OperationAvailability given the Availability returned * by checking an operation, and when that operation was last written to the store. * Specifically, the provided Availability of an operation will contain the * value of when a record referenced by the operation was most recently * invalidated; given that value, and given when this operation was last * written to the store, this function will return the overall * OperationAvailability for the operation. */ function getAvailabilityStatus( operationAvailability: Availability, operationLastWrittenAt: ?number, operationFetchTime: ?number, queryCacheExpirationTime: ?number, ): OperationAvailability { const {mostRecentlyInvalidatedAt, status} = operationAvailability; if (typeof mostRecentlyInvalidatedAt === 'number') { // If some record referenced by this operation is stale, then the operation itself is stale // if either the operation itself was never written *or* the operation was last written // before the most recent invalidation of its reachable records. if ( operationLastWrittenAt == null || mostRecentlyInvalidatedAt > operationLastWrittenAt ) { return {status: 'stale'}; } } if (status === 'missing') { return {status: 'missing'}; } if (operationFetchTime != null && queryCacheExpirationTime != null) { const isStale = operationFetchTime <= Date.now() - queryCacheExpirationTime; if (isStale) { return {status: 'stale'}; } } // There were no invalidations of any reachable records *or* the operation is known to have // been fetched after the most recent record invalidation. return {status: 'available', fetchTime: operationFetchTime ?? null}; } module.exports = LiveResolverStore;