in System.Data/fx/src/data/System/Data/SqlClient/SqlDataReader.cs [3132:4789]
// Attempts to advance the reader to the next row of the current result set.
//
// setTimeout: when true, the state object's timeout is reset to _defaultTimeoutMilliseconds before reading.
// more:       set to true when a row is ready to be read; false in every other case
//             (including every path on which this method returns false).
//
// Returns true when the operation ran to completion, false when one of the Try* helpers it
// calls returned false (NOTE(review): presumably "more network data needed" - confirm with callers).
//
// Throws ADP.DataReaderClosed("Read") when there is no parser and the reader is closed.
// OutOfMemory / StackOverflow / ThreadAbort exceptions mark the reader closed, abort the
// connection (when there is one) and are rethrown.
private bool TryReadInternal(bool setTimeout, out bool more) {
SqlStatistics statistics = null;
IntPtr hscp;
Bid.ScopeEnter(out hscp, "<sc.SqlDataReader.Read|API> %d#", ObjectID);
RuntimeHelpers.PrepareConstrainedRegions();
try {
#if DEBUG
TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
tdsReliabilitySection.Start();
#else
{
#endif //DEBUG
statistics = SqlStatistics.StartTimer(Statistics);
if (null != _parser) {
if (setTimeout) {
SetTimeout(_defaultTimeoutMilliseconds);
}
// A row is currently exposed: finish consuming whatever is left of it first
if (_sharedState._dataReady) {
if (!TryCleanPartialRead()) {
more = false;
return false;
}
}
// clear out our buffers
SqlBuffer.Clear(_data);
_sharedState._nextColumnHeaderToRead = 0;
_sharedState._nextColumnDataToRead = 0;
_sharedState._columnDataBytesRemaining = -1; // unknown
_lastColumnWithDataChunkRead = -1;
if (!_haltRead) {
bool moreRows;
if (!TryHasMoreRows(out moreRows)) {
more = false;
return false;
}
if (moreRows) {
// read the row from the backend (unless it's an altrow where the marker is already inside the altrow ...)
while (_stateObj._pendingData) {
if (_altRowStatus != ALTROWSTATUS.AltRow) {
// if this is an ordinary row we let the run method consume the ROW token
if (!_parser.TryRun(RunBehavior.ReturnImmediately, _command, this, null, _stateObj, out _sharedState._dataReady)) {
more = false;
return false;
}
if (_sharedState._dataReady) {
break;
}
}
else {
// ALTROW token and AltrowId are already consumed ...
Debug.Assert (_altRowStatus == ALTROWSTATUS.AltRow, "invalid AltRowStatus");
_altRowStatus = ALTROWSTATUS.Done;
_sharedState._dataReady = true;
break;
}
}
if (_sharedState._dataReady) {
// In SingleRow mode, remember that the single row has been handed out so the next call takes the halt branch
_haltRead = IsCommandBehavior(CommandBehavior.SingleRow);
more = true;
return true;
}
}
// No row was produced and nothing is pending on the wire: close internally without closing the reader itself
if (!_stateObj._pendingData) {
if (!TryCloseInternal(false /*closeReader*/)) {
more = false;
return false;
}
}
}
else {
// if we did not get a row and halt is true, clean off rows of result
// success must be false - or else we could have just read off row and set
// halt to true
bool moreRows;
if (!TryHasMoreRows(out moreRows)) {
more = false;
return false;
}
while (moreRows) {
// if we are in SingleRow mode, and we've read the first row,
// read the rest of the rows, if any
while (_stateObj._pendingData && !_sharedState._dataReady) {
if (!_parser.TryRun(RunBehavior.ReturnImmediately, _command, this, null, _stateObj, out _sharedState._dataReady)) {
more = false;
return false;
}
}
if (_sharedState._dataReady) {
if (!TryCleanPartialRead()) {
more = false;
return false;
}
}
// clear out our buffers
SqlBuffer.Clear(_data);
_sharedState._nextColumnHeaderToRead = 0;
if (!TryHasMoreRows(out moreRows)) {
more = false;
return false;
}
}
// reset haltRead
_haltRead = false;
}
}
else if (IsClosed) {
throw ADP.DataReaderClosed("Read");
}
more = false;
#if DEBUG
// Debug-only sanity check: when no row is ready but data is pending, the next byte must be a valid TDS token
if ((!_sharedState._dataReady) && (_stateObj._pendingData)) {
byte token;
if (!_stateObj.TryPeekByte(out token)) {
return false;
}
Debug.Assert(TdsParser.IsValidTdsToken(token), string.Format("DataReady is false, but next token is invalid: {0,-2:X2}", token));
}
#endif
return true;
}
#if DEBUG
finally {
tdsReliabilitySection.Stop();
}
#endif //DEBUG
}
// The three handlers below are identical: mark the reader closed, abort the connection (if any), rethrow
catch (System.OutOfMemoryException e) {
_isClosed = true;
SqlConnection con = _connection;
if (con != null) {
con.Abort(e);
}
throw;
}
catch (System.StackOverflowException e) {
_isClosed = true;
SqlConnection con = _connection;
if (con != null) {
con.Abort(e);
}
throw;
}
catch (System.Threading.ThreadAbortException e) {
_isClosed = true;
SqlConnection con = _connection;
if (con != null) {
con.Abort(e);
}
throw;
}
finally {
// Always stop the statistics timer and leave the trace scope, on success and on failure
SqlStatistics.StopTimer(statistics);
Bid.ScopeLeave(ref hscp);
}
}
// Synchronous wrapper around TryReadColumn.
// Throws ADP.AsyncOperationPending when an async operation is already in flight, and
// SQL.SynchronousCallMayNotPend when TryReadColumn reports that it could not complete.
private void ReadColumn(int i, bool setTimeout = true, bool allowPartiallyReadColumn = false) {
    bool asyncInFlight = (_currentTask != null);
    if (asyncInFlight) {
        throw ADP.AsyncOperationPending();
    }

    Debug.Assert(_stateObj == null || _stateObj._syncOverAsync, "Should not attempt pends in a synchronous call");

    if (TryReadColumn(i, setTimeout, allowPartiallyReadColumn)) {
        return;
    }

    // A synchronous call is not allowed to end in the "could not complete" state
    throw SQL.SynchronousCallMayNotPend();
}
// Attempts to read column i (header and data) of the current row into _data[i].
//
// i:                        zero-based column ordinal; validated by CheckDataIsReady.
// setTimeout:               when true, the state object's timeout is reset to _defaultTimeoutMilliseconds first.
// allowPartiallyReadColumn: forwarded to CheckDataIsReady (relaxes the sequential-access check for a partially read column).
//
// Returns true when the column was read, false when TryReadColumnInternal returned false.
// CheckDataIsReady throws before any reading happens if the reader is not in a valid state.
// OutOfMemory / StackOverflow / ThreadAbort exceptions mark the reader closed, abort the
// connection (when there is one) and are rethrown.
private bool TryReadColumn(int i, bool setTimeout, bool allowPartiallyReadColumn = false) {
CheckDataIsReady(columnIndex: i, permitAsync: true, allowPartiallyReadColumn: allowPartiallyReadColumn);
RuntimeHelpers.PrepareConstrainedRegions();
try {
#if DEBUG
TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
tdsReliabilitySection.Start();
#else
{
#endif //DEBUG
Debug.Assert(_sharedState._nextColumnHeaderToRead <= _metaData.Length, "_sharedState._nextColumnHeaderToRead too large");
Debug.Assert(_sharedState._nextColumnDataToRead <= _metaData.Length, "_sharedState._nextColumnDataToRead too large");
if (setTimeout) {
SetTimeout(_defaultTimeoutMilliseconds);
}
if (!TryReadColumnInternal(i, readHeaderOnly: false)) {
return false;
}
Debug.Assert(null != _data[i], " data buffer is null?");
}
#if DEBUG
finally {
tdsReliabilitySection.Stop();
}
#endif //DEBUG
}
// The three handlers below are identical: mark the reader closed, abort the connection (if any), rethrow
catch (System.OutOfMemoryException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
catch (System.StackOverflowException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
catch (System.Threading.ThreadAbortException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
return true;
}
// Reads the value of the column at _sharedState._nextColumnDataToRead into its buffer
// and then advances _nextColumnDataToRead.
// Returns false (without advancing) when the parser could not finish reading the value.
private bool TryReadColumnData() {
    int column = _sharedState._nextColumnDataToRead;

    // A buffer already flagged as NULL has nothing left to read from the wire
    bool valueStillOnWire = !_data[column].IsNull;
    if (valueStillOnWire) {
        _SqlMetaData meta = _metaData[column];
        SqlCommandColumnEncryptionSetting encryptionSetting =
            (_command != null) ? _command.ColumnEncryptionSetting : SqlCommandColumnEncryptionSetting.UseConnectionSetting;

        // will read UDTs as VARBINARY.
        bool completed = _parser.TryReadSqlValue(
            _data[column],
            meta,
            (int)_sharedState._columnDataBytesRemaining,
            _stateObj,
            encryptionSetting,
            meta.column);
        if (!completed) {
            return false;
        }

        _sharedState._columnDataBytesRemaining = 0;
    }

    _sharedState._nextColumnDataToRead++;
    return true;
}
// Synchronous wrapper around TryReadColumnHeader.
// Throws SQL.SynchronousCallMayNotPend when TryReadColumnHeader reports that it could not complete.
private void ReadColumnHeader(int i) {
    Debug.Assert(_stateObj == null || _stateObj._syncOverAsync, "Should not attempt pends in a synchronous call");

    if (!TryReadColumnHeader(i)) {
        throw SQL.SynchronousCallMayNotPend();
    }
}
// Attempts to read up to (and including) the header of column i, without reading that column's data.
//
// Returns the result of TryReadColumnInternal(i, readHeaderOnly: true).
// Throws SQL.InvalidRead when no row is currently ready (_sharedState._dataReady is false).
// OutOfMemory / StackOverflow / ThreadAbort exceptions mark the reader closed, abort the
// connection (when there is one) and are rethrown.
private bool TryReadColumnHeader(int i) {
if (!_sharedState._dataReady) {
throw SQL.InvalidRead();
}
RuntimeHelpers.PrepareConstrainedRegions();
try {
#if DEBUG
TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
tdsReliabilitySection.Start();
#endif //DEBUG
return TryReadColumnInternal(i, readHeaderOnly: true);
#if DEBUG
}
finally {
tdsReliabilitySection.Stop();
}
#endif //DEBUG
}
// The three handlers below are identical: mark the reader closed, abort the connection (if any), rethrow
catch (System.OutOfMemoryException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
catch (System.StackOverflowException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
catch (System.Threading.ThreadAbortException e) {
_isClosed = true;
if (null != _connection) {
_connection.Abort(e);
}
throw;
}
}
// Core column reader: advances through the current row until the header of column i has been
// read and, unless readHeaderOnly is true, its data as well.
//
// i:              zero-based ordinal of the target column.
// readHeaderOnly: when true, stop after the header of column i; the remaining data length is
//                 recorded in _sharedState._columnDataBytesRemaining instead of being read.
//
// In SequentialAccess mode, columns before i are skipped on the wire; otherwise their values
// are stored in _data so they can still be read out of order.
//
// Returns true on completion, false as soon as any parser/state-object Try* call returns false.
private bool TryReadColumnInternal(int i, bool readHeaderOnly = false) {
AssertReaderState(requireData: true, permitAsync: true, columnIndex: i);
// Check if we've already read the header
if (i < _sharedState._nextColumnHeaderToRead) {
// Read the header, but we need to read the data
if ((i == _sharedState._nextColumnDataToRead) && (!readHeaderOnly)) {
return TryReadColumnData();
}
// Else we've already read the data, or we're reading the header only
else {
// Ensure that, if we've read past the column, then we did store its data
Debug.Assert(i == _sharedState._nextColumnDataToRead || // Either we haven't read the column yet
((i + 1 < _sharedState._nextColumnDataToRead) && (IsCommandBehavior(CommandBehavior.SequentialAccess))) || // Or we're in sequential mode and we've read way past the column (i.e. it was not the last column we read)
(!_data[i].IsEmpty || _data[i].IsNull) || // Or we should have data stored for the column (unless the column was null)
(_metaData[i].type == SqlDbType.Timestamp), // Or Dev11 Bug #336820, Dev10 Bug #479607 (SqlClient: IsDBNull always returns false for timestamp datatype)
// Due to a bug in TdsParser.GetNullSqlValue, Timestamps' IsNull is not correctly set - so we need to bypass the check
"Gone past column, be we have no data stored for it");
return true;
}
}
Debug.Assert(_data[i].IsEmpty || _data[i].IsNull, "re-reading column value?");
// If we're in sequential access mode, we can safely clear out any
// data from the previous column.
bool isSequentialAccess = IsCommandBehavior(CommandBehavior.SequentialAccess);
if (isSequentialAccess) {
if (0 < _sharedState._nextColumnDataToRead) {
_data[_sharedState._nextColumnDataToRead - 1].Clear();
}
// Only wipe out the blob objects if they aren't for a 'future' column (i.e. we haven't read up to them yet)
if ((_lastColumnWithDataChunkRead > -1) && (i > _lastColumnWithDataChunkRead)) {
CloseActiveSequentialStreamAndTextReader();
}
}
else if (_sharedState._nextColumnDataToRead < _sharedState._nextColumnHeaderToRead) {
// We read the header but not the column for the previous column
if (!TryReadColumnData()) {
return false;
}
Debug.Assert(_sharedState._nextColumnDataToRead == _sharedState._nextColumnHeaderToRead);
}
// if we still have bytes left from the previous blob read, clear the wire and reset
if (!TryResetBlobState()) {
return false;
}
// Walk forward one column per iteration until the header of column i has been consumed
do {
_SqlMetaData columnMetaData = _metaData[_sharedState._nextColumnHeaderToRead];
if ((isSequentialAccess) && (_sharedState._nextColumnHeaderToRead < i)) {
// SkipValue is no-op if the column appears in NBC bitmask
// if not, it skips regular and PLP types
if (!_parser.TrySkipValue(columnMetaData, _sharedState._nextColumnHeaderToRead, _stateObj)) {
return false;
}
_sharedState._nextColumnDataToRead = _sharedState._nextColumnHeaderToRead;
_sharedState._nextColumnHeaderToRead++;
}
else {
bool isNull;
ulong dataLength;
if (!_parser.TryProcessColumnHeader(columnMetaData, _stateObj, _sharedState._nextColumnHeaderToRead, out isNull, out dataLength)) {
return false;
}
_sharedState._nextColumnDataToRead = _sharedState._nextColumnHeaderToRead;
_sharedState._nextColumnHeaderToRead++; // We read this one
if (isNull && columnMetaData.type != SqlDbType.Timestamp /* Maintain behavior for known bug (Dev10 479607) rejected as breaking change - See comments in GetNullSqlValue for timestamp */)
{
TdsParser.GetNullSqlValue(_data[_sharedState._nextColumnDataToRead],
columnMetaData,
_command != null ? _command.ColumnEncryptionSetting : SqlCommandColumnEncryptionSetting.UseConnectionSetting,
_parser.Connection);
// For a header-only read the data index is left on this column, even though the null value is already stored
if (!readHeaderOnly) {
_sharedState._nextColumnDataToRead++;
}
}
else {
if ((i > _sharedState._nextColumnDataToRead) || (!readHeaderOnly)) {
// If we're not in sequential access mode, we have to
// save the data we skip over so that the consumer
// can read it out of order
if (!_parser.TryReadSqlValue(_data[_sharedState._nextColumnDataToRead], columnMetaData, (int)dataLength, _stateObj,
_command != null ? _command.ColumnEncryptionSetting : SqlCommandColumnEncryptionSetting.UseConnectionSetting,
columnMetaData.column)) { // will read UDTs as VARBINARY.
return false;
}
_sharedState._nextColumnDataToRead++;
}
else {
// Header-only read of the target column: remember how much data is still on the wire
_sharedState._columnDataBytesRemaining = (long)dataLength;
}
}
}
if (_snapshot != null) {
// reset snapshot to save memory use. We can safely do that here because all SqlDataReader values are stable.
// The retry logic can use the current values to get back to the right state.
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
} while (_sharedState._nextColumnHeaderToRead <= i);
return true;
}
// Estimates if there is enough data available to read the number of columns requested
//
// targetColumn: zero-based ordinal of the last column that needs to be readable.
// headerOnly:   when true, only the header (not the data) of targetColumn is counted.
//
// Returns true when the bytes already buffered in _stateObj appear sufficient; false when
// they are not, or when the size cannot be estimated (mid-PLP read, or a Long/Plp/Udt/Structured column).
// The estimate uses each column's maximum header size and its metadata length.
private bool WillHaveEnoughData(int targetColumn, bool headerOnly = false) {
AssertReaderState(requireData: true, permitAsync: true, columnIndex: targetColumn);
if ((_lastColumnWithDataChunkRead == _sharedState._nextColumnDataToRead) && (_metaData[_lastColumnWithDataChunkRead].metaType.IsPlp)) {
// In the middle of reading a Plp - no idea how much is left
return false;
}
// Bytes available = unread bytes in the buffer, capped by the bytes left in the current packet
int bytesRemaining = Math.Min(checked(_stateObj._inBytesRead - _stateObj._inBytesUsed), _stateObj._inBytesPacket);
// There are some parts of our code that peeks at the next token after doing its read
// So we will make sure that there is always a spare byte for it to look at
bytesRemaining--;
if ((targetColumn >= _sharedState._nextColumnDataToRead) && (_sharedState._nextColumnDataToRead < _sharedState._nextColumnHeaderToRead)) {
if (_sharedState._columnDataBytesRemaining > bytesRemaining) {
// The current column needs more data than we currently have
// NOTE: Since the Long data types (TEXT, IMAGE, NTEXT) can have a size of Int32.MaxValue we cannot simply subtract
// _columnDataBytesRemaining from bytesRemaining and then compare it to zero as this may lead to an overflow
return false;
}
else {
// Already read the header, so subtract actual data size
bytesRemaining = checked(bytesRemaining - (int)_sharedState._columnDataBytesRemaining);
}
}
// For each column that we need to read, subtract the size of its header and the size of its data
int currentColumn = _sharedState._nextColumnHeaderToRead;
while ((bytesRemaining >= 0) && (currentColumn <= targetColumn)) {
// Check NBC first
if (!_stateObj.IsNullCompressionBitSet(currentColumn)) {
// NOTE: This is mostly duplicated from TryProcessColumnHeaderNoNBC and TryGetTokenLength
var metaType = _metaData[currentColumn].metaType;
if ((metaType.IsLong) || (metaType.IsPlp) || (metaType.SqlDbType == SqlDbType.Udt) || (metaType.SqlDbType == SqlDbType.Structured)) {
// Plp, Udt and TVP types have an unknowable size - so return that the estimate failed
return false;
}
int maxHeaderSize;
byte typeAndMask = (byte)(_metaData[currentColumn].tdsType & TdsEnums.SQLLenMask);
if ((typeAndMask == TdsEnums.SQLVarLen) || (typeAndMask == TdsEnums.SQLVarCnt)) {
if (0 != (_metaData[currentColumn].tdsType & 0x80)) {
// UInt16 represents size
maxHeaderSize = 2;
}
else if (0 == (_metaData[currentColumn].tdsType & 0x0c)) {
// UInt32 represents size
maxHeaderSize = 4;
}
else {
// Byte represents size
maxHeaderSize = 1;
}
}
else
{
// Not a variable-length type: no length prefix is counted
maxHeaderSize = 0;
}
bytesRemaining = checked(bytesRemaining - maxHeaderSize);
// Data bytes are counted for every column before the target, and for the target itself unless headerOnly
if ((currentColumn < targetColumn) || (!headerOnly)) {
bytesRemaining = checked(bytesRemaining - _metaData[currentColumn].length);
}
}
currentColumn++;
}
return (bytesRemaining >= 0);
}
// clean remainder bytes for the column off the wire
//
// If the header of a column has been read but its data has not been fully consumed, the
// leftover bytes are skipped (PLP columns through TrySkipPlpValue, others through TrySkipLongBytes)
// and any streaming XML object is closed. All per-column chunk-read bookkeeping is then reset.
//
// Returns true on completion, false when a skip call returned false (in which case the
// bookkeeping fields are left untouched).
private bool TryResetBlobState() {
Debug.Assert(null != _stateObj, "null state object"); // _parser may be null at this point
AssertReaderState(requireData: true, permitAsync: true);
Debug.Assert(_sharedState._nextColumnHeaderToRead <= _metaData.Length, "_sharedState._nextColumnHeaderToRead too large");
// If we haven't already entirely read the column
if (_sharedState._nextColumnDataToRead < _sharedState._nextColumnHeaderToRead) {
if ((_sharedState._nextColumnHeaderToRead > 0) && (_metaData[_sharedState._nextColumnHeaderToRead - 1].metaType.IsPlp)) {
// PLP column: skip whatever is left of the value, if anything
if (_stateObj._longlen != 0) {
ulong ignored;
if (!_stateObj.Parser.TrySkipPlpValue(UInt64.MaxValue, _stateObj, out ignored)) {
return false;
}
}
// Clear the field before closing so the reader never references a closed streaming XML object
if (_streamingXml != null) {
SqlStreamingXml localSXml = _streamingXml;
_streamingXml = null;
localSXml.Close();
}
}
else if (0 < _sharedState._columnDataBytesRemaining) {
// Non-PLP column with a known number of bytes still on the wire
if (!_stateObj.TrySkipLongBytes(_sharedState._columnDataBytesRemaining)) {
return false;
}
}
}
#if DEBUG
else {
Debug.Assert((_sharedState._columnDataBytesRemaining == 0 || _sharedState._columnDataBytesRemaining == -1) && _stateObj._longlen == 0, "Haven't read header yet, but column is partially read?");
}
#endif
// Reset all chunked-read state for the column
_sharedState._columnDataBytesRemaining = 0;
_columnDataBytesRead = 0;
_columnDataCharsRead = 0;
_columnDataChars = null;
_columnDataCharsIndex = -1;
_stateObj._plpdecoder = null;
return true;
}
// Marks the currently active sequential stream and/or text reader (if any) as closed and
// drops the reader's references to them.
private void CloseActiveSequentialStreamAndTextReader() {
    if (_currentStream != null) {
        _currentStream.SetClosed();
        _currentStream = null;
    }
    if (_currentTextReader != null) {
        _currentTextReader.SetClosed();
        // Fix: the original cleared _currentStream here, which left the closed text reader
        // referenced, so later calls would invoke SetClosed() on it again and
        // HasActiveStreamOrTextReaderOnColumn could still see it.
        _currentTextReader = null;
    }
}
// Sends the stored _resetOptionsString batch (when there is one) to turn off any options
// that were set, then forgets the string. Nothing happens when parser is null or no reset
// string is stored.
private void RestoreServerSettings(TdsParser parser, TdsParserStateObject stateObj) {
    if (parser == null || _resetOptionsString == null) {
        return;
    }

    // This can be reached during connection close on a broken connection,
    // so only talk to the server when the parser is still logged in.
    bool connectionUsable = (parser.State == TdsParserState.OpenLoggedIn);
    if (connectionUsable) {
        Bid.CorrelationTrace("<sc.SqlDataReader.RestoreServerSettings|Info|Correlation> ObjectID%d#, ActivityID %ls\n", ObjectID);

        int commandTimeout = (_command != null) ? _command.CommandTimeout : 0;
        Task pendingWrite = parser.TdsExecuteSQLBatch(_resetOptionsString, commandTimeout, null, stateObj, sync: true);
        Debug.Assert(pendingWrite == null, "Shouldn't get a task when doing sync writes");

        // The response has to be consumed synchronously because this operation cannot be retried
        parser.Run(RunBehavior.UntilDone, _command, this, null, stateObj);
    }

    // The reset string is cleared whether or not the batch was sent
    _resetOptionsString = null;
}
// Registers an alternate metadata set, optionally consumes ORDER / INFO tokens that follow it,
// updates _hasRows from the next token, and makes sure _data is large enough for the set.
// Returns false as soon as a peek or parser run returns false; true otherwise.
internal bool TrySetAltMetaDataSet(_SqlMetaDataSet metaDataSet, bool metaDataConsumed) {
    if (_altMetaDataSetCollection != null) {
        // If the snapshot holds the very same collection instance, work on a clone instead
        bool sharedWithSnapshot = (_snapshot != null)
            && object.ReferenceEquals(_snapshot._altMetaDataSetCollection, _altMetaDataSetCollection);
        if (sharedWithSnapshot) {
            _altMetaDataSetCollection = (_SqlMetaDataSetCollection)_altMetaDataSetCollection.Clone();
        }
    }
    else {
        _altMetaDataSetCollection = new _SqlMetaDataSetCollection();
    }

    _altMetaDataSetCollection.SetAltMetaData(metaDataSet);
    _metaDataConsumed = metaDataConsumed;

    if (_metaDataConsumed && null != _parser) {
        byte nextToken;
        if (!_stateObj.TryPeekByte(out nextToken)) {
            return false;
        }

        if (nextToken == TdsEnums.SQLORDER) {
            bool orderDataReady;
            if (!_parser.TryRun(RunBehavior.ReturnImmediately, _command, this, null, _stateObj, out orderDataReady)) {
                return false;
            }
            if (!_stateObj.TryPeekByte(out nextToken)) {
                return false;
            }
        }

        if (nextToken == TdsEnums.SQLINFO) {
            bool infoProcessed;
            try {
                // Info events are accumulated only for the duration of this parser run
                _stateObj._accumulateInfoEvents = true;
                bool infoDataReady;
                infoProcessed = _parser.TryRun(RunBehavior.ReturnImmediately, _command, null, null, _stateObj, out infoDataReady);
            }
            finally {
                _stateObj._accumulateInfoEvents = false;
            }
            if (!infoProcessed) {
                return false;
            }
            if (!_stateObj.TryPeekByte(out nextToken)) {
                return false;
            }
        }

        _hasRows = IsRowToken(nextToken);
    }

    // Allocate (or grow) the buffer array when it cannot hold every column of the new set
    if (metaDataSet != null && (_data == null || _data.Length < metaDataSet.Length)) {
        _data = SqlBuffer.CreateBufferArray(metaDataSet.Length);
    }

    return true;
}
// Drops the current metadata together with everything cached from it, and resets both
// "consumed" flags.
private void ClearMetaData() {
    // consumption flags
    _browseModeInfoConsumed = false;
    _metaDataConsumed = false;

    // metadata and the lookups derived from it
    _fieldNameLookup = null;
    _tableNames = null;
    _metaData = null;
}
// Installs a new metadata set, clears cached metadata-derived state, and (when the metadata is
// complete and a parser is available) peeks at the following tokens to establish _hasRows.
// Returns false as soon as a peek or parser run returns false; true otherwise.
// NOTE: _browseModeInfoConsumed is only reset on the paths that return true.
internal bool TrySetMetaData(_SqlMetaDataSet metaData, bool moreInfo) {
    bool hasMetaData = (metaData != null);

    _metaData = metaData;
    // cached metadata info goes away as well
    _tableNames = null;
    if (hasMetaData) {
        metaData.schemaTable = null;
        _data = SqlBuffer.CreateBufferArray(metaData.Length);
    }
    _fieldNameLookup = null;

    if (!hasMetaData) {
        _metaDataConsumed = false;
    }
    else if (!moreInfo) {
        // metadata is fully consumed only when no more info is expected
        _metaDataConsumed = true;

        // There is a valid case where parser is null
        if (_parser != null) {
            // Peek at the next token; a row token means the result has at least one row
            byte nextToken;
            if (!_stateObj.TryPeekByte(out nextToken)) {
                return false;
            }

            // simply rip the order token off the wire (same logic as SetAltMetaDataSet)
            // Devnote: That's not the right place to process TDS
            // Can this result in Reentrance to Run?
            if (nextToken == TdsEnums.SQLORDER) {
                bool orderDataReady;
                if (!_parser.TryRun(RunBehavior.ReturnImmediately, null, null, null, _stateObj, out orderDataReady)) {
                    return false;
                }
                if (!_stateObj.TryPeekByte(out nextToken)) {
                    return false;
                }
            }

            if (nextToken == TdsEnums.SQLINFO) {
                // VSTFDEVDIV713926
                // Informational events are accumulated and fired at the next
                // TdsParser.Run purely to avoid a breaking change
                bool infoProcessed;
                try {
                    _stateObj._accumulateInfoEvents = true;
                    bool infoDataReady;
                    infoProcessed = _parser.TryRun(RunBehavior.ReturnImmediately, null, null, null, _stateObj, out infoDataReady);
                }
                finally {
                    _stateObj._accumulateInfoEvents = false;
                }
                if (!infoProcessed) {
                    return false;
                }
                if (!_stateObj.TryPeekByte(out nextToken)) {
                    return false;
                }
            }

            _hasRows = IsRowToken(nextToken);
            if (nextToken == TdsEnums.SQLALTMETADATA) {
                // Alternate metadata follows, so the metadata is not fully consumed yet
                _metaDataConsumed = false;
            }
        }
    }

    _browseModeInfoConsumed = false;
    return true;
}
// Applies a timeout to the current state object, if there is one.
// WebData 111653,112003 -- timeouts are set per operation, not per command
// (they are not supposed to be cumulative per command).
private void SetTimeout(long timeoutMilliseconds) {
    // Read the field once so the null check and the call use the same instance
    TdsParserStateObject current = _stateObj;
    if (current == null) {
        return;
    }
    current.SetTimeoutMilliseconds(timeoutMilliseconds);
}
// Returns true when the current stream or the current text reader exists and is bound to
// the given column.
private bool HasActiveStreamOrTextReaderOnColumn(int columnIndex) {
    // Both checks are always evaluated, as in a non-short-circuit OR
    bool streamOnColumn = (_currentStream != null) && (_currentStream.ColumnIndex == columnIndex);
    bool textReaderOnColumn = (_currentTextReader != null) && (_currentTextReader.ColumnIndex == columnIndex);
    return streamOnColumn | textReaderOnColumn;
}
// Guard for metadata access.
// Throws ADP.AsyncOperationPending when an async operation is in flight, and
// SQL.InvalidRead when the MetaData property yields null.
private void CheckMetaDataIsReady() {
    bool asyncInFlight = (_currentTask != null);
    if (asyncInFlight) {
        throw ADP.AsyncOperationPending();
    }

    if (null == MetaData) {
        throw SQL.InvalidRead();
    }
}
// Guard for per-column metadata access.
// Throws ADP.AsyncOperationPending when an async operation is in flight (unless permitAsync),
// SQL.InvalidRead when the MetaData property yields null, and ADP.IndexOutOfRange when
// columnIndex is outside [0, _metaData.Length).
private void CheckMetaDataIsReady(int columnIndex, bool permitAsync = false) {
    if (!permitAsync) {
        if (_currentTask != null) {
            throw ADP.AsyncOperationPending();
        }
    }

    if (null == MetaData) {
        throw SQL.InvalidRead();
    }

    bool indexInRange = (columnIndex >= 0) && (columnIndex < _metaData.Length);
    if (!indexInRange) {
        throw ADP.IndexOutOfRange();
    }
}
// Guard for row access.
// Throws ADP.AsyncOperationPending when an async operation is in flight, and
// SQL.InvalidRead when no row is ready or there is no metadata.
private void CheckDataIsReady() {
    bool asyncInFlight = (_currentTask != null);
    if (asyncInFlight) {
        throw ADP.AsyncOperationPending();
    }

    Debug.Assert(!_sharedState._dataReady || _metaData != null, "Data is ready, but there is no metadata?");

    bool rowAvailable = _sharedState._dataReady && (_metaData != null);
    if (!rowAvailable) {
        throw SQL.InvalidRead();
    }
}
// Guard for reading the header of a column.
// Throws, in this order: ADP.DataReaderClosed (reader closed; methodName defaults to
// "CheckHeaderIsReady"), ADP.AsyncOperationPending (async in flight and not permitAsync),
// SQL.InvalidRead (no row ready or no metadata), ADP.IndexOutOfRange (bad columnIndex),
// ADP.NonSequentialColumnAccess (SequentialAccess mode and the column was already passed).
private void CheckHeaderIsReady(int columnIndex, bool permitAsync = false, string methodName = null) {
    if (_isClosed) {
        throw ADP.DataReaderClosed(methodName ?? "CheckHeaderIsReady");
    }

    if (!permitAsync) {
        if (_currentTask != null) {
            throw ADP.AsyncOperationPending();
        }
    }

    Debug.Assert(!_sharedState._dataReady || _metaData != null, "Data is ready, but there is no metadata?");

    bool rowAvailable = _sharedState._dataReady && (_metaData != null);
    if (!rowAvailable) {
        throw SQL.InvalidRead();
    }

    bool indexInRange = (columnIndex >= 0) && (columnIndex < _metaData.Length);
    if (!indexInRange) {
        throw ADP.IndexOutOfRange();
    }

    // The remaining check only applies to sequential access
    if (IsCommandBehavior(CommandBehavior.SequentialAccess)) {
        bool readPastColumn = (_sharedState._nextColumnHeaderToRead > columnIndex + 1)
            || (_lastColumnWithDataChunkRead > columnIndex);
        if (readPastColumn) {
            throw ADP.NonSequentialColumnAccess(columnIndex, Math.Max(_sharedState._nextColumnHeaderToRead - 1, _lastColumnWithDataChunkRead));
        }
    }
}
// Guard for reading the data of a column.
// Throws, in this order: ADP.DataReaderClosed (reader closed; methodName defaults to
// "CheckDataIsReady"), ADP.AsyncOperationPending (async in flight and not permitAsync),
// SQL.InvalidRead (no row ready or no metadata), ADP.IndexOutOfRange (bad columnIndex),
// ADP.NonSequentialColumnAccess (SequentialAccess mode and the column is no longer readable).
private void CheckDataIsReady(int columnIndex, bool allowPartiallyReadColumn = false, bool permitAsync = false, string methodName = null) {
    if (_isClosed) {
        throw ADP.DataReaderClosed(methodName ?? "CheckDataIsReady");
    }

    if (!permitAsync) {
        if (_currentTask != null) {
            throw ADP.AsyncOperationPending();
        }
    }

    Debug.Assert(!_sharedState._dataReady || _metaData != null, "Data is ready, but there is no metadata?");

    bool rowAvailable = _sharedState._dataReady && (_metaData != null);
    if (!rowAvailable) {
        throw SQL.InvalidRead();
    }

    bool indexInRange = (columnIndex >= 0) && (columnIndex < _metaData.Length);
    if (!indexInRange) {
        throw ADP.IndexOutOfRange();
    }

    // The remaining checks only apply to sequential access
    if (IsCommandBehavior(CommandBehavior.SequentialAccess)) {
        // Read past column
        bool readPastColumn = (_sharedState._nextColumnDataToRead > columnIndex)
            || (_lastColumnWithDataChunkRead > columnIndex);
        // Partially read column
        bool partiallyRead = (!allowPartiallyReadColumn) && (_lastColumnWithDataChunkRead == columnIndex);

        // Last alternative: a Stream or TextReader is open on a partially-read column
        if (readPastColumn
            || partiallyRead
            || (allowPartiallyReadColumn && HasActiveStreamOrTextReaderOnColumn(columnIndex))) {
            throw ADP.NonSequentialColumnAccess(columnIndex, Math.Max(_sharedState._nextColumnDataToRead, _lastColumnWithDataChunkRead + 1));
        }
    }
}
// Debug-only consistency checks on the reader state; calls are compiled out unless DEBUG is defined.
// requireData:             assert that a row is ready.
// permitAsync:             when false, assert that no async operation is pending.
// columnIndex:             when supplied, assert that it is a valid ordinal.
// enforceSequentialAccess: when true (and in SequentialAccess mode), assert the column has not been passed.
[Conditional("DEBUG")]
private void AssertReaderState(bool requireData, bool permitAsync, int? columnIndex = null, bool enforceSequentialAccess = false) {
    Debug.Assert(!_sharedState._dataReady || _metaData != null, "Data is ready, but there is no metadata?");
    Debug.Assert(permitAsync || _currentTask == null, "Call while async operation is pending");
    Debug.Assert(_metaData != null, "_metaData is null, check MetaData before calling this method");
    Debug.Assert(!requireData || _sharedState._dataReady, "No data is ready to be read");

    if (!columnIndex.HasValue) {
        return;
    }

    int column = columnIndex.Value;
    Debug.Assert(column >= 0 && column < _metaData.Length, "Invalid column index");
    Debug.Assert((!enforceSequentialAccess) || (!IsCommandBehavior(CommandBehavior.SequentialAccess)) || ((_sharedState._nextColumnDataToRead <= column) && (_lastColumnWithDataChunkRead <= column)), "Already read past column");
}
// Asynchronously advances the reader to the next result set.
//
// cancellationToken: when cancelable, a callback that invokes _command.CancelIgnoreFailure is
//                    registered on it for the duration of the operation.
//
// Returns a task that yields true when TryNextResult reports another result, false otherwise.
// The returned task is faulted when the reader is closed or another async operation is already
// pending, and canceled when the token is already canceled or the reader's close-token has fired.
public override Task<bool> NextResultAsync(CancellationToken cancellationToken) {
    IntPtr hscp;
    Bid.ScopeEnter(out hscp, "<sc.SqlDataReader.NextResultAsync|API> %d#", ObjectID);
    try {
        TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
        if (IsClosed) {
            source.SetException(ADP.ExceptionWithStackTrace(ADP.DataReaderClosed("NextResultAsync")));
            return source.Task;
        }
        IDisposable registration = null;
        if (cancellationToken.CanBeCanceled) {
            if (cancellationToken.IsCancellationRequested) {
                source.SetCanceled();
                return source.Task;
            }
            registration = cancellationToken.Register(_command.CancelIgnoreFailure);
        }
        Task original = Interlocked.CompareExchange(ref _currentTask, source.Task, null);
        if (original != null) {
            // Fix: this call never started, so its cancellation callback must not stay attached to
            // the caller's token - otherwise a later cancel would hit the operation already in flight.
            if (registration != null) {
                registration.Dispose();
            }
            source.SetException(ADP.ExceptionWithStackTrace(SQL.PendingBeginXXXExists()));
            return source.Task;
        }
        // Check if cancellation due to close is requested (this needs to be done after setting _currentTask)
        if (_cancelAsyncOnCloseToken.IsCancellationRequested) {
            // Fix: same as above - release the registration on this early exit as well
            if (registration != null) {
                registration.Dispose();
            }
            source.SetCanceled();
            _currentTask = null;
            return source.Task;
        }
        PrepareAsyncInvocation(useSnapshot: true);
        Func<Task, Task<bool>> moreFunc = null;
        moreFunc = (t) => {
            // A non-null task means this is a retry, not the first attempt
            if (t != null) {
                Bid.Trace("<sc.SqlDataReader.NextResultAsync> attempt retry %d#\n", ObjectID);
                PrepareForAsyncContinuation();
            }
            bool more;
            if (TryNextResult(out more)) {
                // completed
                return more ? ADP.TrueTask : ADP.FalseTask;
            }
            return ContinueRetryable(moreFunc);
        };
        // From here on the registration is handed to InvokeRetryable
        // (NOTE(review): presumably disposed there on completion - confirm)
        return InvokeRetryable(moreFunc, source, registration);
    }
    finally {
        Bid.ScopeLeave(ref hscp);
    }
}
// Asynchronously reads bytes from column i into buffer (SequentialAccess mode only, per the assert below).
//
// i:                 zero-based column ordinal.
// buffer/index/length: destination array, start offset in it, and maximum number of bytes to read.
// timeout:           when greater than zero, a CancellationTokenSource is set to cancel after this
//                    value and is checked between retries (units as accepted by CancelAfter(int)).
// cancellationToken: user cancellation; see the note on the early return below.
// bytesRead:         number of bytes read when the method completed synchronously.
//
// NOTE: This will return null if it completed sequentially
// If this returns null, then you can use bytesRead to see how many bytes were read - otherwise bytesRead should be ignored
internal Task<int> GetBytesAsync(int i, byte[] buffer, int index, int length, int timeout, CancellationToken cancellationToken, out int bytesRead) {
AssertReaderState(requireData: true, permitAsync: true, columnIndex: i, enforceSequentialAccess: true);
Debug.Assert(IsCommandBehavior(CommandBehavior.SequentialAccess));
bytesRead = 0;
if (IsClosed) {
TaskCompletionSource<int> source = new TaskCompletionSource<int>();
source.SetException(ADP.ExceptionWithStackTrace(ADP.DataReaderClosed("GetBytesAsync")));
return source.Task;
}
if (_currentTask != null) {
TaskCompletionSource<int> source = new TaskCompletionSource<int>();
source.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
return source.Task;
}
if (cancellationToken.CanBeCanceled) {
if (cancellationToken.IsCancellationRequested) {
// Already canceled: reported as a synchronous completion with bytesRead == 0 (null task)
return null;
}
}
// Check if we need to skip columns
Debug.Assert(_sharedState._nextColumnDataToRead <= _lastColumnWithDataChunkRead, "Non sequential access");
if ((_sharedState._nextColumnHeaderToRead <= _lastColumnWithDataChunkRead) || (_sharedState._nextColumnDataToRead < _lastColumnWithDataChunkRead)) {
TaskCompletionSource<int> source = new TaskCompletionSource<int>();
Task original = Interlocked.CompareExchange(ref _currentTask, source.Task, null);
if (original != null) {
source.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
return source.Task;
}
PrepareAsyncInvocation(useSnapshot: true);
Func<Task, Task<int>> moreFunc = null;
// Timeout
CancellationToken timeoutToken = CancellationToken.None;
CancellationTokenSource timeoutCancellationSource = null;
if (timeout > 0) {
timeoutCancellationSource = new CancellationTokenSource();
timeoutCancellationSource.CancelAfter(timeout);
timeoutToken = timeoutCancellationSource.Token;
}
moreFunc = (t) => {
// A non-null task means this is a retry, not the first attempt
if (t != null) {
Bid.Trace("<sc.SqlDataReader.GetBytesAsync> attempt retry %d#\n", ObjectID);
PrepareForAsyncContinuation();
}
// Prepare for stateObj timeout
SetTimeout(_defaultTimeoutMilliseconds);
if (TryReadColumnHeader(i)) {
// Only once we have read up to where we need to be can we check the cancellation tokens (otherwise we will be in an unknown state)
if (cancellationToken.IsCancellationRequested) {
// User requested cancellation
return ADP.CreatedTaskWithCancellation<int>();
}
else if (timeoutToken.IsCancellationRequested) {
// Timeout
return ADP.CreatedTaskWithException<int>(ADP.ExceptionWithStackTrace(ADP.IO(SQLMessage.Timeout())));
}
else {
// Up to the correct column - continue to read
SwitchToAsyncWithoutSnapshot();
int totalBytesRead;
var readTask = GetBytesAsyncReadDataStage(i, buffer, index, length, timeout, true, cancellationToken, timeoutToken, out totalBytesRead);
if (readTask == null) {
// Completed synchronously
return Task.FromResult<int>(totalBytesRead);
}
else {
return readTask;
}
}
}
else {
// Header could not be read yet: schedule another attempt
return ContinueRetryable(moreFunc);
}
};
return InvokeRetryable(moreFunc, source, timeoutCancellationSource);
}
else {
// We're already at the correct column, just read the data
// Switch to async
PrepareAsyncInvocation(useSnapshot: false);
try {
return GetBytesAsyncReadDataStage(i, buffer, index, length, timeout, false, cancellationToken, CancellationToken.None, out bytesRead);
}
catch {
// Undo PrepareAsyncInvocation before letting the exception propagate
CleanupAfterAsyncInvocation();
throw;
}
}
}
// Data-reading stage of GetBytesAsync: reads bytes of column i into buffer, retrying
// asynchronously when TryGetBytesInternalSequential cannot complete in one go.
//
// isContinuation: true when called from within an already running retryable operation; in that
//                 case the caller owns _currentTask, the timeout token and cleanup. When false,
//                 this method sets up _currentTask and the timeout itself.
// timeoutToken:   must be CancellationToken.None when isContinuation is false (asserted below).
// bytesRead:      bytes read by the first, synchronous attempt.
//
// Returns null when the read completed synchronously (use bytesRead), otherwise a task that
// yields the total number of bytes read.
private Task<int> GetBytesAsyncReadDataStage(int i, byte[] buffer, int index, int length, int timeout, bool isContinuation, CancellationToken cancellationToken, CancellationToken timeoutToken, out int bytesRead) {
_lastColumnWithDataChunkRead = i;
TaskCompletionSource<int> source = null;
CancellationTokenSource timeoutCancellationSource = null;
// Prepare for stateObj timeout
SetTimeout(_defaultTimeoutMilliseconds);
// Try to read without any continuations (all the data may already be in the stateObj's buffer)
if (!TryGetBytesInternalSequential(i, buffer, index, length, out bytesRead)) {
// This will be the 'state' for the callback
int totalBytesRead = bytesRead;
if (!isContinuation) {
// This is the first async operation which is happening - setup the _currentTask and timeout
source = new TaskCompletionSource<int>();
Task original = Interlocked.CompareExchange(ref _currentTask, source.Task, null);
if (original != null) {
source.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
return source.Task;
}
// Check if cancellation due to close is requested (this needs to be done after setting _currentTask)
if (_cancelAsyncOnCloseToken.IsCancellationRequested) {
source.SetCanceled();
_currentTask = null;
return source.Task;
}
// Timeout
Debug.Assert(timeoutToken == CancellationToken.None, "TimeoutToken is set when GetBytesAsyncReadDataStage is not a continuation");
if (timeout > 0) {
timeoutCancellationSource = new CancellationTokenSource();
timeoutCancellationSource.CancelAfter(timeout);
timeoutToken = timeoutCancellationSource.Token;
}
}
Func<Task, Task<int>> moreFunc = null;
moreFunc = (_ => {
PrepareForAsyncContinuation();
if (cancellationToken.IsCancellationRequested) {
// User requested cancellation
return ADP.CreatedTaskWithCancellation<int>();
}
else if (timeoutToken.IsCancellationRequested) {
// Timeout
return ADP.CreatedTaskWithException<int>(ADP.ExceptionWithStackTrace(ADP.IO(SQLMessage.Timeout())));
}
else {
// Prepare for stateObj timeout
SetTimeout(_defaultTimeoutMilliseconds);
int bytesReadThisIteration;
// Resume after the bytes already copied: advance the offset and shrink the remaining length
bool result = TryGetBytesInternalSequential(i, buffer, index + totalBytesRead, length - totalBytesRead, out bytesReadThisIteration);
totalBytesRead += bytesReadThisIteration;
Debug.Assert(totalBytesRead <= length, "Read more bytes than required");
if (result) {
return Task.FromResult<int>(totalBytesRead);
}
else {
return ContinueRetryable(moreFunc);
}
}
});
Task<int> retryTask = ContinueRetryable(moreFunc);
if (isContinuation) {
// Let the caller handle cleanup\completing
return retryTask;
}
else {
// setup for cleanup\completing
retryTask.ContinueWith((t) => CompleteRetryable(t, source, timeoutCancellationSource), TaskScheduler.Default);
return source.Task;
}
}
if (!isContinuation) {
// If this is the first async op, we need to cleanup
CleanupAfterAsyncInvocation();
}
// Completed synchronously, return null
return null;
}
// Asynchronously advances the reader to the next row.
//
// Outcomes visible in this method:
//   * reader closed                        - task faulted with ADP.DataReaderClosed("ReadAsync")
//   * cancellationToken already canceled   - canceled task
//   * another async operation is pending   - task faulted with SQL.PendingBeginXXXExists
//   * _cancelAsyncOnCloseToken signalled   - canceled task
//   * otherwise                            - ADP.TrueTask when TryReadInternal reports more rows,
//                                            ADP.FalseTask when it does not
//
// The method first tries a synchronous shortcut that is only taken when the required data is
// reported as already available; failing that, it registers a TaskCompletionSource in _currentTask,
// snapshots the reader state and runs a retryable function through InvokeRetryable.
public override Task<bool> ReadAsync(CancellationToken cancellationToken) {
IntPtr hscp;
Bid.ScopeEnter(out hscp, "<sc.SqlDataReader.ReadAsync|API> %d#", ObjectID);
try {
if (IsClosed) {
return ADP.CreatedTaskWithException<bool>(ADP.ExceptionWithStackTrace(ADP.DataReaderClosed("ReadAsync")));
}
// If user's token is canceled, return a canceled task
if (cancellationToken.IsCancellationRequested) {
return ADP.CreatedTaskWithCancellation<bool>();
}
// Check for existing async
if (_currentTask != null) {
return ADP.CreatedTaskWithException<bool>(ADP.ExceptionWithStackTrace(SQL.PendingBeginXXXExists()));
}
// These variables will be captured in moreFunc so that we can skip searching for a row token once one has been read
bool rowTokenRead = false;
bool more = false;
// Shortcut, do we have enough data to immediately do the ReadAsync?
try {
// First, check if we can finish reading the current row
// NOTE: If we are in SingleRow mode and we've read that single row (i.e. _haltRead == true), then skip the shortcut
if ((!_haltRead) && ((!_sharedState._dataReady) || (WillHaveEnoughData(_metaData.Length - 1)))) {
#if DEBUG
try {
_stateObj._shouldHaveEnoughData = true;
#endif
if (_sharedState._dataReady) {
// Clean off current row
CleanPartialReadReliable();
}
// If there is a ROW token ready (as well as any metadata for the row)
if (_stateObj.IsRowTokenReady()) {
// Read the ROW token
bool result = TryReadInternal(true, out more);
Debug.Assert(result, "Should not have run out of data");
// Remember that the token was consumed so that moreFunc does not call TryReadInternal again
rowTokenRead = true;
if (more) {
// Sequential mode, nothing left to do
if (IsCommandBehavior(CommandBehavior.SequentialAccess)) {
return ADP.TrueTask;
}
// For non-sequential, check if we can read the row data now
else if (WillHaveEnoughData(_metaData.Length - 1)) {
// Read row data
result = TryReadColumn(_metaData.Length - 1, setTimeout: true);
Debug.Assert(result, "Should not have run out of data");
return ADP.TrueTask;
}
// Otherwise fall through to the retryable path below with rowTokenRead == true and more == true
}
else {
// No data left, return
return ADP.FalseTask;
}
}
#if DEBUG
}
finally {
_stateObj._shouldHaveEnoughData = false;
}
#endif
}
}
catch (Exception ex) {
// Catchable exceptions from the shortcut are reported through the returned task rather than thrown
if (!ADP.IsCatchableExceptionType(ex)) {
throw;
}
return ADP.CreatedTaskWithException<bool>(ex);
}
// Retryable path: claim _currentTask; if another task got there first, report a pending operation
TaskCompletionSource<bool> source = new TaskCompletionSource<bool>();
Task original = Interlocked.CompareExchange(ref _currentTask, source.Task, null);
if (original != null) {
source.SetException(ADP.ExceptionWithStackTrace(SQL.PendingBeginXXXExists()));
return source.Task;
}
// Check if cancellation due to close is requested (this needs to be done after setting _currentTask)
if (_cancelAsyncOnCloseToken.IsCancellationRequested) {
source.SetCanceled();
_currentTask = null;
return source.Task;
}
// Route user cancellation to _command.CancelIgnoreFailure; the registration is passed to InvokeRetryable as the object to dispose
IDisposable registration = null;
if (cancellationToken.CanBeCanceled) {
registration = cancellationToken.Register(_command.CancelIgnoreFailure);
}
// Capture the reader state before the first attempt
PrepareAsyncInvocation(useSnapshot: true);
Func<Task, Task<bool>> moreFunc = null;
// t is null on the first call (made by InvokeRetryable) and non-null when ContinueRetryable re-invokes the function
moreFunc = (t) => {
if (t != null) {
Bid.Trace("<sc.SqlDataReader.ReadAsync> attempt retry %d#\n", ObjectID);
PrepareForAsyncContinuation();
}
if (rowTokenRead || TryReadInternal(true, out more)) {
// If there are no more rows, or this is Sequential Access, then we are done
if (!more || (_commandBehavior & CommandBehavior.SequentialAccess) == CommandBehavior.SequentialAccess) {
// completed
return more ? ADP.TrueTask : ADP.FalseTask;
}
else {
// First time reading the row token - update the snapshot
if (!rowTokenRead) {
rowTokenRead = true;
// PrepareAsyncInvocation only creates a snapshot when _snapshot is null, so drop the old one first
_snapshot = null;
PrepareAsyncInvocation(useSnapshot: true);
}
// if non-sequentialaccess then read entire row before returning
if (TryReadColumn(_metaData.Length - 1, true)) {
// completed
return ADP.TrueTask;
}
}
}
// Reached when TryReadInternal or TryReadColumn returned false - schedule another attempt
return ContinueRetryable(moreFunc);
};
return InvokeRetryable(moreFunc, source, registration);
}
finally {
Bid.ScopeLeave(ref hscp);
}
}
// Asynchronously reports whether the value buffered for column i is null (_data[i].IsNull).
//
// Resolution order:
//   1. CheckHeaderIsReady validates the call; catchable failures come back as a faulted task.
//   2. If the header for column i was already read, no cancellation is requested and no async
//      operation is pending, the answer is taken straight from _data.
//   3. A pending operation yields a task faulted with ADP.AsyncOperationPending; a canceled
//      token yields a canceled task.
//   4. If WillHaveEnoughData says the header can be read now, it is read synchronously.
//   5. Otherwise a retryable function built around TryReadColumnHeader is run via InvokeRetryable.
override public Task<bool> IsDBNullAsync(int i, CancellationToken cancellationToken) {
    try {
        CheckHeaderIsReady(columnIndex: i, methodName: "IsDBNullAsync");
    }
    catch (Exception validationError) {
        if (!ADP.IsCatchableExceptionType(validationError)) {
            throw;
        }
        return ADP.CreatedTaskWithException<bool>(validationError);
    }

    // Shortcut - the header was read already and nothing stands in the way of answering immediately
    bool canAnswerFromBuffer = (_sharedState._nextColumnHeaderToRead > i) && (!cancellationToken.IsCancellationRequested) && (_currentTask == null);
    if (canAnswerFromBuffer) {
        var buffers = _data;
        if (buffers == null) {
            // The reader was closed between CheckHeaderIsReady and the read of _data
            return ADP.CreatedTaskWithException<bool>(ADP.ExceptionWithStackTrace(ADP.DataReaderClosed("IsDBNullAsync")));
        }
        return buffers[i].IsNull ? ADP.TrueTask : ADP.FalseTask;
    }

    // Only one async operation may be in flight
    if (_currentTask != null) {
        return ADP.CreatedTaskWithException<bool>(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
    }

    // Honor a token that is already canceled
    if (cancellationToken.IsCancellationRequested) {
        return ADP.CreatedTaskWithCancellation<bool>();
    }

    // Shortcut - enough data is available to read the header synchronously
    try {
        if (WillHaveEnoughData(i, headerOnly: true)) {
#if DEBUG
            try {
                _stateObj._shouldHaveEnoughData = true;
#endif
                ReadColumnHeader(i);
                return _data[i].IsNull ? ADP.TrueTask : ADP.FalseTask;
#if DEBUG
            }
            finally {
                _stateObj._shouldHaveEnoughData = false;
            }
#endif
        }
    }
    catch (Exception readError) {
        if (!ADP.IsCatchableExceptionType(readError)) {
            throw;
        }
        return ADP.CreatedTaskWithException<bool>(readError);
    }

    // Claim _currentTask for this operation
    TaskCompletionSource<bool> completion = new TaskCompletionSource<bool>();
    Task pending = Interlocked.CompareExchange(ref _currentTask, completion.Task, null);
    if (pending != null) {
        completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
        return completion.Task;
    }

    // The close-cancellation check has to happen after _currentTask has been set
    if (_cancelAsyncOnCloseToken.IsCancellationRequested) {
        completion.SetCanceled();
        _currentTask = null;
        return completion.Task;
    }

    // Forward user cancellation to the command
    IDisposable cancelRegistration = null;
    if (cancellationToken.CanBeCanceled) {
        cancelRegistration = cancellationToken.Register(_command.CancelIgnoreFailure);
    }

    // Snapshot the reader state before the first attempt
    PrepareAsyncInvocation(useSnapshot: true);

    // previous is null on the first call and non-null on retries
    Func<Task, Task<bool>> attempt = null;
    attempt = (previous) => {
        if (previous != null) {
            PrepareForAsyncContinuation();
        }

        if (!TryReadColumnHeader(i)) {
            return ContinueRetryable(attempt);
        }
        return _data[i].IsNull ? ADP.TrueTask : ADP.FalseTask;
    };

    return InvokeRetryable(attempt, completion, cancelRegistration);
}
// Asynchronously retrieves the value of column i as T.
//
// Resolution order:
//   1. CheckDataIsReady validates the call. Outside SequentialAccess, when the column data was
//      already read, no cancellation is requested and no async operation is pending, the value is
//      converted straight from _data/_metaData. Catchable failures become a faulted task.
//   2. A pending operation yields a task faulted with ADP.AsyncOperationPending; a canceled
//      token yields a canceled task.
//   3. If WillHaveEnoughData(i) holds, GetFieldValueInternal is called synchronously.
//   4. Otherwise a retryable function built around TryReadColumn is run via InvokeRetryable.
override public Task<T> GetFieldValueAsync<T>(int i, CancellationToken cancellationToken) {
    try {
        CheckDataIsReady(columnIndex: i, methodName: "GetFieldValueAsync");

        // Shortcut - the column is already buffered and nothing stands in the way of returning it
        bool canAnswerFromBuffer = (!IsCommandBehavior(CommandBehavior.SequentialAccess)) && (_sharedState._nextColumnDataToRead > i) && (!cancellationToken.IsCancellationRequested) && (_currentTask == null);
        if (canAnswerFromBuffer) {
            var buffers = _data;
            var columns = _metaData;
            if ((buffers == null) || (columns == null)) {
                // The reader was closed between CheckDataIsReady and the reads of _data\_metaData
                return ADP.CreatedTaskWithException<T>(ADP.ExceptionWithStackTrace(ADP.DataReaderClosed("GetFieldValueAsync")));
            }
            return Task.FromResult<T>(GetFieldValueFromSqlBufferInternal<T>(buffers[i], columns[i]));
        }
    }
    catch (Exception validationError) {
        if (!ADP.IsCatchableExceptionType(validationError)) {
            throw;
        }
        return ADP.CreatedTaskWithException<T>(validationError);
    }

    // Only one async operation may be in flight
    if (_currentTask != null) {
        return ADP.CreatedTaskWithException<T>(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
    }

    // Honor a token that is already canceled
    if (cancellationToken.IsCancellationRequested) {
        return ADP.CreatedTaskWithCancellation<T>();
    }

    // Shortcut - enough data is available to read the column synchronously
    try {
        if (WillHaveEnoughData(i)) {
#if DEBUG
            try {
                _stateObj._shouldHaveEnoughData = true;
#endif
                return Task.FromResult(GetFieldValueInternal<T>(i));
#if DEBUG
            }
            finally {
                _stateObj._shouldHaveEnoughData = false;
            }
#endif
        }
    }
    catch (Exception readError) {
        if (!ADP.IsCatchableExceptionType(readError)) {
            throw;
        }
        return ADP.CreatedTaskWithException<T>(readError);
    }

    // Claim _currentTask for this operation
    TaskCompletionSource<T> completion = new TaskCompletionSource<T>();
    Task pending = Interlocked.CompareExchange(ref _currentTask, completion.Task, null);
    if (pending != null) {
        completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
        return completion.Task;
    }

    // The close-cancellation check has to happen after _currentTask has been set
    if (_cancelAsyncOnCloseToken.IsCancellationRequested) {
        completion.SetCanceled();
        _currentTask = null;
        return completion.Task;
    }

    // Forward user cancellation to the command
    IDisposable cancelRegistration = null;
    if (cancellationToken.CanBeCanceled) {
        cancelRegistration = cancellationToken.Register(_command.CancelIgnoreFailure);
    }

    // Snapshot the reader state before the first attempt
    PrepareAsyncInvocation(useSnapshot: true);

    // previous is null on the first call and non-null on retries
    Func<Task, Task<T>> attempt = null;
    attempt = (previous) => {
        if (previous != null) {
            PrepareForAsyncContinuation();
        }

        if (!TryReadColumn(i, setTimeout: false)) {
            return ContinueRetryable(attempt);
        }
        return Task.FromResult<T>(GetFieldValueFromSqlBufferInternal<T>(_data[i], _metaData[i]));
    };

    return InvokeRetryable(attempt, completion, cancelRegistration);
}
#if DEBUG
// DEBUG-only hook: forwards to CompletePendingReadWithSuccess on the reader's state object.
// Does nothing when the reader has no state object.
internal void CompletePendingReadWithSuccess(bool resetForcePendingReadsToWait) {
    // Work on a single read of the field so the null check and the call see the same object
    TdsParserStateObject target = _stateObj;
    if (target == null) {
        return;
    }
    target.CompletePendingReadWithSuccess(resetForcePendingReadsToWait);
}
// DEBUG-only hook: forwards errorCode to CompletePendingReadWithFailure on the reader's state object.
// Does nothing when the reader has no state object.
internal void CompletePendingReadWithFailure(int errorCode, bool resetForcePendingReadsToWait) {
    // Work on a single read of the field so the null check and the call see the same object
    TdsParserStateObject target = _stateObj;
    if (target == null) {
        return;
    }
    target.CompletePendingReadWithFailure(errorCode, resetForcePendingReadsToWait);
}
#endif
// Plain holder for the reader state that PrepareAsyncInvocation captures before an async
// attempt and PrepareForAsyncContinuation writes back before a retry.
private class Snapshot {
    // Row / result-set progress
    public bool _dataReady;
    public bool _haltRead;
    public bool _hasRows;
    public ALTROWSTATUS _altRowStatus;

    // Which token streams have been consumed
    public bool _metaDataConsumed;
    public bool _browseModeInfoConsumed;

    // Column read position
    public int _nextColumnDataToRead;
    public int _nextColumnHeaderToRead;
    public long _columnDataBytesRead;
    public long _columnDataBytesRemaining;

    // Metadata references (stored as references, not copies, by PrepareAsyncInvocation)
    public _SqlMetaDataSet _metadata;
    public _SqlMetaDataSetCollection _altMetaDataSetCollection;
    public MultiPartTableName[] _tableNames;

    // Active sequential-access wrappers
    public SqlSequentialStream _currentStream;
    public SqlSequentialTextReader _currentTextReader;
}
// Schedules another run of moreFunc for when the state object's current _networkPacketTaskSource task completes.
//
// Returns immediately with a task faulted with ADP.ClosedConnectionError when close-cancellation
// has been requested or there is no packet task source. Otherwise returns the unwrapped result of a
// continuation (run on TaskScheduler.Default) that:
//   * propagates the inner exception if the packet task faulted;
//   * if the packet task was canceled, marks the parser Broken, breaks the connection and calls
//     ThrowExceptionAndWarning;
//   * otherwise invokes moreFunc under a lock on the state object, provided the reader is still open;
//   * in every remaining case produces a task faulted with ADP.ClosedConnectionError.
private Task<T> ContinueRetryable<T>(Func<Task, Task<T>> moreFunc) {
// _networkPacketTaskSource could be null if the connection was closed
// while an async invocation was outstanding.
TaskCompletionSource<object> completionSource = _stateObj._networkPacketTaskSource;
if (_cancelAsyncOnCloseToken.IsCancellationRequested || completionSource == null) {
// Cancellation requested due to datareader being closed
TaskCompletionSource<T> source = new TaskCompletionSource<T>();
source.TrySetException(ADP.ExceptionWithStackTrace(ADP.ClosedConnectionError()));
return source.Task;
}
else {
return completionSource.Task.ContinueWith((retryTask) => {
if (retryTask.IsFaulted) {
// Somehow the network task faulted - return the exception
TaskCompletionSource<T> exceptionSource = new TaskCompletionSource<T>();
exceptionSource.TrySetException(retryTask.Exception.InnerException);
return exceptionSource.Task;
}
else if (!_cancelAsyncOnCloseToken.IsCancellationRequested) {
// Take a local copy: the lock below must be taken on the same object that was null-checked
TdsParserStateObject stateObj = _stateObj;
if (stateObj != null) {
// protect continuations against concurrent
// close and cancel
lock (stateObj) {
if (_stateObj != null) { // reader not closed while we waited for the lock
if (retryTask.IsCanceled) {
if (_parser != null) {
_parser.State = TdsParserState.Broken; // We failed to respond to attention, we have to quit!
_parser.Connection.BreakConnection();
_parser.ThrowExceptionAndWarning(_stateObj);
}
}
else {
if (!IsClosed) {
try {
return moreFunc(retryTask);
}
catch (Exception) {
// moreFunc threw synchronously: release the async state, then let the exception propagate from the continuation
CleanupAfterAsyncInvocation();
throw;
}
}
}
}
}
}
}
// if stateObj is null, or we closed the connection or the connection was already closed,
// then fail this operation with a closed-connection error.
TaskCompletionSource<T> source = new TaskCompletionSource<T>();
source.SetException(ADP.ExceptionWithStackTrace(ADP.ClosedConnectionError()));
return source.Task;
}, TaskScheduler.Default).Unwrap();
}
}
// Runs the first attempt of a retryable operation and wires its outcome to source.
//
// moreFunc is invoked with null; an exception it throws is converted into a faulted task. A task
// that is already complete is passed to CompleteRetryable right away, otherwise CompleteRetryable is
// attached as a continuation on TaskScheduler.Default. Any exception raised while doing so is
// recorded on source (unwrapping an AggregateException to its InnerException). The method always
// returns source.Task.
private Task<T> InvokeRetryable<T>(Func<Task, Task<T>> moreFunc, TaskCompletionSource<T> source, IDisposable objectToDispose = null) {
    try {
        Task<T> firstAttempt;
        try {
            firstAttempt = moreFunc(null);
        }
        catch (Exception attemptError) {
            firstAttempt = ADP.CreatedTaskWithException<T>(attemptError);
        }

        if (!firstAttempt.IsCompleted) {
            // Still waiting on data - finish up whenever the retry chain ends
            firstAttempt.ContinueWith((finished) => CompleteRetryable(finished, source, objectToDispose), TaskScheduler.Default);
        }
        else {
            // Done already - finish up inline
            CompleteRetryable(firstAttempt, source, objectToDispose);
        }
    }
    catch (AggregateException aggregate) {
        source.TrySetException(aggregate.InnerException);
    }
    catch (Exception wiringError) {
        source.TrySetException(wiringError);
    }

    // Both the exception paths and the normal paths hand back the source's task
    return source.Task;
}
// Finishes a retryable operation: disposes objectToDispose (if any), cleans up the async state,
// releases _currentTask if it still holds source.Task, and then copies the outcome of task
// (fault, cancellation or result) onto source.
private void CompleteRetryable<T>(Task<T> task, TaskCompletionSource<T> source, IDisposable objectToDispose) {
    if (objectToDispose != null) {
        objectToDispose.Dispose();
    }

    // If something forced a switch to SyncOverAsync mode during the async task, cleanup must run even
    // when close-cancellation was requested. This avoids replaying non-replayable data (such as DONE
    // or ENV_CHANGE tokens).
    TdsParserStateObject currentState = _stateObj;
    bool forceCleanup = (currentState != null) && (currentState._syncOverAsync);
    CleanupAfterAsyncInvocation(forceCleanup);

    Task previous = Interlocked.CompareExchange(ref _currentTask, null, source.Task);
    Debug.Assert(previous == source.Task, "Should not be able to change the _currentTask while an asynchronous operation is pending");

    if (task.IsFaulted) {
        source.TrySetException(task.Exception.InnerException);
        return;
    }
    if (task.IsCanceled) {
        source.TrySetCanceled();
        return;
    }
    source.TrySetResult(task.Result);
}
// Puts the reader and its state object into async mode before the first attempt of an async operation.
//
// useSnapshot == true : captures the current reader state into _snapshot and calls
//                       _stateObj.SetSnapshot(), unless a snapshot already exists.
// useSnapshot == false: switches the state object to async-without-snapshot mode; asserts that no
//                       snapshot is currently held.
// In both cases _syncOverAsync is cleared and the current ExecutionContext is captured.
private void PrepareAsyncInvocation(bool useSnapshot) {
    if (useSnapshot) {
        Debug.Assert(!_stateObj._asyncReadWithoutSnapshot, "Can't prepare async invocation with snapshot if doing async without snapshots");

        // if there is already a snapshot, then the previous async command
        // completed with exception or cancellation. We need to continue
        // with the old snapshot.
        if (_snapshot == null) {
            _snapshot = new Snapshot {
                _dataReady = _sharedState._dataReady,
                _haltRead = _haltRead,
                _metaDataConsumed = _metaDataConsumed,
                _browseModeInfoConsumed = _browseModeInfoConsumed,
                _hasRows = _hasRows,
                _altRowStatus = _altRowStatus,
                _nextColumnDataToRead = _sharedState._nextColumnDataToRead,
                _nextColumnHeaderToRead = _sharedState._nextColumnHeaderToRead,
                _columnDataBytesRead = _columnDataBytesRead,
                _columnDataBytesRemaining = _sharedState._columnDataBytesRemaining,
                // Only references are stored here: _metaData and _altMetaDataSetCollection
                // must be cloned before they are updated
                _metadata = _metaData,
                _altMetaDataSetCollection = _altMetaDataSetCollection,
                _tableNames = _tableNames,
                _currentStream = _currentStream,
                _currentTextReader = _currentTextReader,
            };
            _stateObj.SetSnapshot();
        }
    }
    else {
        // This assertion fires when a snapshot exists, so the message must describe the forbidden
        // combination (it previously read "Can prepare ...", the opposite of what is being checked).
        Debug.Assert(_snapshot == null, "Can't prepare async invocation without snapshot if there is currently a snapshot");
        _stateObj._asyncReadWithoutSnapshot = true;
    }

    _stateObj._syncOverAsync = false;
    _stateObj._executionContext = ExecutionContext.Capture();
}
// Leaves async mode for the reader's state object, if there is one.
//
// Cleanup is skipped when close-cancellation has been requested while a snapshot-based read is in
// progress, unless ignoreCloseToken is true. The actual work is done by
// CleanupAfterAsyncInvocationInternal under a lock on the state object.
private void CleanupAfterAsyncInvocation(bool ignoreCloseToken = false) {
    TdsParserStateObject observed = _stateObj;
    if (observed == null) {
        return;
    }

    // If close requested cancellation and we have a snapshot, then it will deal with cleaning up
    // NOTE: There are some cases where we wish to ignore the close token, such as when we've read some data that is not replayable (e.g. DONE or ENV_CHANGE token)
    if ((!ignoreCloseToken) && (_cancelAsyncOnCloseToken.IsCancellationRequested) && (!observed._asyncReadWithoutSnapshot)) {
        return;
    }

    // Guard against the DataReader being closed concurrently (e.g. when another MARS thread has an error)
    lock (observed) {
        if (_stateObj != null) { // reader not closed while we waited for the lock
            CleanupAfterAsyncInvocationInternal(_stateObj);
            Debug.Assert(_snapshot == null && !_stateObj._asyncReadWithoutSnapshot, "Snapshot not null or async without snapshot still enabled after cleaning async state");
        }
    }
}
// Resets the async-related state on the given state object and drops the reader's snapshot.
// This overload takes the state object explicitly so that it can be called directly when the calling
// function has already closed the reader (so _stateObj is null); in all other cases the
// CleanupAfterAsyncInvocation(bool) overload should be called.
// When resetNetworkPacketTaskSource is false, stateObj._networkPacketTaskSource is left untouched.
private void CleanupAfterAsyncInvocationInternal(TdsParserStateObject stateObj, bool resetNetworkPacketTaskSource = true)
{
if (resetNetworkPacketTaskSource) {
stateObj._networkPacketTaskSource = null;
}
stateObj.ResetSnapshot();
// Back to sync-over-async mode with no captured context and no snapshot-less async read
stateObj._syncOverAsync = true;
stateObj._executionContext = null;
stateObj._asyncReadWithoutSnapshot = false;
#if DEBUG
stateObj._permitReplayStackTraceToDiffer = false;
#endif
// _snapshot is cleared here, where a state object is known to exist. CleanupAfterAsyncInvocation only
// calls this method after seeing a non-null _stateObj; a null _stateObj means that the reader hasn't
// been initialized or has been closed (either way _snapshot should already be null)
_snapshot = null;
}
// Readies the reader for a retry of the pending async operation.
//
// When a snapshot exists, the reader fields are rolled back to the snapshot values and the state
// object is told to prepare a replay of its own snapshot. In every case the current
// ExecutionContext is captured onto the state object.
private void PrepareForAsyncContinuation() {
    Debug.Assert(((_snapshot != null) || (_stateObj._asyncReadWithoutSnapshot)), "Can not prepare for an async continuation if no async if setup");

    if (_snapshot != null) {
        RestoreReaderStateFromSnapshot();
        _stateObj.PrepareReplaySnapshot();
    }

    _stateObj._executionContext = ExecutionContext.Capture();
}

// Copies every value held by _snapshot back onto the reader (the mirror image of the capture
// performed in PrepareAsyncInvocation). The caller must ensure _snapshot is not null.
private void RestoreReaderStateFromSnapshot() {
    _sharedState._dataReady = _snapshot._dataReady;
    _haltRead = _snapshot._haltRead;
    _metaDataConsumed = _snapshot._metaDataConsumed;
    _browseModeInfoConsumed = _snapshot._browseModeInfoConsumed;
    _hasRows = _snapshot._hasRows;
    _altRowStatus = _snapshot._altRowStatus;
    _sharedState._nextColumnDataToRead = _snapshot._nextColumnDataToRead;
    _sharedState._nextColumnHeaderToRead = _snapshot._nextColumnHeaderToRead;
    _columnDataBytesRead = _snapshot._columnDataBytesRead;
    _sharedState._columnDataBytesRemaining = _snapshot._columnDataBytesRemaining;
    _metaData = _snapshot._metadata;
    _altMetaDataSetCollection = _snapshot._altMetaDataSetCollection;
    _tableNames = _snapshot._tableNames;
    _currentStream = _snapshot._currentStream;
    _currentTextReader = _snapshot._currentTextReader;
}
// Converts the pending async operation from snapshot mode to snapshot-less mode.
// Expects (asserted in debug builds) that a snapshot currently exists and that the state object is
// not already in async-without-snapshot mode.
private void SwitchToAsyncWithoutSnapshot() {
Debug.Assert(_snapshot != null, "Should currently have a snapshot");
Debug.Assert(_stateObj != null && !_stateObj._asyncReadWithoutSnapshot, "Already in async without snapshot");
// Drop the reader-side snapshot, then the state object's snapshot, before flagging the new mode
_snapshot = null;
_stateObj.ResetSnapshot();
_stateObj._asyncReadWithoutSnapshot = true;
}
}// SqlDataReader
}// namespace