in src/watch.ts [208:247]
private establishStream() {
if (this.state !== State.Idle) {
throw new ClientRuntimeError('Cannot call establishStream() if state != Idle');
}
// possible we reconnect and watchers are removed in the meantime
if (this.watchers.length === 0) {
return;
}
// clear anyone who is in the process of closing, we won't re-add them
this.expectedClosers.forEach(watcher => {
this.watchers = this.watchers.filter(w => w !== watcher);
watcher.emit('end');
});
this.expectedClosers.clear();
this.state = State.Connecting;
this.client
.watch()
.then(stream => {
this.state = State.Connected;
this.queue = new AttachQueue(stream);
this.stream = stream
.on('data', res =>
res.created ? this.handleCreatedResponse(res) : this.handleResponse(res),
)
.on('error', err => this.handleError(err))
.on('end', () => this.handleError(new EtcdWatchStreamEnded()));
// possible watchers are remove while we're connecting.
if (this.watchers.length === 0) {
return this.destroyStream();
}
this.queue!.attach(this.watchers);
})
.catch(err => this.handleError(err));
}