in lib/eventstream_rpc.ts [740:775]
async activate(message: eventstream.Message) : Promise<OperationActivationResult> {
return new Promise<OperationActivationResult>(async (resolve, reject) => {
if (this.state != OperationState.None) {
reject(createRpcError(RpcErrorType.ClientStateError, "Eventstream operations may only have activate() invoked once"));
return;
}
this.state = OperationState.Activating;
try {
let activatePromise = this.stream.activate({
operation : this.operationConfig.name,
message : message,
cancelController : this.operationConfig.options.cancelController
});
await activatePromise;
} catch (e) {
if (this.state == OperationState.Activating) {
this.state = OperationState.Ended;
setImmediate(() => { this.close(); });
}
reject(createRpcError(RpcErrorType.InternalError, "Operation stream activation failure", e as CrtError));
return;
}
if (this.state != OperationState.Activating) {
reject(createRpcError(RpcErrorType.InternalError, "Operation stream activation interruption"));
return;
}
this.state = OperationState.Activated;
resolve({});
});
}