in src/backend/util/Observable.js [506:632]
function subscribe<T>(
source: Source<T>,
observer: Observer<T> | Sink<T>
): Subscription {
let closed = false;
let cleanup;
// Ideally we would simply describe a `get closed()` method on the Sink and
// Subscription objects below, however not all flow environments we expect
// Relay to be used within will support property getters, and many minifier
// tools still do not support ES5 syntax. Instead, we can use defineProperty.
const withClosed: <O>(obj: O) => {| ...O, +closed: boolean |} = (obj =>
Object.defineProperty(obj, 'closed', ({ get: () => closed }: any)): any);
function doCleanup() {
if (cleanup) {
if (cleanup.unsubscribe) {
cleanup.unsubscribe();
} else {
try {
cleanup();
} catch (error) {
hostReportError(error, true /* isUncaughtThrownError */);
}
}
cleanup = undefined;
}
}
// Create a Subscription.
const subscription: Subscription = withClosed({
unsubscribe() {
if (!closed) {
closed = true;
// Tell Observer that unsubscribe was called.
try {
observer.unsubscribe && observer.unsubscribe(subscription);
} catch (error) {
hostReportError(error, true /* isUncaughtThrownError */);
} finally {
doCleanup();
}
}
},
});
// Tell Observer that observation is about to begin.
try {
observer.start && observer.start(subscription);
} catch (error) {
hostReportError(error, true /* isUncaughtThrownError */);
}
// If closed already, don't bother creating a Sink.
if (closed) {
return subscription;
}
// Create a Sink respecting subscription state and cleanup.
const sink: Sink<T> = withClosed({
next(value) {
if (!closed && observer.next) {
try {
observer.next(value);
} catch (error) {
hostReportError(error, true /* isUncaughtThrownError */);
}
}
},
error(error, isUncaughtThrownError) {
if (closed || !observer.error) {
closed = true;
hostReportError(error, isUncaughtThrownError || false);
doCleanup();
} else {
closed = true;
try {
observer.error(error);
} catch (error2) {
hostReportError(error2, true /* isUncaughtThrownError */);
} finally {
doCleanup();
}
}
},
complete() {
if (!closed) {
closed = true;
try {
observer.complete && observer.complete();
} catch (error) {
hostReportError(error, true /* isUncaughtThrownError */);
} finally {
doCleanup();
}
}
},
});
// If anything goes wrong during observing the source, handle the error.
try {
cleanup = source(sink);
} catch (error) {
sink.error(error, true /* isUncaughtThrownError */);
}
if (__DEV__) {
// Early runtime errors for ill-formed returned cleanup.
if (
cleanup !== undefined &&
typeof cleanup !== 'function' &&
(!cleanup || typeof cleanup.unsubscribe !== 'function')
) {
throw new Error(
'Returned cleanup function which cannot be called: ' + String(cleanup)
);
}
}
// If closed before the source function existed, cleanup now.
if (closed) {
doCleanup();
}
return subscription;
}