internal async Task ExecuteReaderAsync()

in MySQL.Data/src/MySqlCommand.cs [679:850]


    /// <summary>
    /// Executes the command text on the current connection and returns a
    /// <see cref="MySqlDataReader"/> after its first <c>NextResultAsync</c> call has completed.
    /// </summary>
    /// <param name="behavior">
    /// Command behavior flags; handed to the command interceptor, to
    /// <c>HandleCommandBehaviorsAsync</c> and to the reader that is created.
    /// </param>
    /// <param name="execAsync">
    /// Forwarded unchanged to every awaited helper. Presumably selects between asynchronous and
    /// synchronous I/O inside those helpers (confirm against their implementations).
    /// </param>
    /// <param name="cancellationToken">
    /// Forwarded to <c>ReplicationManager.GetNewConnectionAsync</c> and to
    /// <c>reader.NextResultAsync</c>.
    /// </param>
    /// <returns>
    /// The interceptor's reader when a command interceptor handles the call; otherwise the newly
    /// created reader. Returns <c>null</c> when execution fails with a <see cref="MySqlException"/>
    /// whose <c>IsQueryAborted</c> flag is set.
    /// </returns>
    /// <exception cref="InvalidOperationException">The command text is null or empty after trimming.</exception>
    /// <exception cref="MySqlException">
    /// A reader is already open on the connection, an <see cref="IOException"/> occurred during
    /// execution, or the server reported an error.
    /// </exception>
    /// <exception cref="System.Transactions.TransactionAbortedException">
    /// The ambient transaction reports the <c>Aborted</c> status and the driver is not rolling back.
    /// </exception>
    internal async Task<MySqlDataReader> ExecuteReaderAsync(CommandBehavior behavior, bool execAsync, CancellationToken cancellationToken = default)
    {
      // give our interceptors a shot at it first
      MySqlDataReader interceptedReader = null;

      if (connection?.commandInterceptor != null && connection.commandInterceptor.ExecuteReader(CommandText, behavior, ref interceptedReader))
        return interceptedReader;

      // interceptors didn't handle this so we fall through
      bool success = false;
      CheckState();
      Driver driver = connection.driver;

      // Null-safe trim: a null command text must reach the guard below and be reported as
      // "command text not initialized" instead of failing with a NullReferenceException.
      cmdText = cmdText?.Trim();
      if (String.IsNullOrEmpty(cmdText))
        Throw(new InvalidOperationException(Resources.CommandTextNotInitialized));

      string sql = cmdText.Trim(';');

      // Load balancing getting a new connection
      if (connection.hasBeenOpen && !driver.HasStatus(ServerStatusFlags.InTransaction))
        await ReplicationManager.GetNewConnectionAsync(connection.Settings.Server, !IsReadOnlyCommand(sql), connection, execAsync, cancellationToken).ConfigureAwait(false);

      // NOTE(review): this semaphore is created per call, so it cannot serialize concurrent
      // callers; real mutual exclusion would need a gate shared across calls.
      // The using declaration guarantees disposal on every exit path, including the
      // Throw(...) calls that run before the try block below is entered.
      using SemaphoreSlim semaphoreSlim = new(1);
      semaphoreSlim.Wait();

      // We have to recheck that there is no reader, after we got the lock
      if (connection.Reader != null)
        Throw(new MySqlException(Resources.DataReaderOpen));

      System.Transactions.Transaction curTrans = System.Transactions.Transaction.Current;

      if (curTrans != null)
      {
        bool inRollback = false;
        //TODO: ADD support for 452 and 46X
        if (driver.currentTransaction != null)
          inRollback = driver.currentTransaction.InRollback;

        if (!inRollback)
        {
          System.Transactions.TransactionStatus status = System.Transactions.TransactionStatus.InDoubt;
          try
          {
            // in some cases (during state transitions) this throws
            // an exception. Ignore exceptions, we're only interested 
            // whether transaction was aborted or not.
            status = curTrans.TransactionInformation.Status;
          }
          catch (System.Transactions.TransactionException) { }

          if (status == System.Transactions.TransactionStatus.Aborted)
            Throw(new System.Transactions.TransactionAbortedException());
        }
      }

      commandTimer = new CommandTimer(connection, CommandTimeout);
      LastInsertedId = -1;

      // For TableDirect the command text is a table name, so wrap it in a SELECT.
      if (CommandType == CommandType.TableDirect)
        sql = "SELECT * FROM " + sql;

      // if we are on a replicated connection, we are only allowed readonly statements
      if (connection.Settings.Replication && !InternallyCreated)
        EnsureCommandIsReadOnly(sql);

      // Reuse the existing statement only when it is already prepared.
      if (statement == null || !statement.IsPrepared)
      {
        if (CommandType == CommandType.StoredProcedure)
          statement = new StoredProcedure(this, sql);
        else
          statement = new PreparableStatement(this, sql);
      }

      // stored procs are the only statement type that need do anything during resolve
      statement.Resolve(false);

      // Now that we have completed our resolve step, we can handle our
      // command behaviors
      await HandleCommandBehaviorsAsync(execAsync, behavior).ConfigureAwait(false);


      // Tell whoever is listening that we have started our command
#if NET5_0_OR_GREATER
      CurrentActivity = MySQLActivitySource.CommandStart(this);
#endif
      try
      {
        MySqlDataReader reader = new MySqlDataReader(this, statement, behavior);
        connection.Reader = reader;
        Canceled = false;
        // execute the statement
        await statement.ExecuteAsync(execAsync).ConfigureAwait(false);
        // wait for data to return
        await reader.NextResultAsync(execAsync, cancellationToken).ConfigureAwait(false);
        success = true;
        return reader;
      }
      catch (Exception ex)
      {
#if NET5_0_OR_GREATER
        MySQLActivitySource.SetException(CurrentActivity, ex);
#endif
        if (ex is TimeoutException || ex is ThreadAbortException)
        {
          // The handler is expected to throw on its own; the rethrow is a fallback.
          await connection.HandleTimeoutOrThreadAbortAsync(ex, execAsync).ConfigureAwait(false);
          throw;
        }

        if (ex is IOException)
        {
          await connection.AbortAsync(execAsync).ConfigureAwait(false); // Closes connection without returning it to the pool
          throw new MySqlException(Resources.FatalErrorDuringExecute, ex);
        }

        if (ex is MySqlException mySqlException)
        {
          if (mySqlException.InnerException is TimeoutException)
            throw; // already handled

          try
          {
            await ResetReaderAsync(execAsync).ConfigureAwait(false);
            await ResetSqlSelectLimitAsync(execAsync).ConfigureAwait(false);
          }
          catch (Exception)
          {
            // Reset SqlLimit did not work, connection is hosed.
            await Connection.AbortAsync(execAsync).ConfigureAwait(false);
            throw new MySqlException(ex.Message, true, ex);
          }

          // if we caught an exception because of a cancel, then just return null
          if (mySqlException.IsQueryAborted)
            return null;
          if (mySqlException.IsFatal)
            await Connection.CloseAsync(execAsync).ConfigureAwait(false);
          if (mySqlException.Number == 0)
            throw new MySqlException(Resources.FatalErrorDuringExecute, mySqlException);
          throw;
        }

        // Any other exception type propagates unchanged.
        throw;
      }
      finally
      {
        if (connection != null)
        {
          if (connection.Reader == null)
          {
            // Something went seriously wrong,  and reader would not
            // be able to clear timeout on closing.
            // So we clear timeout here.
            ClearCommandTimer();
          }
          if (!success)
          {
            // ExecuteReader failed. Close Reader and set to null to 
            // prevent subsequent errors with DataReaderOpen
            await ResetReaderAsync(execAsync).ConfigureAwait(false);
          }
        }

        semaphoreSlim.Release();
      }
    }