in src/election.ts [109:175]
private async loop() {
// @see https://github.com/etcd-io/etcd/blob/28d1af294e4394df1ed967a4ac4fbaf437be3463/client/v3/concurrency/election.go#L177
while (this.running) {
const allKeys = await this.namespace.getAll().sort('Create', 'Ascend').limit(1).exec();
let leader: IKeyValue | undefined = allKeys.kvs[0];
let revision = allKeys.header.revision;
if (!this.running) {
return; // if closed when doing async work
}
if (!leader) {
this.setLeader(undefined);
const watcher = this.namespace
.watch()
.startRevision(allKeys.header.revision)
.prefix('')
.only('put')
.watcher();
await new Promise<void>((resolve, reject) => {
watcher.on('data', data => {
let done = false;
for (const event of data.events) {
if (event.type === 'Put') {
leader = event.kv;
revision = event.kv.mod_revision;
done = true;
}
}
if (done) {
resolve();
}
});
watcher.on('error', reject);
this.disposer = resolve;
}).finally(() => watcher.cancel());
if (!this.running) {
return; // if closed when doing async work
}
}
if (!leader) {
throw new ClientRuntimeError('unreachable lack of election leader');
}
this.setLeader(leader);
const watcher = this.namespace
.watch()
.startRevision(new BigNumber(revision).plus(1).toString())
.key(leader.key)
.watcher();
await new Promise<void>((resolve, reject) => {
watcher!.on('put', kv => this.setLeader(kv));
watcher!.on('delete', () => resolve());
watcher!.on('error', reject);
this.disposer = () => {
resolve();
return watcher.cancel();
};
}).finally(() => watcher.cancel());
}
}