private establishStream()

in src/watch.ts [208:247]


  /**
   * Opens the watch stream and attaches the currently registered watchers.
   *
   * Returns immediately when there are no watchers. Otherwise every watcher
   * listed in `expectedClosers` is removed from `watchers` and sent an `end`
   * event, the state moves to `Connecting`, and `client.watch()` is called.
   * Once that resolves, the state becomes `Connected`, an `AttachQueue` is
   * created for the stream and the stream's `data`, `error` and `end` events
   * are wired to this object's handlers. A rejection from `client.watch()`, or
   * an exception thrown while setting the stream up, goes to `handleError`.
   *
   * @throws ClientRuntimeError when the current state is not `State.Idle`.
   */
  private establishStream() {
    if (this.state !== State.Idle) {
      throw new ClientRuntimeError('Cannot call establishStream() if state != Idle');
    }

    // All watchers may have been removed before a reconnect reached this point.
    if (this.watchers.length === 0) {
      return;
    }

    // Watchers that are already closing are dropped and told they have ended;
    // they will not be attached to the new stream.
    this.expectedClosers.forEach(closing => {
      this.watchers = this.watchers.filter(candidate => candidate !== closing);
      closing.emit('end');
    });
    this.expectedClosers.clear();

    this.state = State.Connecting;

    this.client
      .watch()
      .then(watchStream => {
        this.state = State.Connected;

        const attachQueue = new AttachQueue(watchStream);
        this.queue = attachQueue;

        this.stream = watchStream
          .on('data', response => {
            // Responses flagged `created` take a separate handler from all others.
            if (response.created) {
              this.handleCreatedResponse(response);
            } else {
              this.handleResponse(response);
            }
          })
          .on('error', streamError => {
            this.handleError(streamError);
          })
          .on('end', () => {
            // An ended stream is routed through the same path as an error.
            this.handleError(new EtcdWatchStreamEnded());
          });

        // Watchers may have been removed while the connection was being set up.
        if (this.watchers.length === 0) {
          return this.destroyStream();
        }

        attachQueue.attach(this.watchers);
      })
      .catch(reason => this.handleError(reason));
  }