in packages/datastore/src/sync/index.ts [164:433]
start(params: StartParams) {
return new Observable<ControlMessageType<ControlMessage>>(observer => {
logger.log('starting sync engine...');
let subscriptions: ZenObservable.Subscription[] = [];
(async () => {
try {
await this.setupModels(params);
} catch (err) {
observer.error(err);
return;
}
const startPromise = new Promise(resolve => {
this.datastoreConnectivity.status().subscribe(async ({ online }) => {
// From offline to online
if (online && !this.online) {
this.online = online;
observer.next({
type: ControlMessage.SYNC_ENGINE_NETWORK_STATUS,
data: {
active: this.online,
},
});
let ctlSubsObservable: Observable<CONTROL_MSG>;
let dataSubsObservable: Observable<[
TransformerMutationType,
SchemaModel,
PersistentModel
]>;
if (isNode) {
logger.warn(
'Realtime disabled when in a server-side environment'
);
} else {
//#region GraphQL Subscriptions
[
// const ctlObservable: Observable<CONTROL_MSG>
ctlSubsObservable,
// const dataObservable: Observable<[TransformerMutationType, SchemaModel, Readonly<{
// id: string;
// } & Record<string, any>>]>
dataSubsObservable,
] = this.subscriptionsProcessor.start();
try {
await new Promise((resolve, reject) => {
const ctlSubsSubscription = ctlSubsObservable.subscribe({
next: msg => {
if (msg === CONTROL_MSG.CONNECTED) {
resolve();
}
},
error: err => {
reject(err);
const handleDisconnect = this.disconnectionHandler();
handleDisconnect(err);
},
});
subscriptions.push(ctlSubsSubscription);
});
} catch (err) {
observer.error(err);
return;
}
logger.log('Realtime ready');
observer.next({
type: ControlMessage.SYNC_ENGINE_SUBSCRIPTIONS_ESTABLISHED,
});
//#endregion
}
//#region Base & Sync queries
try {
await new Promise((resolve, reject) => {
const syncQuerySubscription = this.syncQueriesObservable().subscribe(
{
next: message => {
const { type } = message;
if (
type === ControlMessage.SYNC_ENGINE_SYNC_QUERIES_READY
) {
resolve();
}
observer.next(message);
},
complete: () => {
resolve();
},
error: error => {
reject(error);
},
}
);
if (syncQuerySubscription) {
subscriptions.push(syncQuerySubscription);
}
});
} catch (error) {
observer.error(error);
return;
}
//#endregion
//#region process mutations
subscriptions.push(
this.mutationsProcessor
.start()
.subscribe(({ modelDefinition, model: item, hasMore }) => {
const modelConstructor = this.userModelClasses[
modelDefinition.name
] as PersistentModelConstructor<any>;
const model = this.modelInstanceCreator(
modelConstructor,
item
);
this.storage.runExclusive(storage =>
this.modelMerger.merge(storage, model)
);
observer.next({
type:
ControlMessage.SYNC_ENGINE_OUTBOX_MUTATION_PROCESSED,
data: {
model: modelConstructor,
element: model,
},
});
observer.next({
type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
data: {
isEmpty: !hasMore,
},
});
})
);
//#endregion
//#region Merge subscriptions buffer
// TODO: extract to function
if (!isNode) {
subscriptions.push(
dataSubsObservable.subscribe(
([_transformerMutationType, modelDefinition, item]) => {
const modelConstructor = this.userModelClasses[
modelDefinition.name
] as PersistentModelConstructor<any>;
const model = this.modelInstanceCreator(
modelConstructor,
item
);
this.storage.runExclusive(storage =>
this.modelMerger.merge(storage, model)
);
}
)
);
}
//#endregion
} else if (!online) {
this.online = online;
observer.next({
type: ControlMessage.SYNC_ENGINE_NETWORK_STATUS,
data: {
active: this.online,
},
});
subscriptions.forEach(sub => sub.unsubscribe());
subscriptions = [];
}
resolve();
});
});
this.storage
.observe(null, null, ownSymbol)
.filter(({ model }) => {
const modelDefinition = this.getModelDefinition(model);
return modelDefinition.syncable === true;
})
.subscribe({
next: async ({ opType, model, element, condition }) => {
const namespace = this.schema.namespaces[
this.namespaceResolver(model)
];
const MutationEventConstructor = this.modelClasses[
'MutationEvent'
] as PersistentModelConstructor<MutationEvent>;
const graphQLCondition = predicateToGraphQLCondition(condition);
const mutationEvent = createMutationInstanceFromModelOperation(
namespace.relationships,
this.getModelDefinition(model),
opType,
model,
element,
graphQLCondition,
MutationEventConstructor,
this.modelInstanceCreator
);
await this.outbox.enqueue(this.storage, mutationEvent);
observer.next({
type: ControlMessage.SYNC_ENGINE_OUTBOX_MUTATION_ENQUEUED,
data: {
model,
element,
},
});
observer.next({
type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
data: {
isEmpty: false,
},
});
await startPromise;
if (this.online) {
this.mutationsProcessor.resume();
}
},
});
observer.next({
type: ControlMessage.SYNC_ENGINE_STORAGE_SUBSCRIBED,
});
const hasMutationsInOutbox =
(await this.outbox.peek(this.storage)) === undefined;
observer.next({
type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
data: {
isEmpty: hasMutationsInOutbox,
},
});
await startPromise;
observer.next({
type: ControlMessage.SYNC_ENGINE_READY,
});
})();
return () => {
subscriptions.forEach(sub => sub.unsubscribe());
};
});
}