in MySQL.Data/src/MySqlCommand.cs [679:850]
internal async Task<MySqlDataReader> ExecuteReaderAsync(CommandBehavior behavior, bool execAsync, CancellationToken cancellationToken = default)
{
// give our interceptors a shot at it first
MySqlDataReader interceptedReader = null;
if (connection?.commandInterceptor != null && connection.commandInterceptor.ExecuteReader(CommandText, behavior, ref interceptedReader))
return interceptedReader;
// interceptors didn't handle this so we fall through
bool success = false;
CheckState();
Driver driver = connection.driver;
cmdText = cmdText.Trim();
if (String.IsNullOrEmpty(cmdText))
Throw(new InvalidOperationException(Resources.CommandTextNotInitialized));
string sql = cmdText.Trim(';');
// Load balancing getting a new connection
if (connection.hasBeenOpen && !driver.HasStatus(ServerStatusFlags.InTransaction))
await ReplicationManager.GetNewConnectionAsync(connection.Settings.Server, !IsReadOnlyCommand(sql), connection, execAsync, cancellationToken).ConfigureAwait(false);
SemaphoreSlim semaphoreSlim = new(1);
semaphoreSlim.Wait();
// We have to recheck that there is no reader, after we got the lock
if (connection.Reader != null)
Throw(new MySqlException(Resources.DataReaderOpen));
System.Transactions.Transaction curTrans = System.Transactions.Transaction.Current;
if (curTrans != null)
{
bool inRollback = false;
//TODO: ADD support for 452 and 46X
if (driver.currentTransaction != null)
inRollback = driver.currentTransaction.InRollback;
if (!inRollback)
{
System.Transactions.TransactionStatus status = System.Transactions.TransactionStatus.InDoubt;
try
{
// in some cases (during state transitions) this throws
// an exception. Ignore exceptions, we're only interested
// whether transaction was aborted or not.
status = curTrans.TransactionInformation.Status;
}
catch (System.Transactions.TransactionException) { }
if (status == System.Transactions.TransactionStatus.Aborted)
Throw(new System.Transactions.TransactionAbortedException());
}
}
commandTimer = new CommandTimer(connection, CommandTimeout);
LastInsertedId = -1;
if (CommandType == CommandType.TableDirect)
sql = "SELECT * FROM " + sql;
// if we are on a replicated connection, we are only allow readonly statements
if (connection.Settings.Replication && !InternallyCreated)
EnsureCommandIsReadOnly(sql);
if (statement == null || !statement.IsPrepared)
{
if (CommandType == CommandType.StoredProcedure)
statement = new StoredProcedure(this, sql);
else
statement = new PreparableStatement(this, sql);
}
// stored procs are the only statement type that need do anything during resolve
statement.Resolve(false);
// Now that we have completed our resolve step, we can handle our
// command behaviors
await HandleCommandBehaviorsAsync(execAsync, behavior).ConfigureAwait(false);
// Tell whoever is listening that we have started out command
#if NET5_0_OR_GREATER
CurrentActivity = MySQLActivitySource.CommandStart(this);
#endif
try
{
MySqlDataReader reader = new MySqlDataReader(this, statement, behavior);
connection.Reader = reader;
Canceled = false;
// execute the statement
await statement.ExecuteAsync(execAsync).ConfigureAwait(false);
// wait for data to return
await reader.NextResultAsync(execAsync, cancellationToken).ConfigureAwait(false);
success = true;
return reader;
}
catch (Exception ex)
{
#if NET5_0_OR_GREATER
MySQLActivitySource.SetException(CurrentActivity, ex);
#endif
if (ex is TimeoutException)
{
await connection.HandleTimeoutOrThreadAbortAsync(ex, execAsync).ConfigureAwait(false);
throw; //unreached
}
else if (ex is ThreadAbortException)
{
await connection.HandleTimeoutOrThreadAbortAsync(ex, execAsync).ConfigureAwait(false);
throw;
}
else if (ex is IOException)
{
await connection.AbortAsync(execAsync).ConfigureAwait(false); // Closes connection without returning it to the pool
throw new MySqlException(Resources.FatalErrorDuringExecute, ex);
}
else if (ex is MySqlException)
{
MySqlException mySqlException = ex as MySqlException;
if (mySqlException.InnerException is TimeoutException)
throw; // already handled
try
{
await ResetReaderAsync(execAsync).ConfigureAwait(false);
await ResetSqlSelectLimitAsync(execAsync).ConfigureAwait(false);
}
catch (Exception)
{
// Reset SqlLimit did not work, connection is hosed.
await Connection.AbortAsync(execAsync).ConfigureAwait(false);
throw new MySqlException(ex.Message, true, ex);
}
// if we caught an exception because of a cancel, then just return null
if (mySqlException.IsQueryAborted)
return null;
if (mySqlException.IsFatal)
await Connection.CloseAsync(execAsync).ConfigureAwait(false);
if (mySqlException.Number == 0)
throw new MySqlException(Resources.FatalErrorDuringExecute, mySqlException);
throw;
}
else
{
throw;
}
}
finally
{
if (connection != null)
{
if (connection.Reader == null)
{
// Something went seriously wrong, and reader would not
// be able to clear timeout on closing.
// So we clear timeout here.
ClearCommandTimer();
}
if (!success)
{
// ExecuteReader failed.Close Reader and set to null to
// prevent subsequent errors with DataReaderOpen
await ResetReaderAsync(execAsync).ConfigureAwait(false);
}
}
semaphoreSlim.Release();
}
}