in lib/eventstream_rpc.ts [918:958]
/**
 * Sends the serialized request on the operation's stream and waits for the
 * first stream message, which is deserialized and returned as the response.
 *
 * If that message carries the TerminateStream flag, the operation is marked
 * ended and closed before the response is returned.
 *
 * @returns the deserialized response built from the first stream message
 * @throws whatever error caused activation, the wait for the response, or
 * deserialization to fail (including cancellation via the configured cancel
 * controller); the operation is closed before the error is propagated
 */
async activate() : Promise<ResponseType> {
    // NOTE: this is deliberately a plain async body rather than
    // `new Promise(async (resolve, reject) => ...)`. With an async executor, a
    // failure inside the catch handler (e.g. close() rejecting) would leave the
    // returned promise pending forever and surface as an unhandled rejection.
    try {
        const stream : eventstream.ClientStream = this.operation.getStream();
        stream.addListener(eventstream.ClientStream.MESSAGE, this._onStreamMessageEvent.bind(this));
        stream.addListener(eventstream.ClientStream.ENDED, this._onStreamEndedEvent.bind(this));

        // Start listening for the next MESSAGE event before the request is sent
        // (presumably so that a fast response cannot be missed).
        const responsePromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
            cancelController: this.operationConfig.options.cancelController,
            emitter : stream,
            eventName : eventstream.ClientStream.MESSAGE,
            eventDataTransformer: (eventData: any) => {
                // Record that the message consumed here is the response
                this.responseHandled = true;
                return (eventData as eventstream.MessageEvent).message;
            },
            cancelMessage: "Eventstream execute() cancelled by user request"
        });

        const requestMessage: eventstream.Message = serializeRequest(this.serviceModel, this.operationConfig.name, this.request);
        await this.operation.activate(requestMessage);

        const message : eventstream.Message = await responsePromise;
        const response : ResponseType = deserializeResponse(this.serviceModel, this.operationConfig.name, message);

        // If the server terminated the stream, then set the operation to be ended immediately
        if ((message.flags ?? 0) & eventstream.MessageFlags.TerminateStream) {
            this.operation.setStateEnded();
            // Server hung up on us. Immediately cleanup the operation state.
            // Do this before returning the response so that any user-initiated
            // requests will see the correct state, which is that the operation is closed.
            await this.close();
        }

        return response;
    } catch (e) {
        try {
            await this.close();
        } catch {
            // Best-effort cleanup: a failure to close must neither mask the
            // original error nor prevent the caller from being notified.
        }
        throw e;
    }
}