export function create()

in sdk/eventhub/event-hubs/src/connectionContext.ts [158:459]


  export function create(
    config: EventHubConnectionConfig,
    tokenCredential: SasTokenProvider | TokenCredential,
    options?: ConnectionContextOptions,
  ): ConnectionContext {
    if (!options) options = {};

    config.webSocket = options.webSocketOptions && options.webSocketOptions.webSocket;
    config.webSocketEndpointPath = "$servicebus/websocket";
    config.webSocketConstructorOptions =
      options.webSocketOptions && options.webSocketOptions.webSocketConstructorOptions;

    const parameters: CreateConnectionContextBaseParameters = {
      config: config,
      // re-enabling this will be a post-GA discussion.
      // dataTransformer: options.dataTransformer,
      isEntityPathRequired: true,
      connectionProperties: {
        product: "MSJSClient",
        userAgent: getUserAgent(options),
        version: packageJsonInfo.version,
      },
    };
    // Let us create the base context and then add EventHub specific ConnectionContext properties.
    const connectionContext = ConnectionContextBase.create(parameters) as ConnectionContext;
    connectionContext.tokenCredential = tokenCredential;
    connectionContext.wasConnectionCloseCalled = false;
    connectionContext.senders = {};
    connectionContext.receivers = {};
    const mOptions: ManagementClientOptions = {
      address: options.managementSessionAddress,
      audience: options.managementSessionAudience,
    };
    connectionContext.managementSession = new ManagementClient(connectionContext, mOptions);

    let waitForConnectionRefreshResolve: () => void;
    let waitForConnectionRefreshPromise: Promise<void> | undefined;

    Object.assign<ConnectionContext, ConnectionContextMethods>(connectionContext, {
      isConnectionClosing() {
        // When the connection is not open, but the remote end is open,
        // then the rhea connection is in the process of terminating.
        return Boolean(!this.connection.isOpen() && this.connection.isRemoteOpen());
      },
      async readyToOpenLink(optionsArg?: { abortSignal?: AbortSignalLike }) {
        // Check that the connection isn't in the process of closing.
        // This can happen when the idle timeout has been reached but
        // the underlying socket is waiting to be destroyed.
        if (this.isConnectionClosing()) {
          // Wait for the disconnected event that indicates the underlying socket has closed.
          await this.waitForDisconnectedEvent(optionsArg);
        }

        // Wait for the connection to be reset.
        await this.waitForConnectionReset();
      },
      waitForDisconnectedEvent(optionsArg?: { abortSignal?: AbortSignalLike }) {
        return createAbortablePromise((resolve) => {
          logger.verbose(
            `[${this.connectionId}] Attempting to reinitialize connection` +
              ` but the connection is in the process of closing.` +
              ` Waiting for the disconnect event before continuing.`,
          );
          this.connection.once(ConnectionEvents.disconnected, resolve);
        }, optionsArg);
      },
      waitForConnectionReset() {
        // Check if the connection is currently in the process of disconnecting.
        if (waitForConnectionRefreshPromise) {
          return waitForConnectionRefreshPromise;
        }
        return Promise.resolve();
      },
      async close() {
        try {
          if (this.connection.isOpen()) {
            // Close all the senders.
            await Promise.all(
              Object.keys(connectionContext.senders).map((name) =>
                connectionContext.senders[name]?.close().catch(() => {
                  /* error already logged, swallow it here */
                }),
              ),
            );
            // Close all the receivers.
            await Promise.all(
              Object.keys(connectionContext.receivers).map((name) =>
                connectionContext.receivers[name]?.close().catch(() => {
                  /* error already logged, swallow it here */
                }),
              ),
            );
            // Close the cbs session;
            await this.cbsSession.close();
            // Close the management session
            await this.managementSession?.close();
            await this.connection.close();
            this.wasConnectionCloseCalled = true;
            logger.info("Closed the amqp connection '%s' on the client.", this.connectionId);
          }
        } catch (err: any) {
          const errorDescription =
            err instanceof Error ? `${err.name}: ${err.message}` : JSON.stringify(err);
          logger.warning(
            `An error occurred while closing the connection "${this.connectionId}":\n${errorDescription}`,
          );
          logErrorStackTrace(err);
          throw err;
        }
      },
    });

    // Define listeners to be added to the connection object for
    // "connection_open" and "connection_error" events.
    const onConnectionOpen: OnAmqpEvent = () => {
      connectionContext.wasConnectionCloseCalled = false;
      logger.verbose(
        "[%s] setting 'wasConnectionCloseCalled' property of connection context to %s.",
        connectionContext.connection.id,
        connectionContext.wasConnectionCloseCalled,
      );
    };

    const onDisconnected: OnAmqpEvent = async (context: EventContext) => {
      if (waitForConnectionRefreshPromise) {
        return;
      }
      waitForConnectionRefreshPromise = new Promise((resolve) => {
        waitForConnectionRefreshResolve = resolve;
      });
      try {
        logger.verbose(
          "[%s] 'disconnected' event occurred on the amqp connection.",
          connectionContext.connection.id,
        );

        if (context.connection && context.connection.error) {
          logger.verbose(
            "[%s] Accompanying error on the context.connection: %O",
            connectionContext.connection.id,
            context.connection && context.connection.error,
          );
        }
        if (context.error) {
          logger.verbose(
            "[%s] Accompanying error on the context: %O",
            connectionContext.connection.id,
            context.error,
          );
        }
        const state: Readonly<{
          wasConnectionCloseCalled: boolean;
          numSenders: number;
          numReceivers: number;
        }> = {
          wasConnectionCloseCalled: connectionContext.wasConnectionCloseCalled,
          numSenders: Object.keys(connectionContext.senders).length,
          numReceivers: Object.keys(connectionContext.receivers).length,
        };
        logger.verbose(
          "[%s] Closing all open senders and receivers in the state: %O",
          connectionContext.connection.id,
          state,
        );

        // Clear internal map maintained by rhea to avoid reconnecting of old links once the
        // connection is back up.
        connectionContext.connection.removeAllSessions();

        // Close the cbs session to ensure all the event handlers are released.
        await connectionContext.cbsSession?.close().catch(() => {
          /* error already logged, swallow it here */
        });
        // Close the management session to ensure all the event handlers are released.
        await connectionContext.managementSession?.close().catch(() => {
          /* error already logged, swallow it here */
        });

        // Close all senders and receivers to ensure clean up of timers & other resources.
        if (state.numSenders || state.numReceivers) {
          await Promise.all(
            Object.keys(connectionContext.senders).map((name) =>
              connectionContext.senders[name]?.close().catch(() => {
                /* error already logged, swallow it here */
              }),
            ),
          );

          await Promise.all(
            Object.keys(connectionContext.receivers).map((name) =>
              connectionContext.receivers[name]?.close().catch(() => {
                /* error already logged, swallow it here */
              }),
            ),
          );
        }
      } catch (err: any) {
        logger.verbose(
          `[${connectionContext.connectionId}] An error occurred while closing the connection in 'disconnected'. %O`,
          err,
        );
      }

      try {
        await refreshConnection(connectionContext);
      } catch (err: any) {
        logger.verbose(
          `[${connectionContext.connectionId}] An error occurred while refreshing the connection in 'disconnected'. %O`,
          err,
        );
      } finally {
        waitForConnectionRefreshResolve();
        waitForConnectionRefreshPromise = undefined;
      }
    };

    const protocolError: OnAmqpEvent = async (context: EventContext) => {
      logger.verbose(
        "[%s] 'protocol_error' event occurred on the amqp connection.",
        connectionContext.connection.id,
      );

      if (context.connection && context.connection.error) {
        logger.verbose(
          "[%s] Accompanying error on the context.connection: %O",
          connectionContext.connection.id,
          context.connection && context.connection.error,
        );
      }
      if (context.error) {
        logger.verbose(
          "[%s] Accompanying error on the context: %O",
          connectionContext.connection.id,
          context.error,
        );
      }
    };

    const error: OnAmqpEvent = async (context: EventContext) => {
      logger.verbose(
        "[%s] 'error' event occurred on the amqp connection.",
        connectionContext.connection.id,
      );

      if (context.connection && context.connection.error) {
        logger.verbose(
          "[%s] Accompanying error on the context.connection: %O",
          connectionContext.connection.id,
          context.connection && context.connection.error,
        );
      }
      if (context.error) {
        logger.verbose(
          "[%s] Accompanying error on the context: %O",
          connectionContext.connection.id,
          context.error,
        );
      }
    };

    function addConnectionListeners(connection: Connection): void {
      // Add listeners on the connection object.
      connection.on(ConnectionEvents.connectionOpen, onConnectionOpen);
      connection.on(ConnectionEvents.disconnected, onDisconnected);
      connection.on(ConnectionEvents.protocolError, protocolError);
      connection.on(ConnectionEvents.error, error);
    }

    function cleanConnectionContext(context: ConnectionContext): Promise<void> {
      // Remove listeners from the connection object.
      context.connection.removeListener(ConnectionEvents.connectionOpen, onConnectionOpen);
      context.connection.removeListener(ConnectionEvents.disconnected, onDisconnected);
      context.connection.removeListener(ConnectionEvents.protocolError, protocolError);
      context.connection.removeListener(ConnectionEvents.error, error);
      // Close the connection
      return context.connection.close();
    }

    async function refreshConnection(context: ConnectionContext): Promise<void> {
      const originalConnectionId = context.connectionId;
      try {
        await cleanConnectionContext(context);
      } catch (err: any) {
        logger.verbose(
          `[${context.connectionId}] There was an error closing the connection before reconnecting: %O`,
          err,
        );
      }

      // Create a new connection, id, locks, and cbs client.
      context.refreshConnection();
      addConnectionListeners(context.connection);
      logger.verbose(
        `The connection "${originalConnectionId}" has been updated to "${context.connectionId}".`,
      );
    }

    addConnectionListeners(connectionContext.connection);

    logger.verbose("[%s] Created connection context successfully.", connectionContext.connectionId);
    return connectionContext;
  }