in packages/datastore/src/sync/index.ts [502:649]
/**
 * Handles one page of sync-query results for a single model.
 *
 * The page is merged into local storage inside `storage.runExclusive`, the
 * per-model new/updated/deleted tallies are advanced, and — once the model's
 * final page (`done`) has been merged — the model's sync metadata is saved
 * and a `SYNC_ENGINE_MODEL_SYNCED` message is emitted. When no model is left
 * in `paginatingModels`, `SYNC_ENGINE_SYNC_QUERIES_READY` is emitted and the
 * sync-queries subscription is unsubscribed.
 */
next: async ({
  namespace,
  modelDefinition,
  items,
  done,
  startedAt,
  isFullSync,
}) => {
  const modelConstructor = this.userModelClasses[
    modelDefinition.name
  ] as PersistentModelConstructor<any>;

  // First page seen for this model: create its tally, restart the timer and
  // remember the most recent `startedAt` across all models.
  if (!count.has(modelConstructor)) {
    count.set(modelConstructor, { new: 0, updated: 0, deleted: 0 });
    start = getNow();
    if (newestStartedAt === undefined) {
      newestStartedAt = startedAt;
    } else {
      newestStartedAt = Math.max(newestStartedAt, startedAt);
    }
  }

  /**
   * Items whose id has a pending mutation in the outbox have to be merged
   * one at a time; every other item of the page is merged as a single batch.
   */
  await this.storage.runExclusive(async storage => {
    const idsInOutbox = await this.outbox.getModelIds(storage);

    // Split the page into "merge individually" and "merge as a batch".
    const mergeIndividually: ModelInstanceMetadata[] = [];
    const mergeAsBatch: ModelInstanceMetadata[] = [];
    for (const item of items) {
      if (idsInOutbox.has(item.id)) {
        mergeIndividually.push(item);
      } else {
        mergeAsBatch.push(item);
      }
    }

    // Individual merges run first, sequentially; merges that report no
    // operation type are not counted.
    const mergeResults: [any, OpType][] = [];
    for (const item of mergeIndividually) {
      const opType = await this.modelMerger.merge(storage, item);
      if (opType !== undefined) {
        mergeResults.push([item, opType]);
      }
    }

    const batchResults = await this.modelMerger.mergePage(
      storage,
      modelConstructor,
      mergeAsBatch
    );
    mergeResults.push(...batchResults);

    // Fold the resulting operation types into this model's tally.
    const tally = count.get(modelConstructor);
    for (const [, opType] of mergeResults) {
      switch (opType) {
        case OpType.INSERT:
          tally.new++;
          break;
        case OpType.UPDATE:
          tally.updated++;
          break;
        case OpType.DELETE:
          tally.deleted++;
          break;
        default:
          exhaustiveCheck(opType);
      }
    }
  });

  // More pages are still to come for this model; nothing else to do yet.
  if (!done) {
    return;
  }

  const modelName = modelDefinition.name;

  //#region update last sync for type
  const storedMetadata = await this.getModelMetadata(namespace, modelName);
  const { lastFullSync, fullSyncInterval } = storedMetadata;
  theInterval = fullSyncInterval;

  // NOTE(review): for the first model that finishes, the previously stored
  // `lastFullSync` is used even when `isFullSync` is true — confirm that
  // this is intended.
  if (newestFullSyncStartedAt === undefined) {
    newestFullSyncStartedAt = lastFullSync;
  } else {
    newestFullSyncStartedAt = Math.max(
      newestFullSyncStartedAt,
      isFullSync ? startedAt : lastFullSync
    );
  }

  // Record this sync's start time; `lastFullSync` only moves forward when
  // the sync that just completed was a full sync.
  const metadataConstructor = this.modelClasses
    .ModelMetadata as PersistentModelConstructor<any>;
  const refreshedMetadata = metadataConstructor.copyOf(
    storedMetadata,
    draft => {
      draft.lastSync = startedAt;
      draft.lastFullSync = isFullSync
        ? startedAt
        : storedMetadata.lastFullSync;
    }
  );

  await this.storage.save(refreshedMetadata, undefined, ownSymbol);
  //#endregion

  // Mark the model as synced and publish its final tally.
  const modelCounts = count.get(modelConstructor);
  this.modelSyncedStatus.set(modelConstructor, true);

  observer.next({
    type: ControlMessage.SYNC_ENGINE_MODEL_SYNCED,
    data: {
      model: modelConstructor,
      isFullSync,
      isDeltaSync: !isFullSync,
      counts: modelCounts,
    },
  });

  // Once the last paginating model is finished the whole sync round is over.
  paginatingModels.delete(modelDefinition);
  if (paginatingModels.size === 0) {
    duration = getNow() - start;

    resolve();

    observer.next({
      type: ControlMessage.SYNC_ENGINE_SYNC_QUERIES_READY,
    });

    syncQueriesSubscription.unsubscribe();
  }
},