lib/eventstream_rpc.ts (642 lines of code) (raw):
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/
/**
* @packageDocumentation
* @module eventstream_rpc
*/
import {CrtError, eventstream, io, cancel} from 'aws-crt';
import {EventEmitter} from 'events';
import {toUtf8} from "@aws-sdk/util-utf8-browser";
/**
* @internal
*
* Service model data about an individual operation. Every string-valued property holds the name of a shape
* in the service model rather than the shape itself.
*/
export interface EventstreamRpcServiceModelOperation {
/** Name of the request shape; used in this module to look up the request validator and serializer */
requestShape: string,
/** Name of the response shape */
responseShape: string,
/** Name of the outbound streaming message shape; undefined when the operation does not allow outbound messages */
outboundMessageShape?: string,
/** Name of the inbound streaming message shape, if any */
inboundMessageShape?: string,
/** Names of the error shapes associated with the operation */
errorShapes: Set<string>
}
/**
* @internal
*
* Normalizers strip unmodeled data from a value by building a new value that contains only modeled data.
* Takes the value to normalize and returns the normalized copy.
*/
export type ShapeNormalizer = (value: any) => any;
/**
* @internal
*
* Validators throw errors if modeled data is of the wrong type, is missing when required, or violates any
* other checkable protocol constraint we can think of. A validator returns nothing when the value is acceptable.
*
* Validation is only performed on input shapes. Output shapes must be left alone in order to support
* service evolution independent of the client.
*/
export type ShapeValidator = (value: any) => void;
/**
* @internal
*
* Deserializers take a raw eventstream message and return a modeled shape. If a modeled shape cannot be
* produced, an error is thrown.
*/
export type ShapeDeserializer = (message: eventstream.Message) => any;
/**
* @internal
*
* Serializers take a modeled shape and return an eventstream message. The assumption is that once a value
* has been both validated and normalized, serializing the final value will result in a message that
* an eventstream RPC server can deserialize without error.
*/
export type ShapeSerializer = (value: any) => eventstream.Message;
/**
* @internal
*
* Collection of service-specific utility functions and definitions that enable an eventstream RPC client to
* correctly perform operations
*/
export interface EventstreamRpcServiceModel {
/** Normalizer functions (presumably keyed by shape name, like the validators and serializers) */
normalizers: Map<string, ShapeNormalizer>;
/** Validator functions, looked up by shape name */
validators: Map<string, ShapeValidator>;
/** Deserializer functions (presumably keyed by shape name; the lookups are not visible in this part of the file) */
deserializers: Map<string, ShapeDeserializer>;
/** Serializer functions for top-level shapes, looked up by shape name */
serializers: Map<string, ShapeSerializer>;
/** Per-operation model data, looked up by service-prefixed operation name */
operations: Map<string, EventstreamRpcServiceModelOperation>;
/** Sets of string values (presumably the allowed values of each modeled enum, keyed by enum name - TODO confirm) */
enums: Map<string, Set<string>>;
}
/**
* Indicates the general category of an error thrown by the eventstream RPC implementation.
*
* This is a numeric enum: each member's value is its position in the declaration, so reordering or inserting
* members changes the numeric values.
*/
export enum RpcErrorType {
/**
* An error occurred while serializing a client model into a message in the eventstream protocol.
*/
SerializationError,
/**
* An error occurred while deserializing a message from the eventstream protocol into the client model.
*/
DeserializationError,
/**
* An error occurred during the connect-connack handshake between client and server. Usually this means
* the connect was not accepted by the server and thus hints at an authentication problem.
*/
HandshakeError,
/**
* An error that isn't classifiable as one of the other RpcErrorType values.
*/
InternalError,
/**
* An error occurred due to an attempt to invoke an API while the target operation or client is not in the
* right state to perform the API.
*/
ClientStateError,
/**
* An error occurred ostensibly due to an underlying networking failure.
*/
NetworkError,
/**
* An error occurred where the underlying transport was shut down unexpectedly.
*/
InterruptionError,
/**
* Invalid data was passed into the RPC client.
*/
ValidationError,
/**
* A formally-modeled error sent from server to client
*/
ServiceError
}
/**
* @internal
*
* Plain-data description of an RPC error; this is the sole argument of the RpcError constructor.
*/
interface RpcErrorModel {
/** Broad category of the error */
type: RpcErrorType;
/** Plain language description; also used as the Error message */
description: string;
/** Optional inner/triggering error */
internalError?: CrtError;
/** Optional service-specific modeled error data */
serviceError?: any;
};
/**
 * Error type wrapping every exception thrown by rpc clients and operations, including promise rejections.
 *
 * JS offers no natural way to classify general errors, so each RpcError carries a broad category plus optional
 * context. This lets callers decide how to react; not every error is fatal/terminal.
 */
export class RpcError extends Error {

    /** Broad category the error belongs to */
    readonly type: RpcErrorType;

    /** Human-readable description; identical to the Error message */
    readonly description: string;

    /** Inner/triggering error with additional context, when one exists */
    readonly internalError?: CrtError;

    /** Service-specific modeled error data, when one exists */
    readonly serviceError?: any;

    /** @internal */
    constructor(model: RpcErrorModel) {
        super(model.description);

        const { type, description, internalError, serviceError } = model;

        this.type = type;
        this.description = description;

        // Optional members are only assigned when the model supplies a truthy value
        if (internalError) {
            this.internalError = internalError;
        }

        if (serviceError) {
            this.serviceError = serviceError;
        }
    }
}
/**
* Wrapper for all data associated with an RPC client disconnection event
*/
export interface DisconnectionEvent {
/**
* Underlying reason for the disconnection. When the client is closed without a recorded transport error,
* RpcClient.close() fills this in with a "User-initiated disconnect" error.
*/
reason : CrtError;
}
/**
* Event listener type signature for listening to client disconnection events. The event data parameter is
* declared optional, so listeners should be prepared for it to be absent.
*/
export type DisconnectionListener = (eventData?: DisconnectionEvent) => void;
/**
* All data associated with the client successfully establishing an eventstream connection.
*
* Currently empty; it exists for future proofing. Could eventually carry connack properties, etc...
*/
export interface SuccessfulConnectionResult {
}
/**
* Configuration for the (potentially) asynchronous message transformation applied to the CONNECT message
* sent by the client once the underlying transport connection has been completed.
*/
export interface RpcMessageTransformationOptions {
/**
* (CONNECT) message to transform
*/
message: eventstream.Message,
/**
* Optional controller that allows for cancellation of the asynchronous process. The transformation implementation
* is responsible for respecting this. RpcClient.connect() passes along the controller it was given, if any.
*/
cancelController?: cancel.ICancelController
}
/**
* Type signature for an asynchronous function that can transform eventstream messages. Used to allow client
* implementations to modify the initial eventstream connect message. The message the returned promise resolves
* with is the one the client goes on to send.
*/
export type RpcMessageTransformation = (options: RpcMessageTransformationOptions) => Promise<eventstream.Message>;
/**
* All configuration options for creating a new eventstream RPC client
*/
export interface RpcClientConfig {
/**
* Name of the host to connect to. Must be a non-empty string to pass validateRpcClientConfig().
*/
hostName: string;
/**
* Port of the host to connect to. Must be an integer in [0, 65535] to pass validateRpcClientConfig().
*/
port: number;
/**
* Optional, additional socket options for the underlying connection
*/
socketOptions?: io.SocketOptions;
/**
* Optional TLS context to use when establishing a connection
*/
tlsCtx?: io.ClientTlsContext;
/**
* Optional message transformation function to apply to the eventstream connect message sent by the client.
*/
connectTransform?: RpcMessageTransformation;
}
/**
 * Checks an RPC Client configuration structure and throws an exception if there is a problem with one of the
 * required properties. Does explicit type checks in spite of typescript to validate even when used from a
 * pure Javascript project.
 *
 * Only `hostName` and `port` are checked; the optional properties are not inspected.
 *
 * @param config RPC client configuration to validate
 *
 * @throws RpcError of type ValidationError if the configuration is missing, the host name is missing or is
 * not a string, or the port is missing or is not an integer in the range [0, 65535]
 */
export function validateRpcClientConfig(config: RpcClientConfig) {
    if (!config) {
        throw createRpcError(RpcErrorType.ValidationError, "Eventstream RPC client configuration is undefined");
    }

    // Falsy check first: rejects undefined, null, and the empty string with a "missing" style message
    if (!config.hostName) {
        throw createRpcError(RpcErrorType.ValidationError, "Eventstream RPC client configuration must have a valid host name");
    }

    if (typeof config.hostName !== 'string') {
        throw createRpcError(RpcErrorType.ValidationError, "Eventstream RPC client configuration host name must be a string");
    }

    // Explicit undefined/null comparison (rather than a falsy check) so that port 0 is not reported as missing
    if (config.port === undefined || config.port === null) {
        throw createRpcError(RpcErrorType.ValidationError, "Eventstream RPC client configuration must have a valid port");
    }

    if (typeof config.port !== 'number' || !Number.isSafeInteger(config.port as number) || config.port < 0 || config.port > 65535) {
        // This message previously blamed the host name, which pointed users at the wrong property
        throw createRpcError(RpcErrorType.ValidationError, "Eventstream RPC client configuration port must be a 16-bit integer");
    }
}
/**
* @internal a rough mirror of the internal connection state, but ultimately must be independent due to the more
* complex connection establishment process (connect/connack). Used to prevent API invocations when the client
* is not in the proper state to attempt them.
*/
enum ClientState {
// Initial state; the only state from which connect() is allowed
None,
// connect() is in progress (transport connect and connect/connack handshake)
Connecting,
// Handshake completed successfully; new streams and operation registration are allowed
Connected,
// The connection attempt failed; a close() is scheduled right after this state is set
Finished,
// close() has been invoked; further close() calls resolve immediately
Closed
}
/**
* Configuration options for the RPC client's connect step
*/
export interface RpcClientConnectOptions {
/**
* Optional controller that allows the user to cancel the asynchronous connect process. The same controller is
* handed to every asynchronous step of connect(): the transport connect, the connect message transform, the
* connect send, and the wait for the connack.
*
* For example:
*
* ```
* setTimeout(() => {controller.cancel();}, 30000);
* await client.connect({
* cancelController: controller
* });
* ```
*
* would apply a 30 second timeout to the client's connect call.
*/
cancelController?: cancel.ICancelController;
}
/**
* Eventstream RPC client - uses an underlying eventstream connection to implement the eventstream RPC protocol.
*
* Lifecycle as implemented below: a client is created with RpcClient.new(), connected at most once with
* connect(), and shut down with close(). Operations register themselves with the client while they are unclosed
* so that close() can close them too.
*/
export class RpcClient extends EventEmitter {
// When true, the next close() emits one 'disconnection' event; only set after a successful connect()
private emitDisconnectOnClose : boolean;
// Coarse lifecycle state used to reject API calls made at the wrong time
private state: ClientState;
// Underlying eventstream connection used for protocol messages and for creating streams
private connection: eventstream.ClientConnection;
// Operations created on this client that have not been closed yet; set to undefined once close() has started
private unclosedOperations? : Set<OperationBase>;
// Reason reported with the 'disconnection' event; set from the transport error code or defaulted by close()
private disconnectionReason? : CrtError;
// Private: instances are created through the static new() factory. Throws an RpcError (InternalError) if the
// underlying connection object cannot be created.
private constructor(private config: RpcClientConfig) {
super();
this.unclosedOperations = new Set<OperationBase>();
this.state = ClientState.None;
this.emitDisconnectOnClose = false;
let connectionOptions : eventstream.ClientConnectionOptions = {
hostName: config.hostName,
port: config.port,
socketOptions: config.socketOptions,
tlsCtx: config.tlsCtx
};
try {
// consider factoring connect timeout into socket options to help bound promise resolution/wait time in
// connect()
this.connection = new eventstream.ClientConnection(connectionOptions);
} catch (e) {
throw createRpcError(RpcErrorType.InternalError, "Failed to create eventstream connection", e as CrtError);
}
}
/**
* Factory method to create a new client
*
* @param config configuration options that the new client must use
*
* Returns a new client on success, otherwise throws an RpcError
*/
static new(config: RpcClientConfig) : RpcClient {
return new RpcClient(config);
}
/**
* Attempts to open a network connection to the configured remote endpoint. Returned promise will be fulfilled if
* the transport-level connection is successfully established and the eventstream handshake completes without
* error.
*
* Returns a promise that is resolved with additional context on a successful connection, otherwise rejected.
* Rejections are RpcErrors: ClientStateError when connect() was already called, NetworkError when the
* connection closes during the attempt, HandshakeError for an invalid connack, and InternalError for any other
* failure (including cancellation). Every failure path after the state check schedules a close() of the client.
*
* connect() may only be called once.
*
* @param options optional connect configuration; currently only carries a cancel controller
*/
async connect(options?: RpcClientConnectOptions) : Promise<SuccessfulConnectionResult> {
return new Promise<SuccessfulConnectionResult>(async (resolve, reject) => {
// connect() is single-shot: any state other than None means it has been called before
if (this.state != ClientState.None) {
reject(createRpcError(RpcErrorType.ClientStateError, "RpcClient.connect() can only be called once"));
return;
}
// Temporary listener: a transport disconnect while still connecting fails the connect() promise
let onDisconnectWhileConnecting : eventstream.DisconnectionListener = (eventData: eventstream.DisconnectionEvent) => {
if (this.state == ClientState.Connecting) {
this.state = ClientState.Finished;
reject(createRpcError(RpcErrorType.NetworkError, "RpcClient.connect() failed - connection closed"));
setImmediate(() => { this.close(); });
}
};
this.connection.on('disconnection', onDisconnectWhileConnecting);
this.state = ClientState.Connecting;
let connack = undefined;
try {
await this.connection.connect({
cancelController: options?.cancelController
});
// create, transform, and send the connect
let connectMessage: eventstream.Message = {
type: eventstream.MessageType.Connect
};
if (this.config.connectTransform) {
connectMessage = await this.config.connectTransform({
message: connectMessage,
cancelController: options?.cancelController
});
}
// The RPC version header is appended after the user transform has run
this._applyEventstreamRpcHeadersToConnect(connectMessage);
// The wait for the next protocol message is set up before the connect is sent
// (presumably so a fast connack cannot be missed - TODO confirm)
let connackPromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
cancelController: options?.cancelController,
emitter : this.connection,
eventName : eventstream.ClientConnection.PROTOCOL_MESSAGE,
eventDataTransformer: (eventData: any) => { return (eventData as eventstream.MessageEvent).message; },
cancelMessage: "Eventstream connect() cancelled by user request"
});
await this.connection.sendProtocolMessage({
message: connectMessage,
cancelController: options?.cancelController
});
// wait for the conn ack or cancel
connack = await connackPromise;
} catch (err) {
// Any failure in the sequence above is reported as an InternalError wrapping the original error
if (this.state == ClientState.Connecting) {
this.state = ClientState.Finished;
setImmediate(() => { this.close(); });
}
reject(createRpcError(RpcErrorType.InternalError, "Failed to establish eventstream RPC connection", err as CrtError));
return;
}
// The state can leave Connecting during the awaits above, e.g. via the temporary disconnect listener or close()
if (this.state != ClientState.Connecting) {
reject(createRpcError(RpcErrorType.InternalError, "Eventstream RPC connection attempt interrupted"));
return;
}
if (!connack || !RpcClient.isValidConnack(connack)) {
this.state = ClientState.Finished;
reject(createRpcError(RpcErrorType.HandshakeError, "Failed to establish eventstream RPC connection - invalid connack"));
setImmediate(() => { this.close(); });
return;
}
/*
* Remove the promise-rejecting disconnect listener and replace it with a regular old listener that
* doesn't reject the connect() promise since we're going to resolve it now.
*/
this.connection.removeListener('disconnection', onDisconnectWhileConnecting);
this.connection.on('disconnection', (eventData: eventstream.DisconnectionEvent) => {
// A non-zero error code is recorded as the reason later reported by close()
if (eventData.errorCode != 0) {
this.disconnectionReason = new CrtError(eventData.errorCode);
}
setImmediate(() => { this.close(); });
});
/* Per the client contract, we only emit disconnect after a successful connection establishment */
this.emitDisconnectOnClose = true;
this.state = ClientState.Connected;
resolve({});
});
}
/**
* Returns true if the connection is currently open and ready-to-use, false otherwise.
*/
isConnected() : boolean {
return this.state == ClientState.Connected;
}
/**
* @internal
*
* Adds an unclosed operation to the set tracked by the client. When the client is closed, all unclosed operations
* will also be closed. While not foolproof, this enables us to avoid many kinds of resource leaks when the user
* doesn't do exactly what we would like them to do (which may not be obvious to them, in all fairness).
*
* Throws an RpcError (ClientStateError) if the client is not connected.
*
* @param operation unclosed operation to register
*/
registerUnclosedOperation(operation: OperationBase) {
if (!this.isConnected() || !this.unclosedOperations) {
throw createRpcError(RpcErrorType.ClientStateError, "Operation registration only allowed when the client is connected");
}
this.unclosedOperations.add(operation);
}
/**
* @internal
*
* Removes an unclosed operation from the set tracked by the client. When the client is closed, all unclosed operations
* will also be closed. Does nothing once close() has discarded the tracking set.
*
* @param operation operation to remove, presumably because it just got closed
*/
removeUnclosedOperation(operation: OperationBase) {
if (this.unclosedOperations) {
this.unclosedOperations.delete(operation);
}
}
/**
* Shuts down the client and begins the process of releasing all native resources associated with the client
* and in-progress operations. It is critical that this function be called when finished with the client;
* otherwise, native resources will leak.
*
* The client tracks unclosed operations and, as part of this process, closes them as well.
*
* Calling close() on an already-closed client resolves immediately.
*/
async close() : Promise<void> {
return new Promise<void>(async (resolve, reject) => {
try {
if (this.state == ClientState.Closed) {
resolve();
return;
}
this.state = ClientState.Closed;
if (this.emitDisconnectOnClose) {
// Cleared up front so the 'disconnection' event is emitted at most once
this.emitDisconnectOnClose = false;
if (!this.disconnectionReason) {
this.disconnectionReason = new CrtError("User-initiated disconnect");
}
// The event is emitted from a setImmediate callback rather than synchronously inside close()
setImmediate(() => {
this.emit('disconnection', {reason: this.disconnectionReason});
});
}
if (this.unclosedOperations) {
// Detach the set before iterating: removeUnclosedOperation() calls made by operation.close() then no-op
let unclosedOperations: Set<OperationBase> = this.unclosedOperations;
this.unclosedOperations = undefined;
// Operations are closed one at a time; each close is awaited before the next begins
for (const operation of unclosedOperations) {
await operation.close();
}
}
this.connection.close();
resolve();
} catch (err) {
reject(err);
}
});
}
/**
* @internal
*
* Creates a new stream on the client's connection for an RPC operation to use.
*
* Returns a new stream on success, otherwise throws an RpcError (ClientStateError if the client is not
* connected, InternalError if the connection fails to create the stream).
*/
newStream() : eventstream.ClientStream {
if (this.state != ClientState.Connected) {
throw createRpcError(RpcErrorType.ClientStateError, "New streams may only be created while the client is connected");
}
try {
return this.connection.newStream();
} catch (e) {
throw createRpcError(RpcErrorType.InternalError, "Failed to create new event stream", e as CrtError);
}
}
/**
* Event emitted when the client's underlying network connection is ended. Only emitted if the connection
* was previously successfully established, including a successful connect/connack handshake.
*
* Listener type: {@link DisconnectionListener}
*
* @event
*/
static DISCONNECTION : string = 'disconnection';
// Typed overload for the 'disconnection' event; the implementation signature forwards to EventEmitter.on
on(event: 'disconnection', listener: DisconnectionListener): this;
on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
}
// A connack is valid only if it is a ConnectAck message with the ConnectionAccepted flag set
private static isValidConnack(message: eventstream.Message) : boolean {
if (message.type != eventstream.MessageType.ConnectAck) {
return false;
}
if (((message.flags ?? 0) & eventstream.MessageFlags.ConnectionAccepted) == 0) {
return false;
}
return true;
}
// Appends the ':version' header to the connect message in place, creating the header array if needed
private _applyEventstreamRpcHeadersToConnect(connectMessage : eventstream.Message) {
if (!connectMessage.headers) {
connectMessage.headers = [];
}
connectMessage.headers.push(
eventstream.Header.newString(':version', '0.1.0')
);
}
}
/**
* Event data wrapper for the result of activating an operation. Currently has no members.
*/
export interface OperationActivationResult {
}
/**
* @internal a rough mirror of the internal stream binding state.
*/
enum OperationState {
// Initial state; the only state from which activate() is allowed
None,
// activate() is in progress
Activating,
// Activation succeeded; only in this state does close() send a terminate-stream message
Activated,
// Activation failed or the operation was explicitly marked ended; close() sends no terminate message
Ended,
// close() has been invoked; further close() calls resolve immediately
Closed
}
/**
* User-facing operation configuration
*/
export interface OperationOptions {
/**
* Optional cancel controller to cancel the sending of eventstream messages with. Cancellation includes both
* operation activation and sending of streaming messages. It does not affect a streaming operation's state.
*/
cancelController?: cancel.ICancelController;
/**
* Disables client-side data validation of requests and outbound streaming messages. Useful for testing how
* the client handles errors from the service.
*/
disableValidation? : boolean;
}
/**
* @internal
*
* Internal eventstream RPC operation configuration
*/
export interface OperationConfig {
/**
* Service-prefixed operation name. Ex: `awstest#EchoMessage`. Used as the key into the service model's
* operation map and as the operation name when activating the stream.
*/
name: string;
/**
* RPC client to use to perform the operation
*/
client: RpcClient;
/**
* Additional user-supplied configuration
*/
options: OperationOptions;
}
/**
 * @internal
 *
 * Shared core of every eventstream RPC operation. Owns the stream binding and registers itself with the
 * RPC client's unclosed-operation tracking so the operation gets cleaned up even if the user never closes it.
 */
class OperationBase extends EventEmitter {

    private state : OperationState;
    private stream : eventstream.ClientStream;

    /**
     * Creates the stream for the operation and registers the operation with the client. Both steps throw
     * if the client is not connected.
     *
     * @param operationConfig name, client, and user options for the operation
     */
    constructor(readonly operationConfig: OperationConfig) {
        super();

        this.state = OperationState.None;
        this.stream = operationConfig.client.newStream();
        operationConfig.client.registerUnclosedOperation(this);
    }

    /**
     * Closes the operation's stream binding. If the operation was activated, a best-effort terminate-stream
     * message is sent to the server first. The operation is also removed from the client's unclosed operation
     * set. Closing an already-closed operation is a no-op.
     */
    async close() {
        return new Promise<void>(async (resolve, _reject) => {
            if (this.state === OperationState.Closed) {
                resolve();
                return;
            }

            this.operationConfig.client.removeUnclosedOperation(this);

            // Only an activated stream needs to be told that we are going away
            const wasActivated = this.state === OperationState.Activated;
            this.state = OperationState.Closed;

            if (wasActivated) {
                const terminateMessage: eventstream.Message = {
                    type: eventstream.MessageType.ApplicationMessage,
                    flags: eventstream.MessageFlags.TerminateStream
                };

                try {
                    await this.stream.sendMessage({ message: terminateMessage });
                } catch (e) {
                    // failing to gently end the stream must not fail the close itself
                }
            }

            // The stream is closed from a setImmediate callback rather than synchronously
            setImmediate(() => {
                this.stream.close();
            });

            resolve();
        });
    }

    /**
     * Activates an eventstream RPC operation. May only be invoked once per operation.
     *
     * @param message eventstream message to send as part of stream activation
     */
    async activate(message: eventstream.Message) : Promise<OperationActivationResult> {
        return new Promise<OperationActivationResult>(async (resolve, reject) => {
            if (this.state !== OperationState.None) {
                reject(createRpcError(RpcErrorType.ClientStateError, "Eventstream operations may only have activate() invoked once"));
                return;
            }

            this.state = OperationState.Activating;

            try {
                await this.stream.activate({
                    operation: this.operationConfig.name,
                    message,
                    cancelController: this.operationConfig.options.cancelController
                });
            } catch (e) {
                // Only tear down if nothing else changed the state while we were waiting
                if (this.state === OperationState.Activating) {
                    this.state = OperationState.Ended;
                    setImmediate(() => { this.close(); });
                }

                reject(createRpcError(RpcErrorType.InternalError, "Operation stream activation failure", e as CrtError));
                return;
            }

            // The state can change during the await above (e.g. the operation was closed)
            if (this.state !== OperationState.Activating) {
                reject(createRpcError(RpcErrorType.InternalError, "Operation stream activation interruption"));
                return;
            }

            this.state = OperationState.Activated;
            resolve({});
        });
    }

    /**
     * @return true if the stream is currently active and ready-to-use, false otherwise.
     */
    isActive() : boolean {
        return this.state === OperationState.Activated;
    }

    /**
     * @return the operation's underlying event stream binding object
     */
    getStream() : eventstream.ClientStream {
        return this.stream;
    }

    /**
     * Marks the operation as "Ended" so that a later close() will not send a terminate message.
     */
    setStateEnded() {
        this.state = OperationState.Ended;
    }
}
/**
 * Implementation for request-response eventstream RPC operations.
 *
 * A single call to activate() validates and serializes the request, activates the stream with it, waits for
 * the first message on the stream, deserializes that message as the response, and then closes the operation.
 */
export class RequestResponseOperation<RequestType, ResponseType> extends EventEmitter {

    private operation : OperationBase;

    /**
     * @internal
     *
     * Throws an RpcError (InternalError) if the service model has no operation with the configured name.
     *
     * @param operationConfig name, client, and user options for the operation
     * @param serviceModel service model used to validate, serialize, and deserialize the operation's shapes
     */
    constructor(private operationConfig: OperationConfig, private serviceModel: EventstreamRpcServiceModel) {
        if (!serviceModel.operations.has(operationConfig.name)) {
            throw createRpcError(RpcErrorType.InternalError, `service model has no operation named ${operationConfig.name}`);
        }

        super();

        this.operation = new OperationBase(this.operationConfig);
    }

    /**
     * Performs the request-response interaction. The underlying operation is always closed once the returned
     * promise settles, whether it resolves or rejects.
     *
     * @param request modeled request data
     *
     * @return a promise that resolves with the deserialized response, or rejects with the validation,
     * serialization, activation, cancellation, or deserialization error that stopped the interaction
     */
    async activate(request: RequestType) : Promise<ResponseType> {
        let resultPromise : Promise<ResponseType> = new Promise<ResponseType>(async (resolve, reject) => {
            try {
                // Validate and serialize before subscribing to the stream. Both steps are synchronous and may
                // throw; doing them first avoids leaving an orphaned response listener behind on failure.
                if (!this.operationConfig.options.disableValidation) {
                    validateRequest(this.serviceModel, this.operationConfig.name, request);
                }

                let requestMessage: eventstream.Message = serializeRequest(this.serviceModel, this.operationConfig.name, request);

                // Subscribe for the response before activating the stream
                let stream : eventstream.ClientStream = this.operation.getStream();
                let responsePromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
                    cancelController: this.operationConfig.options.cancelController,
                    emitter : stream,
                    eventName : eventstream.ClientStream.MESSAGE,
                    eventDataTransformer: (eventData: any) => { return (eventData as eventstream.MessageEvent).message; },
                    cancelMessage: "Eventstream execute() cancelled by user request"
                });

                // If activation fails below, responsePromise is never awaited. Attach a no-op handler so that a
                // later rejection of it (e.g. cancellation) is not reported as an unhandled rejection. This does
                // not change what the await further down observes.
                responsePromise.catch(() => {});

                await this.operation.activate(requestMessage);

                let message : eventstream.Message = await responsePromise;

                // If the server terminated the stream, then set the operation to be ended immediately
                if ((message.flags ?? 0) & eventstream.MessageFlags.TerminateStream) {
                    this.operation.setStateEnded();
                }

                let response : ResponseType = deserializeResponse(this.serviceModel, this.operationConfig.name, message);

                resolve(response);
            } catch (e) {
                reject(e);
            }
        });

        // Auto-close: request-response operations are single use, so release the stream once the result settles
        let autoClosePromise : Promise<ResponseType> = resultPromise.finally(async () => { await this.operation.close(); });

        return autoClosePromise;
    }
}
/**
* Event listener type signature for listening to streaming operation error events. These occur after
* successful activation when the operation receives a modeled error or is unable to deserialize an inbound message
* into a modeled type.
*/
export type StreamingRpcErrorListener = (eventData: RpcError) => void;
/**
* Event data emitted when a streaming operation is ended. Currently has no members.
*/
export interface StreamingOperationEndedEvent {
}
/**
* Event listener type signature for listening to ended events for streaming operations
*/
export type StreamingOperationEndedListener = (eventData: StreamingOperationEndedEvent) => void;
/**
* Implementation of a bi-directional streaming operation.
*
* The first message received on the stream is treated as the operation's response; messages received after
* that are deserialized as inbound streaming messages and emitted as events.
*
* TODO: may change slightly for uni-directional operations
*/
export class StreamingOperation<RequestType, ResponseType, OutboundMessageType, InboundMessageType> extends EventEmitter {
// Shared operation core that owns the stream and the open/closed state
private operation : OperationBase;
// False until the response promise has taken the first stream message as the response; gates message dispatch
private responseHandled : boolean;
/**
* @internal
*
* Throws an RpcError if the service model has no operation with the configured name. Unless validation is
* disabled, the request is validated here (at construction time), so validation errors are thrown from
* the constructor rather than from activate().
*
* @param request modeled request data, sent when the operation is activated
* @param operationConfig name, client, and user options for the operation
* @param serviceModel service model used to validate, serialize, and deserialize the operation's shapes
*/
constructor(private request: RequestType, private operationConfig: OperationConfig, private serviceModel: EventstreamRpcServiceModel) {
if (!serviceModel.operations.has(operationConfig.name)) {
throw createRpcError(RpcErrorType.InternalError, `service model has no operation named ${operationConfig.name}`);
}
if (!operationConfig.options.disableValidation) {
validateRequest(serviceModel, operationConfig.name, request);
}
super();
this.operation = new OperationBase(operationConfig);
this.responseHandled = false;
}
/**
* Activates a streaming operation: serializes and sends the request, then waits for the first stream message
* and deserializes it as the response.
*
* Returns a promise that resolves with the response. On any failure the operation is closed and the promise
* is rejected with the error.
*/
async activate() : Promise<ResponseType> {
return new Promise<ResponseType>(async (resolve, reject) => {
try {
let stream : eventstream.ClientStream = this.operation.getStream();
// The streaming listeners are registered before the response promise below adds its own listener.
// NOTE(review): this appears to rely on listeners firing in registration order, so that the first
// message reaches _onStreamMessageEvent while responseHandled is still false - confirm.
stream.addListener(eventstream.ClientStream.MESSAGE, this._onStreamMessageEvent.bind(this));
stream.addListener(eventstream.ClientStream.ENDED, this._onStreamEndedEvent.bind(this));
let responsePromise : Promise<eventstream.Message> = cancel.newCancellablePromiseFromNextEvent({
cancelController: this.operationConfig.options.cancelController,
emitter : stream,
eventName : eventstream.ClientStream.MESSAGE,
eventDataTransformer: (eventData: any) => {
// From this point on, stream messages are dispatched as inbound streaming messages
this.responseHandled = true;
return (eventData as eventstream.MessageEvent).message;
},
cancelMessage: "Eventstream execute() cancelled by user request"
});
let requestMessage: eventstream.Message = serializeRequest(this.serviceModel, this.operationConfig.name, this.request);
await this.operation.activate(requestMessage);
let message : eventstream.Message = await responsePromise;
let response : ResponseType = deserializeResponse(this.serviceModel, this.operationConfig.name, message);
// If the server terminated the stream, then set the operation to be ended immediately
if ((message.flags ?? 0) & eventstream.MessageFlags.TerminateStream) {
this.operation.setStateEnded();
// Server hung up on us. Immediately cleanup the operation state.
// Do this before resolving the promise so that any user-initiated
// requests will see the correct state, which is that the operation is closed.
await this.close();
}
resolve(response);
} catch (e) {
// Close before rejecting so the operation is already cleaned up when the caller sees the error
await this.close();
reject(e);
}
});
}
/**
* Sends an outbound message on a streaming operation, if the operation allows outbound streaming messages.
* The message is validated (unless validation is disabled) and serialized before being sent.
*
* Returns a promise that resolves once the message has been sent and rejects with the error otherwise; a
* ValidationError is used when the operation does not allow outbound messages.
*
* @param message modeled data to send
*/
async sendMessage(message: OutboundMessageType) : Promise<void> {
return new Promise<void>(async (resolve, reject) => {
try {
if (!doesOperationAllowOutboundMessages(this.serviceModel, this.operationConfig.name)) {
throw createRpcError(RpcErrorType.ValidationError, `Operation '${this.operationConfig.name}' does not allow outbound streaming messages.`);
}
if (!this.operationConfig.options.disableValidation) {
validateOutboundMessage(this.serviceModel, this.operationConfig.name, message);
}
let serializedMessage: eventstream.Message = serializeOutboundMessage(this.serviceModel, this.operationConfig.name, message);
let stream: eventstream.ClientStream = this.operation.getStream();
await stream.sendMessage({
message: serializedMessage,
cancelController : this.operationConfig.options.cancelController
});
resolve();
} catch (err) {
reject(err);
}
});
}
/**
* Asynchronous close method for the underlying event stream. The user should call this function when finished
* with the operation in order to clean up native resources. Failing to do so will cause the native resources
* to persist until the client is closed. If the client is never closed then every unclosed operation will leak.
*/
async close() : Promise<void> {
return this.operation.close();
}
/**
* Event emitted when the operation's stream has ended. Only emitted if the stream was successfully activated.
*
* Listener type: {@link StreamingOperationEndedListener}
*
* @event
*/
static ENDED : string = 'ended';
/**
* Event emitted when an incoming eventstream message resulted in some kind of error. Usually this is either
* a modeled service error or a deserialization error for messages that cannot be mapped to the service model.
*
* Listener type: {@link StreamingRpcErrorListener}
*
* @event
*/
static STREAM_ERROR : string = 'streamError';
/**
* Event emitted when an incoming eventstream message is successfully deserialized into a modeled inbound streaming
* shape type.
*
* @event
*/
static MESSAGE : string = 'message';
// Typed overloads for the supported events; the implementation signature forwards to EventEmitter.on
on(event: 'ended', listener: StreamingOperationEndedListener): this;
on(event: 'streamError', listener: StreamingRpcErrorListener): this;
on(event: 'message', listener: (message: InboundMessageType) => void): this;
on(event: string | symbol, listener: (...args: any[]) => void): this {
super.on(event, listener);
return this;
}
// Handles every MESSAGE event on the stream. Messages seen before the response has been taken are ignored here;
// later ones are deserialized and emitted as 'message', or emitted as 'streamError' if deserialization throws.
private _onStreamMessageEvent(eventData: eventstream.MessageEvent) {
if (this.responseHandled) {
try {
let streamingMessage: InboundMessageType = deserializeInboundMessage(this.serviceModel, this.operationConfig.name, eventData.message);
// Events are emitted from a setImmediate callback rather than synchronously inside the stream callback
setImmediate(() => {
this.emit(StreamingOperation.MESSAGE, streamingMessage);
});
} catch (err) {
setImmediate(() => { this.emit(StreamingOperation.STREAM_ERROR, err as RpcError); });
}
}
}
// Handles the stream's ENDED event: emits 'ended' to listeners first and then closes the operation
private _onStreamEndedEvent(eventData: eventstream.StreamEndedEvent) {
setImmediate(async () => {
this.emit(StreamingOperation.ENDED, {});
await this.close();
})
}
}
/**
 * Convenience factory for RpcError instances.
 *
 * @param type broad category of the error
 * @param description plain language description of the error
 * @param internalError optional CrtError that triggered this error
 * @param serviceError optional modeled eventstream RPC service error that triggered this error
 *
 * @return a newly-constructed RpcError carrying the supplied data
 */
export function createRpcError(type: RpcErrorType, description: string, internalError?: CrtError, serviceError?: any) {
    const errorModel = { type, description, internalError, serviceError };

    return new RpcError(errorModel);
}
// Eventstream header names and values used by this module.
// NOTE(review): the code that reads/writes these headers is outside the visible part of the file; the
// descriptions below are inferred from the identifiers - confirm against the (de)serialization helpers.
// Name of the header presumably carrying the service model type (shape name) of a message
const SERVICE_MODEL_TYPE_HEADER_NAME : string = 'service-model-type';
// Name of the header presumably carrying the content type of a message payload
const CONTENT_TYPE_HEADER_NAME : string = ':content-type';
// Content type value for plain text payloads
const CONTENT_TYPE_PLAIN_TEXT : string = 'text/plain';
/**
 * Looks up a header by name on an eventstream message and returns its value as a string.
 *
 * @param message eventstream message whose headers should be searched
 * @param headerName exact name of the header to find
 *
 * @return the string value of the first header with a matching name; undefined if the message has no headers,
 * no header matches, or reading the header as a string throws
 */
function getEventStreamMessageHeaderValueAsString(message: eventstream.Message, headerName : string) : string | undefined {
    if (!message.headers) {
        return undefined;
    }

    try {
        for (const candidate of message.headers) {
            if (candidate.name !== headerName) {
                continue;
            }

            // First match wins; a conversion failure is handled by the catch below
            return candidate.asString();
        }

        return undefined;
    } catch (_err) {
        return undefined;
    }
}
// Picks one shape name (e.g. the request shape or the outbound message shape) out of an operation's model data;
// a selector returns undefined when the operation does not define the shape it selects.
type OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => string | undefined;
/**
 * Validates a value against the named shape by running the service model's validator for that shape.
 *
 * @param model service model containing the validators
 * @param shapeName name of the shape the value is expected to conform to
 * @param shape value to validate
 *
 * Throws an RpcError (ValidationError) if the value is falsy or the model has no validator for the shape;
 * any error thrown by the validator itself propagates unchanged.
 */
function validateShape(model: EventstreamRpcServiceModel, shapeName: string, shape: any) : void {
    if (!shape) {
        throw createRpcError(RpcErrorType.ValidationError, `Shape of type '${shapeName}' is undefined`);
    }

    const shapeValidator = model.validators.get(shapeName);
    if (!shapeValidator) {
        throw createRpcError(RpcErrorType.ValidationError, `No shape named '${shapeName}' exists in the service model`);
    }

    shapeValidator(shape);
}
/**
 * Validates a value against one of an operation's shapes.
 *
 * @param model service model containing the operation and validators
 * @param operationName service-prefixed name of the operation
 * @param shape value to validate
 * @param shapeSelector picks which of the operation's shapes the value must conform to
 *
 * Throws an RpcError if the operation is unknown (InternalError) or the selector yields no shape name
 * (ValidationError); otherwise defers to validateShape.
 */
function validateOperationShape(model: EventstreamRpcServiceModel, operationName: string, shape: any, shapeSelector : OperationShapeSelector) : void {
    const operationModel = model.operations.get(operationName);
    if (!operationModel) {
        throw createRpcError(RpcErrorType.InternalError, `No operation named '${operationName}' exists in the service model`);
    }

    const targetShapeName : string | undefined = shapeSelector(operationModel);
    if (!targetShapeName) {
        throw createRpcError(RpcErrorType.ValidationError, `Operation '${operationName}' does not have a defined selection shape`);
    }

    return validateShape(model, targetShapeName, shape);
}
/**
 * Validates a request value against the request shape of the named operation.
 */
function validateRequest(model: EventstreamRpcServiceModel, operationName: string, request: any) : void {
    const selectRequestShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.requestShape;

    validateOperationShape(model, operationName, request, selectRequestShape);
}
/**
 * Validates an outbound streaming message against the outbound message shape of the named operation.
 */
function validateOutboundMessage(model: EventstreamRpcServiceModel, operationName: string, message: any) : void {
    const selectOutboundShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.outboundMessageShape;

    validateOperationShape(model, operationName, message, selectOutboundShape);
}
/**
 * Reports whether the named operation defines an outbound message shape.
 *
 * @param model service model containing operation definitions
 * @param operationName name of the operation to check
 *
 * @return true if the operation's outboundMessageShape is anything other than undefined
 *
 * Throws an InternalError RpcError if the operation is not part of the service model.
 */
function doesOperationAllowOutboundMessages(model: EventstreamRpcServiceModel, operationName: string) : boolean {
    const operationModel = model.operations.get(operationName);
    if (operationModel) {
        return operationModel.outboundMessageShape !== undefined;
    }

    throw createRpcError(RpcErrorType.InternalError, `No operation named '${operationName}' exists in the service model`);
}
/**
 * Serializes a value into an eventstream message using the serializer registered for one of an
 * operation's shapes.
 *
 * @param model service model containing operation definitions and shape serializers
 * @param operationName name of the operation the value belongs to
 * @param message value to serialize
 * @param shapeSelector picks which of the operation's shapes determines the serializer
 *
 * @return the eventstream message produced by the shape's serializer
 *
 * Throws an InternalError RpcError if the operation is unknown, the selected shape is not defined, or no
 * serializer exists for the selected shape.
 */
function serializeMessage(model: EventstreamRpcServiceModel, operationName: string, message: any, shapeSelector: OperationShapeSelector) : eventstream.Message {
    const operationModel = model.operations.get(operationName);
    if (!operationModel) {
        throw createRpcError(RpcErrorType.InternalError, `No operation named '${operationName}' exists in the service model`);
    }

    const selectedShapeName : string | undefined = shapeSelector(operationModel);
    if (!selectedShapeName) {
        throw createRpcError(RpcErrorType.InternalError, `Operation '${operationName}' does not have a defined selection shape`);
    }

    const shapeSerializer = model.serializers.get(selectedShapeName);
    if (shapeSerializer) {
        return shapeSerializer(message);
    }

    throw createRpcError(RpcErrorType.InternalError, `No top-level shape serializer for '${selectedShapeName}' exists in the service model`);
}
/**
 * Serializes a request value using the serializer for the named operation's request shape.
 */
function serializeRequest(model: EventstreamRpcServiceModel, operationName: string, request: any) : eventstream.Message {
    const selectRequestShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.requestShape;

    return serializeMessage(model, operationName, request, selectRequestShape);
}
/**
 * Serializes an outbound streaming message using the serializer for the named operation's outbound
 * message shape.
 */
function serializeOutboundMessage(model: EventstreamRpcServiceModel, operationName: string, message: any) : eventstream.Message {
    const selectOutboundShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.outboundMessageShape;

    return serializeMessage(model, operationName, message, selectOutboundShape);
}
/**
 * Converts a message that did not match the expected shape of an operation into a thrown error. Every
 * path through this function ends in a throw.
 *
 * @param model service model containing the shape deserializers
 * @param errorShapes names of the error shapes modeled for the operation
 * @param shapeName shape name read from the message, or undefined if none could be read
 * @param message the eventstream message that was received
 *
 * Thrown RpcError values:
 *  - InternalError when no shape name is available.  The description distinguishes a plain-text
 *    application error (payload text is included), an application message that lacks a usable shape
 *    header, and any other message type.
 *  - ServiceError when the shape name is a modeled error shape with a deserializer; the deserialized
 *    value is attached as the service error.
 *  - InternalError for any other shape name.
 *
 * Anything thrown by the error shape's deserializer propagates unchanged.
 */
function throwResponseError(model: EventstreamRpcServiceModel, errorShapes: Set<string>, shapeName: string | undefined, message: eventstream.Message) : void {
    if (!shapeName) {
        if (message.type === eventstream.MessageType.ApplicationError) {
            const contentType : string | undefined = getEventStreamMessageHeaderValueAsString(message, CONTENT_TYPE_HEADER_NAME);
            if (contentType === CONTENT_TYPE_PLAIN_TEXT) {
                // Unmodeled plain-text errors carry their explanation in the payload; surface it to the caller
                const payloadAsString : string = toUtf8(new Uint8Array(message.payload as ArrayBuffer));
                throw createRpcError(RpcErrorType.InternalError, `Eventstream (response) message was not a modelled shape. Plain text payload is: '${payloadAsString}'`);
            }
        }

        if (message.type === eventstream.MessageType.ApplicationMessage) {
            // The message is an application message, so the actual problem is the absent/unreadable shape header
            throw createRpcError(RpcErrorType.InternalError, `Eventstream (response) application message did not have a usable '${SERVICE_MODEL_TYPE_HEADER_NAME}' header`);
        }

        throw createRpcError(RpcErrorType.InternalError, "Eventstream (response) message was not an application message");
    }

    // Only shapes modeled as errors for this operation are deserialized into a service error
    let serviceError : any | undefined = undefined;
    if (errorShapes.has(shapeName)) {
        const errorDeserializer = model.deserializers.get(shapeName);
        if (errorDeserializer) {
            serviceError = errorDeserializer(message);
        }
    }

    if (serviceError) {
        throw createRpcError(RpcErrorType.ServiceError, "Eventstream RPC request failed. Check serviceError property for details.", undefined, serviceError);
    }

    throw createRpcError(RpcErrorType.InternalError, `Unexpected response shape received: '${shapeName}'`, undefined, serviceError);
}
/**
 * Deserializes an eventstream message into the shape an operation expects.
 *
 * The shape name carried in the message's service-model-type header must equal the shape picked by the
 * selector; otherwise the message is handed to throwResponseError to be reported as an error.
 *
 * @param model service model containing operation definitions and shape deserializers
 * @param operationName name of the operation the message belongs to
 * @param message eventstream message to deserialize
 * @param shapeSelector picks which of the operation's shapes the message is expected to be
 *
 * @return the value produced by the expected shape's deserializer
 *
 * Throws an InternalError RpcError if the operation is unknown or the expected shape has no deserializer.
 */
function deserializeMessage(model: EventstreamRpcServiceModel, operationName: string, message: eventstream.Message, shapeSelector : OperationShapeSelector) {
    const operationModel = model.operations.get(operationName);
    if (!operationModel) {
        throw createRpcError(RpcErrorType.InternalError, `No operation named '${operationName}' exists in the service model`);
    }

    const receivedShapeName : string | undefined = getEventStreamMessageHeaderValueAsString(message, SERVICE_MODEL_TYPE_HEADER_NAME);
    const expectedShapeName : string | undefined = shapeSelector(operationModel);

    if (!expectedShapeName || !receivedShapeName || receivedShapeName !== expectedShapeName) {
        throwResponseError(model, operationModel.errorShapes, receivedShapeName, message);
        return;
    }

    const shapeDeserializer = model.deserializers.get(expectedShapeName);
    if (!shapeDeserializer) {
        throw createRpcError(RpcErrorType.InternalError, `No top-level shape deserializer for '${expectedShapeName}' exists in the service model`);
    }

    return shapeDeserializer(message);
}
/**
 * Deserializes an eventstream message that is expected to be the named operation's response shape.
 */
function deserializeResponse(model: EventstreamRpcServiceModel, operationName: string, message: eventstream.Message) : any {
    const selectResponseShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.responseShape;

    return deserializeMessage(model, operationName, message, selectResponseShape);
}
/**
 * Deserializes an eventstream message that is expected to be the named operation's inbound streaming
 * message shape.
 */
function deserializeInboundMessage(model: EventstreamRpcServiceModel, operationName: string, message: eventstream.Message) : any {
    const selectInboundShape : OperationShapeSelector = (operation : EventstreamRpcServiceModelOperation) => operation.inboundMessageShape;

    return deserializeMessage(model, operationName, message, selectInboundShape);
}