in sdk/servicebus/service-bus/src/connectionContext.ts [249:606]
export function create(
config: ConnectionConfig,
tokenCredential: SasTokenProvider | TokenCredential,
options?: ServiceBusClientOptions,
): ConnectionContext {
if (!options) options = {};
const userAgent = `${formatUserAgentPrefix(
options.userAgentOptions?.userAgentPrefix,
)} ${getRuntimeInfo()}`;
const parameters: CreateConnectionContextBaseParameters = {
config: config,
// re-enabling this will be a post-GA discussion similar to event-hubs.
// dataTransformer: options.dataTransformer,
isEntityPathRequired: false,
connectionProperties: {
product: "MSJSClient",
userAgent,
version: packageJsonInfo.version,
},
};
// Let us create the base context and then add ServiceBus specific ConnectionContext properties.
const connectionContext = ConnectionContextBase.create(parameters) as ConnectionContext;
connectionContext.tokenCredential = tokenCredential;
connectionContext.senders = {};
connectionContext.messageReceivers = {};
connectionContext.messageSessions = {};
connectionContext.managementClients = {};
let waitForConnectionRefreshResolve: () => void;
let waitForConnectionRefreshPromise: Promise<void> | undefined;
Object.assign<ConnectionContext, ConnectionContextMethods>(connectionContext, {
isConnectionClosing() {
// When the connection is not open, but the remote end is open,
// then the rhea connection is in the process of terminating.
return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen());
},
async readyToOpenLink() {
logger.verbose(
`[${this.connectionId}] Waiting until the connection is ready to open link.`,
);
// Check that the connection isn't in the process of closing.
// This can happen when the idle timeout has been reached but
// the underlying socket is waiting to be destroyed.
if (this.isConnectionClosing()) {
logger.verbose(
`[${this.connectionId}] Connection is closing, waiting for disconnected event`,
);
// Wait for the disconnected event that indicates the underlying socket has closed.
await this.waitForDisconnectedEvent();
}
// Wait for the connection to be reset.
await this.waitForConnectionReset();
logger.verbose(`[${this.connectionId}] Connection is ready to open link.`);
},
waitForDisconnectedEvent() {
return new Promise((resolve) => {
logger.verbose(
`[${this.connectionId}] Attempting to reinitialize connection` +
` but the connection is in the process of closing.` +
` Waiting for the disconnect event before continuing.`,
);
this.connection.once(ConnectionEvents.disconnected, resolve);
});
},
waitForConnectionReset() {
// Check if the connection is currently in the process of disconnecting.
if (waitForConnectionRefreshPromise) {
logger.verbose(`[${this.connectionId}] Waiting for connection reset`);
return waitForConnectionRefreshPromise;
}
logger.verbose(
`[${this.connectionId}] Connection not waiting to be reset. Resolving immediately.`,
);
return Promise.resolve();
},
getReceiverFromCache(
receiverName: string,
sessionId?: string,
): MessageReceiver | MessageSession | undefined {
if (sessionId != null && this.messageSessions[receiverName]) {
return this.messageSessions[receiverName];
}
if (this.messageReceivers[receiverName]) {
return this.messageReceivers[receiverName];
}
let existingReceivers = "";
if (sessionId != null) {
for (const messageSessionName of Object.keys(this.messageSessions)) {
if (this.messageSessions[messageSessionName].sessionId === sessionId) {
existingReceivers = this.messageSessions[messageSessionName].name;
break;
}
}
} else {
existingReceivers +=
(existingReceivers ? ", " : "") + Object.keys(this.messageReceivers).join(",");
}
logger.verbose(
"[%s] Failed to find receiver '%s' among existing receivers: %s",
this.connectionId,
receiverName,
existingReceivers,
);
return;
},
getManagementClient(entityPath: string): ManagementClient {
if (!this.managementClients[entityPath]) {
this.managementClients[entityPath] = new ManagementClient(this, entityPath, {
address: `${entityPath}/$management`,
});
}
return this.managementClients[entityPath];
},
});
// Define listeners to be added to the connection object for
// "connection_open" and "connection_error" events.
const onConnectionOpen: OnAmqpEvent = () => {
connectionContext.wasConnectionCloseCalled = false;
logger.verbose(
"[%s] setting 'wasConnectionCloseCalled' property of connection context to %s.",
connectionContext.connection.id,
connectionContext.wasConnectionCloseCalled,
);
};
const disconnected: OnAmqpEvent = async (context: EventContext) => {
if (waitForConnectionRefreshPromise) {
return;
}
waitForConnectionRefreshPromise = new Promise((resolve) => {
waitForConnectionRefreshResolve = resolve;
});
const connectionError =
context.connection && context.connection.error ? context.connection.error : undefined;
if (connectionError) {
logger.logError(
connectionError,
"[%s] Error (context.connection.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
const contextError = context.error;
if (contextError) {
logger.logError(
contextError,
"[%s] Error (context.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
const state: Readonly<{
wasConnectionCloseCalled: boolean;
numSenders: number;
numReceivers: number;
}> = {
wasConnectionCloseCalled: connectionContext.wasConnectionCloseCalled,
numSenders: Object.keys(connectionContext.senders).length,
numReceivers:
Object.keys(connectionContext.messageReceivers).length +
Object.keys(connectionContext.messageSessions).length,
};
// Clear internal map maintained by rhea to avoid reconnecting of old links once the
// connection is back up.
connectionContext.connection.removeAllSessions();
// Close the cbs session to ensure all the event handlers are released.
await connectionContext.cbsSession.close();
// Close the management sessions to ensure all the event handlers are released.
for (const entityPath of Object.keys(connectionContext.managementClients)) {
await connectionContext.managementClients[entityPath].close();
}
if (state.wasConnectionCloseCalled) {
// Do Nothing
} else {
// Calling onDetached on sender
if (state.numSenders) {
// We don't do recovery for the sender:
// Because we don't want to keep the sender active all the time
// and the "next" send call would bear the burden of creating the link.
// Call onDetached() on sender so that it can gracefully shutdown
// by cleaning up the timers and closing the links.
// We don't call onDetached for sender after `refreshConnection()`
// because any new send calls that potentially initialize links would also get affected if called later.
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${state.numSenders} ` +
`senders. We should not reconnect.`,
);
const detachCalls: Promise<void>[] = [];
for (const senderName of Object.keys(connectionContext.senders)) {
const sender = connectionContext.senders[senderName];
if (sender) {
logger.verbose(
"[%s] calling detached on sender '%s'.",
connectionContext.connection.id,
sender.name,
);
detachCalls.push(
sender.onDetached().catch((err) => {
logger.logError(
err,
"[%s] An error occurred while calling onDetached() the sender '%s'",
connectionContext.connection.id,
sender.name,
);
}),
);
}
}
await Promise.all(detachCalls);
}
// Calling onDetached on batching receivers for the same reasons as sender
const numBatchingReceivers = getNumberOfReceivers(connectionContext, "batching");
if (numBatchingReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numBatchingReceivers} ` +
`batching receivers. We should not reconnect.`,
);
// Call onDetached() on receivers so that batching receivers it can gracefully close any ongoing batch operation
await callOnDetachedOnReceivers(
connectionContext,
connectionError || contextError,
"batching",
);
}
// Calling onDetached on session receivers
const numSessionReceivers = getNumberOfReceivers(connectionContext, "session");
if (numSessionReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numSessionReceivers} ` +
`session receivers. We should close them.`,
);
await callOnDetachedOnSessionReceivers(
connectionContext,
connectionError || contextError,
);
}
}
await refreshConnection();
waitForConnectionRefreshResolve();
waitForConnectionRefreshPromise = undefined;
// The connection should always be brought back up if the sdk did not call connection.close()
// and there was at least one receiver link on the connection before it went down.
logger.verbose("[%s] state: %O", connectionContext.connectionId, state);
// Calling onDetached on streaming receivers
const numStreamingReceivers = getNumberOfReceivers(connectionContext, "streaming");
if (!state.wasConnectionCloseCalled && numStreamingReceivers) {
logger.verbose(
`[${connectionContext.connection.id}] connection.close() was not called from the sdk and there were ${numStreamingReceivers} ` +
`streaming receivers. We should reconnect.`,
);
// Calling `onDetached()` on streaming receivers after the refreshConnection() since `onDetached()` would
// recover the streaming receivers and that would only be possible after the connection is refreshed.
//
// This is different from the batching receiver since `onDetached()` for the batching receiver would
// return the outstanding messages and close the receive link.
await callOnDetachedOnReceivers(
connectionContext,
connectionError || contextError,
"streaming",
);
}
};
const protocolError: OnAmqpEvent = async (context: EventContext) => {
if (context.connection && context.connection.error) {
logger.logError(
context.connection.error,
"[%s] Error (context.connection.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
if (context.error) {
logger.logError(
context.error,
"[%s] Error (context.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
};
const error: OnAmqpEvent = async (context: EventContext) => {
if (context.connection && context.connection.error) {
logger.logError(
context.connection.error,
"[%s] Error (context.connection.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
if (context.error) {
logger.logError(
context.error,
"[%s] Error (context.error) occurred on the amqp connection",
connectionContext.connection.id,
);
}
};
async function refreshConnection(): Promise<void> {
const originalConnectionId = connectionContext.connectionId;
try {
await cleanConnectionContext();
} catch (err: any) {
logger.logError(
err,
`[${connectionContext.connectionId}] There was an error closing the connection before reconnecting`,
);
}
// Create a new connection, id, locks, and cbs client.
connectionContext.refreshConnection();
addConnectionListeners(connectionContext.connection);
logger.verbose(
`The connection "${originalConnectionId}" has been updated to "${connectionContext.connectionId}".`,
);
}
function addConnectionListeners(connection: Connection): void {
// Add listeners on the connection object.
connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
connection.on(ConnectionEvents.disconnected, disconnected);
connection.on(ConnectionEvents.protocolError, protocolError);
connection.on(ConnectionEvents.error, error);
}
async function cleanConnectionContext(): Promise<void> {
// Remove listeners from the connection object.
connectionContext.connection.removeListener(
ConnectionEvents.connectionOpen,
onConnectionOpen,
);
connectionContext.connection.removeListener(ConnectionEvents.disconnected, disconnected);
connectionContext.connection.removeListener(ConnectionEvents.protocolError, protocolError);
connectionContext.connection.removeListener(ConnectionEvents.error, error);
// Close the connection
await connectionContext.connection.close();
}
addConnectionListeners(connectionContext.connection);
logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId);
return connectionContext;
}