start()

in packages/datastore/src/sync/index.ts [164:433]


	/**
	 * Starts the sync engine and returns an Observable of its control messages.
	 *
	 * When the returned Observable is subscribed to, the engine:
	 *  1. awaits `setupModels(params)`; a failure is forwarded through
	 *     `observer.error` and nothing else is started;
	 *  2. listens to connectivity status. On an offline -> online transition it
	 *     (outside Node) starts the GraphQL subscriptions and waits for
	 *     `CONTROL_MSG.CONNECTED`, runs the sync queries until they report ready
	 *     (or complete), starts the mutations processor and merges incoming
	 *     subscription data into storage. On an offline report it unsubscribes
	 *     everything that was started for that online session;
	 *  3. observes storage changes on syncable models and enqueues a mutation
	 *     event in the outbox for each of them;
	 *  4. emits `SYNC_ENGINE_READY` once a connectivity report has been handled
	 *     to completion.
	 *
	 * @param params - Forwarded as-is to `setupModels`.
	 * @returns An Observable of control messages. Unsubscribing from it releases
	 * the subscriptions of the current online session and the connectivity
	 * listener.
	 */
	start(params: StartParams) {
		return new Observable<ControlMessageType<ControlMessage>>(observer => {
			logger.log('starting sync engine...');

			// Subscriptions that only live while online: they are released and the
			// list is reset every time connectivity drops.
			let subscriptions: ZenObservable.Subscription[] = [];

			// Tracked apart from `subscriptions` because it has to survive offline
			// periods; it is only released by the teardown below.
			let connectivitySubscription: ZenObservable.Subscription | undefined;

			(async () => {
				try {
					await this.setupModels(params);
				} catch (err) {
					observer.error(err);
					return;
				}

				// Resolves once a connectivity report has been processed to the end.
				// It stays pending when the online branch bails out through
				// `observer.error`.
				const startPromise = new Promise<void>(resolveStart => {
					connectivitySubscription = this.datastoreConnectivity.status().subscribe(async ({ online }) => {
						// From offline to online
						if (online && !this.online) {
							this.online = online;

							observer.next({
								type: ControlMessage.SYNC_ENGINE_NETWORK_STATUS,
								data: {
									active: this.online,
								},
							});

							let ctlSubsObservable: Observable<CONTROL_MSG>;
							let dataSubsObservable: Observable<[
								TransformerMutationType,
								SchemaModel,
								PersistentModel
							]>;

							if (isNode) {
								logger.warn(
									'Realtime disabled when in a server-side environment'
								);
							} else {
								//#region GraphQL Subscriptions
								// The processor hands back a control-message stream and a
								// stream of incoming model data.
								[
									ctlSubsObservable,
									dataSubsObservable,
								] = this.subscriptionsProcessor.start();

								try {
									// Block until the realtime connection reports CONNECTED
									await new Promise<void>((resolveConnected, rejectConnected) => {
										const ctlSubsSubscription = ctlSubsObservable.subscribe({
											next: msg => {
												if (msg === CONTROL_MSG.CONNECTED) {
													resolveConnected();
												}
											},
											error: err => {
												rejectConnected(err);
												const handleDisconnect = this.disconnectionHandler();
												handleDisconnect(err);
											},
										});

										subscriptions.push(ctlSubsSubscription);
									});
								} catch (err) {
									observer.error(err);
									return;
								}

								logger.log('Realtime ready');

								observer.next({
									type: ControlMessage.SYNC_ENGINE_SUBSCRIPTIONS_ESTABLISHED,
								});

								//#endregion
							}

							//#region Base & Sync queries
							try {
								// Every sync-query message is forwarded to the consumer; we
								// move on as soon as the queries report ready or complete.
								await new Promise<void>((resolveSynced, rejectSynced) => {
									const syncQuerySubscription = this.syncQueriesObservable().subscribe(
										{
											next: message => {
												const { type } = message;

												if (
													type === ControlMessage.SYNC_ENGINE_SYNC_QUERIES_READY
												) {
													resolveSynced();
												}

												observer.next(message);
											},
											complete: () => {
												resolveSynced();
											},
											error: error => {
												rejectSynced(error);
											},
										}
									);

									if (syncQuerySubscription) {
										subscriptions.push(syncQuerySubscription);
									}
								});
							} catch (error) {
								observer.error(error);
								return;
							}
							//#endregion

							//#region process mutations
							subscriptions.push(
								this.mutationsProcessor
									.start()
									.subscribe(({ modelDefinition, model: item, hasMore }) => {
										const modelConstructor = this.userModelClasses[
											modelDefinition.name
										] as PersistentModelConstructor<any>;

										const model = this.modelInstanceCreator(
											modelConstructor,
											item
										);

										// Fire-and-forget: the merge result is not awaited
										this.storage.runExclusive(storage =>
											this.modelMerger.merge(storage, model)
										);

										observer.next({
											type:
												ControlMessage.SYNC_ENGINE_OUTBOX_MUTATION_PROCESSED,
											data: {
												model: modelConstructor,
												element: model,
											},
										});

										observer.next({
											type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
											data: {
												isEmpty: !hasMore,
											},
										});
									})
							);
							//#endregion

							//#region Merge subscriptions buffer
							// TODO: extract to function
							// `dataSubsObservable` is only assigned outside Node (see above)
							if (!isNode) {
								subscriptions.push(
									dataSubsObservable.subscribe(
										([_transformerMutationType, modelDefinition, item]) => {
											const modelConstructor = this.userModelClasses[
												modelDefinition.name
											] as PersistentModelConstructor<any>;

											const model = this.modelInstanceCreator(
												modelConstructor,
												item
											);

											this.storage.runExclusive(storage =>
												this.modelMerger.merge(storage, model)
											);
										}
									)
								);
							}
							//#endregion
						} else if (!online) {
							this.online = online;

							observer.next({
								type: ControlMessage.SYNC_ENGINE_NETWORK_STATUS,
								data: {
									active: this.online,
								},
							});

							// Drop everything that was started for the online session
							subscriptions.forEach(sub => sub.unsubscribe());
							subscriptions = [];
						}

						resolveStart();
					});
				});

				// NOTE(review): this storage subscription is not tracked and therefore
				// not released by the teardown. Left as-is because enqueueing local
				// mutations after the consumer is gone may be relied upon — confirm.
				this.storage
					.observe(null, null, ownSymbol)
					.filter(({ model }) => {
						const modelDefinition = this.getModelDefinition(model);

						return modelDefinition.syncable === true;
					})
					.subscribe({
						next: async ({ opType, model, element, condition }) => {
							const namespace = this.schema.namespaces[
								this.namespaceResolver(model)
							];
							const MutationEventConstructor = this.modelClasses[
								'MutationEvent'
							] as PersistentModelConstructor<MutationEvent>;
							const graphQLCondition = predicateToGraphQLCondition(condition);
							const mutationEvent = createMutationInstanceFromModelOperation(
								namespace.relationships,
								this.getModelDefinition(model),
								opType,
								model,
								element,
								graphQLCondition,
								MutationEventConstructor,
								this.modelInstanceCreator
							);

							await this.outbox.enqueue(this.storage, mutationEvent);

							observer.next({
								type: ControlMessage.SYNC_ENGINE_OUTBOX_MUTATION_ENQUEUED,
								data: {
									model,
									element,
								},
							});

							observer.next({
								type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
								data: {
									isEmpty: false,
								},
							});

							// Only poke the mutations processor after the start sequence
							// has been handled, and only while online.
							await startPromise;

							if (this.online) {
								this.mutationsProcessor.resume();
							}
						},
					});

				observer.next({
					type: ControlMessage.SYNC_ENGINE_STORAGE_SUBSCRIBED,
				});

				// `peek` yielding `undefined` means there is nothing queued
				const outboxIsEmpty =
					(await this.outbox.peek(this.storage)) === undefined;
				observer.next({
					type: ControlMessage.SYNC_ENGINE_OUTBOX_STATUS,
					data: {
						isEmpty: outboxIsEmpty,
					},
				});

				await startPromise;

				observer.next({
					type: ControlMessage.SYNC_ENGINE_READY,
				});
			})().catch(err => {
				// Anything that escapes the start sequence (e.g. a failing outbox
				// peek) is reported to the consumer instead of becoming an
				// unhandled promise rejection.
				observer.error(err);
			});

			return () => {
				subscriptions.forEach(sub => sub.unsubscribe());

				// Stop reacting to connectivity changes once the consumer is gone;
				// otherwise a later reconnect would start subscriptions that nobody
				// can release anymore.
				if (connectivitySubscription) {
					connectivitySubscription.unsubscribe();
				}
			};
		});
	}