next: async()

in packages/datastore/src/sync/index.ts [502:649]


								/**
								 * Processes one batch of `items` received for the model described by
								 * `modelDefinition`.
								 *
								 * Steps, in order:
								 *  1. The first time a model is seen, zeroed counters are registered for
								 *     it and the shared `start` / `newestStartedAt` trackers are updated.
								 *  2. Inside `storage.runExclusive`, the items are merged into storage and
								 *     the resulting operation types are tallied per model.
								 *  3. When `done` is set, the model's metadata record is re-saved with the
								 *     new sync timestamps, a SYNC_ENGINE_MODEL_SYNCED message is emitted,
								 *     and the model is removed from `paginatingModels`. Once that set is
								 *     empty, the pending promise is resolved, SYNC_ENGINE_SYNC_QUERIES_READY
								 *     is emitted and the sync-queries subscription is closed.
								 */
								next: async ({
									namespace,
									modelDefinition,
									items,
									done,
									startedAt,
									isFullSync,
								}) => {
									const modelCtor = this.userModelClasses[
										modelDefinition.name
									] as PersistentModelConstructor<any>;

									// First batch for this model: register counters and refresh timers.
									const firstBatchForModel = !count.has(modelCtor);
									if (firstBatchForModel) {
										count.set(modelCtor, { new: 0, updated: 0, deleted: 0 });

										start = getNow();

										if (newestStartedAt === undefined) {
											newestStartedAt = startedAt;
										} else {
											newestStartedAt = Math.max(newestStartedAt, startedAt);
										}
									}

									await this.storage.runExclusive(async storage => {
										const outboxIds = await this.outbox.getModelIds(storage);

										// Records whose id has a mutation in the outbox must be merged
										// individually; every other record can be merged as one batch.
										const individually: ModelInstanceMetadata[] = [];
										const batch: ModelInstanceMetadata[] = [];
										for (const item of items) {
											(outboxIds.has(item.id) ? individually : batch).push(item);
										}

										const mergedOps: [any, OpType][] = [];

										for (const item of individually) {
											const opType = await this.modelMerger.merge(storage, item);
											if (opType === undefined) {
												continue;
											}
											mergedOps.push([item, opType]);
										}

										const batchOps = await this.modelMerger.mergePage(
											storage,
											modelCtor,
											batch
										);
										mergedOps.push(...batchOps);

										// Fold the operation types into the per-model counters.
										const tally = count.get(modelCtor);
										for (const [, opType] of mergedOps) {
											switch (opType) {
												case OpType.INSERT:
													tally.new++;
													break;
												case OpType.UPDATE:
													tally.updated++;
													break;
												case OpType.DELETE:
													tally.deleted++;
													break;
												default:
													exhaustiveCheck(opType);
											}
										}
									});

									// Nothing further to do until the last batch of this model arrives.
									if (!done) {
										return;
									}

									//#region update last sync for type
									const storedMetadata = await this.getModelMetadata(
										namespace,
										modelDefinition.name
									);
									const { lastFullSync, fullSyncInterval } = storedMetadata;

									theInterval = fullSyncInterval;

									// NOTE(review): when no value has been recorded yet, the stored
									// `lastFullSync` is taken even if this pass was itself a full sync —
									// confirm that this is intended.
									if (newestFullSyncStartedAt === undefined) {
										newestFullSyncStartedAt = lastFullSync;
									} else {
										newestFullSyncStartedAt = Math.max(
											newestFullSyncStartedAt,
											isFullSync ? startedAt : lastFullSync
										);
									}

									const MetadataModel = this.modelClasses
										.ModelMetadata as PersistentModelConstructor<any>;

									const refreshedMetadata = MetadataModel.copyOf(
										storedMetadata,
										draft => {
											draft.lastSync = startedAt;
											// A delta sync leaves the previously stored full-sync time as is.
											draft.lastFullSync = isFullSync
												? startedAt
												: storedMetadata.lastFullSync;
										}
									);

									await this.storage.save(refreshedMetadata, undefined, ownSymbol);
									//#endregion

									const modelCounts = count.get(modelCtor);

									this.modelSyncedStatus.set(modelCtor, true);

									observer.next({
										type: ControlMessage.SYNC_ENGINE_MODEL_SYNCED,
										data: {
											model: modelCtor,
											isFullSync,
											isDeltaSync: !isFullSync,
											counts: modelCounts,
										},
									});

									paginatingModels.delete(modelDefinition);

									// Other models are still paginating; keep the subscription open.
									if (paginatingModels.size !== 0) {
										return;
									}

									duration = getNow() - start;
									resolve();
									observer.next({
										type: ControlMessage.SYNC_ENGINE_SYNC_QUERIES_READY,
									});
									syncQueriesSubscription.unsubscribe();
								},