in System.Data/fx/src/data/System/Data/SqlClient/SqlBulkCopy.cs [1747:2720]
private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) {
Task reconnectTask = _connection._currentReconnectionTask;
if (reconnectTask != null && !reconnectTask.IsCompleted) {
if (this._isAsyncBulkCopy) {
TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
reconnectTask.ContinueWith((t) => {
Task writeTask = WriteRowSourceToServerAsync(columnCount, ctoken);
if (writeTask == null) {
tcs.SetResult(null);
}
else {
AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
}
}, ctoken); // we do not need to propagate exception etc. from reconnect task, we just need to wait for it to finish
return tcs.Task;
}
else {
AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }, rethrowExceptions: false);
}
}
bool finishedSynchronously = true;
_isBulkCopyingInProgress = true;
CreateOrValidateConnection(SQL.WriteToServer);
SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
Debug.Assert(_parserLock == null, "Previous parser lock not cleaned");
_parserLock = internalConnection._parserLock;
_parserLock.Wait(canReleaseFromAnyThread: _isAsyncBulkCopy);
TdsParser bestEffortCleanupTarget = null;
RuntimeHelpers.PrepareConstrainedRegions();
try {
#if DEBUG
TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
tdsReliabilitySection.Start();
#else // !DEBUG
{
#endif //DEBUG
bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(_connection);
WriteRowSourceToServerCommon(columnCount); //this is common in both sync and async
Task resultTask = WriteToServerInternalAsync(ctoken); // resultTask is null for sync, but Task for async.
if (resultTask != null) {
finishedSynchronously = false;
return resultTask.ContinueWith((t) => {
try {
AbortTransaction(); // if there is one, on success transactions will be commited
}
finally {
_isBulkCopyingInProgress = false;
if (_parser != null) {
_parser._asyncWrite = false;
}
if (_parserLock != null) {
_parserLock.Release();
_parserLock = null;
}
}
return t;
}, TaskScheduler.Default).Unwrap();
}
return null;
}
#if DEBUG
finally {
tdsReliabilitySection.Stop();
}
#endif //DEBUG
}
catch (System.OutOfMemoryException e) {
_connection.Abort(e);
throw;
}
catch (System.StackOverflowException e) {
_connection.Abort(e);
throw;
}
catch (System.Threading.ThreadAbortException e) {
_connection.Abort(e);
SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
throw;
}
finally {
_columnMappings.ReadOnly = false;
if (finishedSynchronously) {
try {
AbortTransaction(); // if there is one, on success transactions will be commited
}
finally {
_isBulkCopyingInProgress = false;
if (_parser != null) {
_parser._asyncWrite = false;
}
if (_parserLock != null) {
_parserLock.Release();
_parserLock = null;
}
}
}
}
}
// Handles the column mapping.
//
private void WriteRowSourceToServerCommon(int columnCount) {
bool unspecifiedColumnOrdinals = false;
_columnMappings.ReadOnly = true;
_localColumnMappings = _columnMappings;
if (_localColumnMappings.Count > 0) {
_localColumnMappings.ValidateCollection();
foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
unspecifiedColumnOrdinals = true;
break;
}
}
}
else {
_localColumnMappings = new SqlBulkCopyColumnMappingCollection();
_localColumnMappings.CreateDefaultMapping(columnCount);
}
// perf: If the user specified all column ordinals we do not need to get a schematable
//
if (unspecifiedColumnOrdinals) {
int index = -1;
unspecifiedColumnOrdinals = false;
// Match up sourceColumn names with sourceColumn ordinals
//
if (_localColumnMappings.Count > 0) {
foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
string unquotedColumnName = UnquotedName(bulkCopyColumn.SourceColumn);
switch (this._rowSourceType) {
case ValueSourceType.DataTable:
index = ((DataTable)_rowSource).Columns.IndexOf(unquotedColumnName);
break;
case ValueSourceType.RowArray:
index = ((DataRow[])_rowSource)[0].Table.Columns.IndexOf(unquotedColumnName);
break;
case ValueSourceType.DbDataReader:
case ValueSourceType.IDataReader:
try {
index = ((IDataRecord)this._rowSource).GetOrdinal(unquotedColumnName);
}
catch (IndexOutOfRangeException e) {
throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e));
}
break;
}
if (index == -1) {
throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName));
}
bulkCopyColumn._internalSourceColumnOrdinal = index;
}
}
}
}
}
internal void OnConnectionClosed() {
TdsParserStateObject stateObj = _stateObj;
if (stateObj != null) {
stateObj.OnConnectionClosed();
}
}
private void OnRowsCopied(SqlRowsCopiedEventArgs value) {
SqlRowsCopiedEventHandler handler = _rowsCopiedEventHandler;
if(handler != null) {
handler(this, value);
}
}
// fxcop:
// Use the .Net Event System whenever appropriate.
private bool FireRowsCopiedEvent(long rowsCopied) {
// release lock to prevent possible deadlocks
SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread;
internalConnection._parserLock.Release();
SqlRowsCopiedEventArgs eventArgs = new SqlRowsCopiedEventArgs(rowsCopied);
try {
_insideRowsCopiedEvent = true;
this.OnRowsCopied(eventArgs);
}
finally {
_insideRowsCopiedEvent = false;
internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock);
}
return eventArgs.Abort;
}
// Reads a cell and then writes it.
// Read may block at this moment since there is no getValueAsync or DownStream async at this moment.
// When _isAsyncBulkCopy == true: Write will return Task (when async method runs asynchronously) or Null (when async call actually ran synchronously) for performance.
// When _isAsyncBulkCopy == false: Writes are purely sync. This method reutrn null at the end.
//
private Task ReadWriteColumnValueAsync(int col) {
bool isSqlType;
bool isDataFeed;
bool isNull;
Object value = GetValueFromSourceRow(col, out isSqlType, out isDataFeed, out isNull); //this will return Task/null in future: as rTask
_SqlMetaData metadata = _sortedColumnMappings[col]._metadata;
if (!isDataFeed) {
value = ConvertValue(value, metadata, isNull, ref isSqlType, out isDataFeed);
// If column encryption is requested via connection string option, perform encryption here
if (!isNull && // if value is not NULL
metadata.isEncrypted) { // If we are transparently encrypting
Debug.Assert (_parser.ShouldEncryptValuesForBulkCopy());
value = _parser.EncryptColumnValue(value, metadata, metadata.column, _stateObj, isDataFeed, isSqlType);
isSqlType = false; // Its not a sql type anymore
}
}
//write part
Task writeTask = null;
if (metadata.type != SqlDbType.Variant) {
//this is the most common path
writeTask = _parser.WriteBulkCopyValue(value, metadata, _stateObj, isSqlType, isDataFeed, isNull); //returns Task/Null
}
else {
// Target type shouldn't be encrypted
Debug.Assert (!metadata.isEncrypted, "Can't encrypt SQL Variant type");
SqlBuffer.StorageType variantInternalType = SqlBuffer.StorageType.Empty;
if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) {
variantInternalType = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal);
}
if (variantInternalType == SqlBuffer.StorageType.DateTime2) {
_parser.WriteSqlVariantDateTime2(((DateTime)value), _stateObj);
}
else if (variantInternalType == SqlBuffer.StorageType.Date) {
_parser.WriteSqlVariantDate(((DateTime)value), _stateObj);
}
else {
writeTask = _parser.WriteSqlVariantDataRowValue(value, _stateObj); //returns Task/Null
}
}
return writeTask;
}
private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask) {
SqlConnection connection = _connection;
if (connection == null) {
// No connection
throw ADP.ClosedConnectionError();
}
connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
}
// Runs a loop to copy all columns of a single row.
// maintains a state by remembering #columns copied so far (int col)
// Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
//
private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = null) {
Task resultTask = null, task = null;
int i;
try {
for (i = col; i < _sortedColumnMappings.Count; i++) {
task = ReadWriteColumnValueAsync(i); //First reads and then writes one cell value. Task 'task' is completed when reading task and writing task both are complete.
if (task != null) break; //task != null means we have a pending read/write Task.
}
if (task != null) {
if (source == null) {
source = new TaskCompletionSource<object>();
resultTask = source.Task;
}
CopyColumnsAsyncSetupContinuation(source, task, i);
return resultTask; //associated task will be completed when all colums (i.e. the entire row) is written
}
if (source != null) {
source.SetResult(null);
}
}
catch(Exception ex) {
if (source != null) {
source.TrySetException(ex);
}
else {
throw;
}
}
return resultTask;
}
// This is in its own method to avoid always allocating the lambda in CopyColumnsAsync
private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i) {
AsyncHelper.ContinueTask(task, source, () => {
if (i + 1 < _sortedColumnMappings.Count) {
CopyColumnsAsync(i + 1, source); //continue from the next column
}
else {
source.SetResult(null);
}
},
_connection.GetOpenTdsConnection());
}
// The notification logic.
//
private void CheckAndRaiseNotification() {
bool abortOperation = false; //returns if the operation needs to be aborted.
Exception exception = null;
_rowsCopied++;
// Fire event logic
if (_notifyAfter > 0) { // no action if no value specified
// (0=no notification)
if (_rowsUntilNotification > 0) { // > 0?
if (--_rowsUntilNotification == 0) { // decrement counter
// Fire event during operation. This is the users chance to abort the operation
try {
// it's also the user's chance to cause an exception ...
_stateObj.BcpLock = true;
abortOperation = FireRowsCopiedEvent(_rowsCopied);
Bid.Trace("<sc.SqlBulkCopy.WriteToServerInternal|INFO> \n");
// just in case some pathological person closes the target connection ...
if (ConnectionState.Open != _connection.State) {
exception = ADP.OpenConnectionRequired("CheckAndRaiseNotification", _connection.State);
}
}
catch (Exception e) {
//
if (!ADP.IsCatchableExceptionType(e)) {
exception = e;
}
else {
exception = OperationAbortedException.Aborted(e);
}
}
finally {
_stateObj.BcpLock = false;
}
if (!abortOperation) {
_rowsUntilNotification = _notifyAfter;
}
}
}
}
if (!abortOperation && _rowsUntilNotification > _notifyAfter) { // if the specified counter decreased we update
_rowsUntilNotification = _notifyAfter; // decreased we update otherwise not
}
if (exception == null && abortOperation) {
exception = OperationAbortedException.Aborted(null);
}
if (_connection.State != ConnectionState.Open) {
throw ADP.OpenConnectionRequired(SQL.WriteToServer, _connection.State);
}
if (exception != null) {
_parser._asyncWrite = false;
Task writeTask = _parser.WriteBulkCopyDone(_stateObj); //We should complete the current batch upto this row.
Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy");
RunParser();
AbortTransaction();
throw exception; //this will be caught and put inside the Task's exception.
}
}
// Checks for cancellation. If cancel requested, cancels the task and returns the cancelled task
Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<object> tcs) {
if (cts.IsCancellationRequested) {
if (tcs == null) {
tcs = new TaskCompletionSource<object>();
}
tcs.SetCanceled();
return tcs.Task;
}
else {
return null;
}
}
private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action) {
if (task == null) {
return action();
}
else {
Debug.Assert(source != null, "source should already be initialized if task is not null");
AsyncHelper.ContinueTask(task, source, () => {
TaskCompletionSource<object> newSource = action();
Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists");
});
}
return null;
}
// Copies all the rows in a batch
// maintains state machine with state variable: rowSoFar
// Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
//
private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, TaskCompletionSource<object> source = null) {
Task resultTask = null;
Task task = null;
int i;
try {
//totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false).
for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) {
if (_isAsyncBulkCopy == true) {
resultTask = CheckForCancellation(cts, source);
if (resultTask != null) {
return resultTask; // task got cancelled!
}
}
_stateObj.WriteByte(TdsEnums.SQLROW);
task = CopyColumnsAsync(0); //copy 1 row
if (task == null) { //tsk is done.
CheckAndRaiseNotification(); //check notification logic after copying the row
//now we will read the next row.
Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: more is only valid if the task returns null. Otherwise, we wait for Task.Result
if (readTask != null) {
if (source == null) {
source = new TaskCompletionSource<object>();
}
resultTask = source.Task;
AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
return resultTask; //associated task will be completed when all rows are copied to server/exception/cancelled.
}
}
else { //tsk != null, we add continuation for it.
source = source ?? new TaskCompletionSource<object>();
resultTask = source.Task;
AsyncHelper.ContinueTask(task, source, onSuccess: () => {
CheckAndRaiseNotification(); //check for notification now as the current row copy is done at this moment.
Task readTask = ReadFromRowSourceAsync(cts);
if (readTask == null) {
CopyRowsAsync(i + 1, totalRows, cts, source);
}
else {
AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
}
}, connectionToDoom: _connection.GetOpenTdsConnection());
return resultTask;
}
}
if (source != null) {
source.TrySetResult(null); // this is set only on the last call of async copy. But may not be set if everything runs synchronously.
}
}
catch (Exception ex) {
if (source != null) {
source.TrySetException(ex);
}
else {
throw;
}
}
return resultTask;
}
// Copies all the batches in a loop. One iteration for one batch.
// state variable is essentially not needed. (however, _hasMoreRowToCopy might be thought as a state variable)
// Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
//
private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source = null) {
Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
try {
while (_hasMoreRowToCopy) {
//pre->before every batch: Transaction, BulkCmd and metadata are done.
SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
if (IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) { //internal trasaction is started prior to each batch if the Option is set.
internalConnection.ThreadHasParserLockForClose = true; // In case of error, tell the connection we already have the parser lock
try {
_internalTransaction = _connection.BeginTransaction();
}
finally {
internalConnection.ThreadHasParserLockForClose = false;
}
}
Task commandTask = SubmitUpdateBulkCommand(updateBulkCommandText);
if (commandTask == null) {
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask != null) {
// Continuation will take care of re-calling CopyBatchesAsync
return continuedTask;
}
}
else {
Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
if (source == null) {
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(commandTask, source, () => {
Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null) {
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}, _connection.GetOpenTdsConnection());
return source.Task;
}
}
}
catch (Exception ex) {
if (source != null) {
source.TrySetException(ex);
return source.Task;
}
else {
throw;
}
}
// If we are here, then we finished everything
if (source != null) {
source.SetResult(null);
return source.Task;
}
else {
return null;
}
}
// Writes the MetaData and a single batch
// If this returns true, then the caller is responsible for starting the next stage
private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
try {
WriteMetaData(internalResults);
// Load encryption keys now (if needed)
_parser.LoadColumnEncryptionKeys(
internalResults[MetaDataResultId].MetaData,
_connection.DataSource);
Task task = CopyRowsAsync(0, _savedBatchSize, cts); //this is copying 1 batch of rows and setting _hasMoreRowToCopy = true/false.
//post->after every batch
if (task != null) {
Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
if (source == null) { //first time only
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () => {
Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
if (continuedTask == null) {
// Continuation finished sync, recall into CopyBatchesAsync to continue
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}
}, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));
return source.Task;
}
else {
return CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
}
}
catch (Exception ex) {
if (source != null) {
source.TrySetException(ex);
return source.Task;
}
else {
throw;
}
}
}
// Takes care of finishing a single batch (write done, run parser, commit transaction)
// If this returns true, then the caller is responsible for starting the next stage
private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
try {
Task writeTask = _parser.WriteBulkCopyDone(_stateObj);
if (writeTask == null) {
RunParser();
CommitTransaction();
return null;
}
else {
Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
if (source == null) {
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(writeTask, source, () => {
try {
RunParser();
CommitTransaction();
}
catch (Exception) {
CopyBatchesAsyncContinuedOnError(cleanupParser: false);
throw;
}
// Always call back into CopyBatchesAsync
CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
}, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
return source.Task;
}
}
catch (Exception ex) {
if (source != null) {
source.TrySetException(ex);
return source.Task;
}
else {
throw;
}
}
}
// Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails
private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) {
SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
#if DEBUG
TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
RuntimeHelpers.PrepareConstrainedRegions();
try {
tdsReliabilitySection.Start();
#endif //DEBUG
if ((cleanupParser) && (_parser != null) && (_stateObj != null)) {
_parser._asyncWrite = false;
Task task = _parser.WriteBulkCopyDone(_stateObj);
Debug.Assert(task == null, "Write should not pend when error occurs");
RunParser();
}
if (_stateObj != null) {
CleanUpStateObjectOnError();
}
#if DEBUG
}
finally {
tdsReliabilitySection.Stop();
}
#endif //DEBUG
}
catch (OutOfMemoryException) {
internalConnection.DoomThisConnection();
throw;
}
catch (StackOverflowException) {
internalConnection.DoomThisConnection();
throw;
}
catch (ThreadAbortException) {
internalConnection.DoomThisConnection();
throw;
}
AbortTransaction();
}
//Cleans the stateobj. Used in a number of places, specially in exceptions
//
private void CleanUpStateObjectOnError() {
if (_stateObj != null) {
_parser.Connection.ThreadHasParserLockForClose = true;
try {
_stateObj.ResetBuffer();
_stateObj._outputPacketNumber = 1;
//If _parser is closed, sending attention will raise debug assertion, so we avoid it but not calling CancelRequest;
if (_parser.State == TdsParserState.OpenNotLoggedIn || _parser.State == TdsParserState.OpenLoggedIn) {
_stateObj.CancelRequest();
}
_stateObj._internalTimeout = false;
_stateObj.CloseSession();
_stateObj._bulkCopyOpperationInProgress = false;
_stateObj._bulkCopyWriteTimeout = false;
_stateObj = null;
}
finally {
_parser.Connection.ThreadHasParserLockForClose = false;
}
}
}
// The continuation part of WriteToServerInternalRest. Executes when the initial query task is completed. (see, WriteToServerInternalRest).
// It carries on the source which is passed from the WriteToServerInternalRest and performs SetResult when the entire copy is done.
// The carried on source may be null in case of Sync copy. So no need to SetResult at that time.
// It launches the copy operation.
//
private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource<object> source) {
Task task = null;
string updateBulkCommandText = null;
try {
updateBulkCommandText = AnalyzeTargetAndCreateUpdateBulkCommand(internalResults);
if (_sortedColumnMappings.Count != 0) {
_stateObj.SniContext = SniContext.Snix_SendRows;
_savedBatchSize = _batchSize; // for safety. If someone changes the batchsize during copy we still be using _savedBatchSize
_rowsUntilNotification = _notifyAfter;
_rowsCopied = 0;
_currentRowMetadata = new SourceColumnMetadata[_sortedColumnMappings.Count];
for (int i = 0; i < _currentRowMetadata.Length; i++) {
_currentRowMetadata[i] = GetColumnMetadata(i);
}
task = CopyBatchesAsync(internalResults, updateBulkCommandText, cts); //launch the BulkCopy
}
if (task != null) {
if (source == null) {
source = new TaskCompletionSource<object>();
}
AsyncHelper.ContinueTask(task, source, () => {
//Bulk copy task is completed at this moment.
//Todo: The cases may be combined for code reuse.
if (task.IsCanceled) {
_localColumnMappings = null;
try {
CleanUpStateObjectOnError();
}
finally {
source.SetCanceled();
}
}
else if (task.Exception != null) {
source.SetException(task.Exception.InnerException);
}
else {
_localColumnMappings = null;
try {
CleanUpStateObjectOnError();
}
finally {
if (source != null) {
if (cts.IsCancellationRequested) { //We may get cancellation req even after the entire copy.
source.SetCanceled();
}
else {
source.SetResult(null);
}
}
}
}
}, _connection.GetOpenTdsConnection());
return;
}
else {
_localColumnMappings = null;
try {
CleanUpStateObjectOnError();
} catch (Exception cleanupEx) {
Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
}
if(source != null) {
source.SetResult(null);
}
}
}
catch(Exception ex){
_localColumnMappings = null;
try {
CleanUpStateObjectOnError();
} catch (Exception cleanupEx) {
Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
}
if (source != null) {
source.TrySetException(ex);
}
else {
throw;
}
}
}
// Rest of the WriteToServerInternalAsync method.
// It carries on the source from its caller WriteToServerInternal.
// source is null in case of Sync bcp. But valid in case of Async bcp.
// It calls the WriteToServerInternalRestContinuedAsync as a continuation of the initial query task.
//
private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource<object> source) {
Debug.Assert(_hasMoreRowToCopy, "first time it is true, otherwise this method would not have been called.");
_hasMoreRowToCopy = true;
Task<BulkCopySimpleResultSet> internalResultsTask = null;
BulkCopySimpleResultSet internalResults = new BulkCopySimpleResultSet();
SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
try {
_parser = _connection.Parser;
_parser._asyncWrite = _isAsyncBulkCopy; //very important!
Task reconnectTask;
try {
reconnectTask = _connection.ValidateAndReconnect(
() => {
if (_parserLock != null) {
_parserLock.Release();
_parserLock = null;
}
}, BulkCopyTimeout);
}
catch (SqlException ex) {
throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
}
if (reconnectTask != null) {
if (_isAsyncBulkCopy) {
CancellationTokenRegistration regReconnectCancel = new CancellationTokenRegistration();
TaskCompletionSource<object> cancellableReconnectTS = new TaskCompletionSource<object>();
if (cts.CanBeCanceled) {
regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled());
}
AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
// no need to cancel timer since SqlBulkCopy creates specific task source for reconnection
AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout,
()=>{return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout());}, CancellationToken.None);
AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
() => {
regReconnectCancel.Dispose();
if (_parserLock != null) {
_parserLock.Release();
_parserLock = null;
}
_parserLock = _connection.GetOpenTdsConnection()._parserLock;
_parserLock.Wait(canReleaseFromAnyThread: true);
WriteToServerInternalRestAsync(cts, source);
},
connectionToAbort: _connection,
onFailure: (e) => { regReconnectCancel.Dispose(); },
onCancellation: () => { regReconnectCancel.Dispose(); },
exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
return;
}
else {
try {
AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); } );
}
catch (SqlException ex) {
throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // preserve behavior (throw InvalidOperationException on failure to connect)
}
_parserLock = _connection.GetOpenTdsConnection()._parserLock;
_parserLock.Wait(canReleaseFromAnyThread: false);
WriteToServerInternalRestAsync(cts, source);
return;
}
}
if (_isAsyncBulkCopy) {
_connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag);
}
internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock
try {
_stateObj = _parser.GetSession(this);
_stateObj._bulkCopyOpperationInProgress = true;
_stateObj.StartSession(ObjectID);
}
finally {
internalConnection.ThreadHasParserLockForClose = false;
}
try {
internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); //Task/Null
}
catch (SqlException ex) {
throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
}
if(internalResultsTask != null) {
AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
}
else {
Debug.Assert(internalResults != null, "Executing initial query finished synchronously, but there were no results");
WriteToServerInternalRestContinuedAsync(internalResults, cts, source); //internalResults is valid here.
}
}
catch (Exception ex) {
if (source != null) {
source.TrySetException(ex);
}
else {
throw;
}
}
}
// This returns Task for Async, Null for Sync
//
private Task WriteToServerInternalAsync(CancellationToken ctoken) {
TaskCompletionSource<object> source = null;
Task<object> resultTask = null;
if (_isAsyncBulkCopy) {
source = new TaskCompletionSource<object>(); //creating the completion source/Task that we pass to application
resultTask = source.Task;
RegisterForConnectionCloseNotification(ref resultTask);
}
if (_destinationTableName == null) {
if(source != null) {
source.SetException(SQL.BulkLoadMissingDestinationTable()); //no table to copy
}
else {
throw SQL.BulkLoadMissingDestinationTable();
}
return resultTask;
}
try {
Task readTask = ReadFromRowSourceAsync(ctoken); // readTask == reading task. This is the first read call. "more" is valid only if readTask == null;
if (readTask == null) { //synchronously finished reading.
if (!_hasMoreRowToCopy) { //no rows in the source to copy!
if(source != null) {
source.SetResult(null);
}
return resultTask;
}
else { //true, we have more rows.
WriteToServerInternalRestAsync(ctoken, source); //rest of the method, passing the same completion and returning the incomplete task (ret).
return resultTask;
}
}
else {
Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
AsyncHelper.ContinueTask(readTask, source, () => {
if (!_hasMoreRowToCopy) {
source.SetResult(null); //no rows to copy!
}
else {
WriteToServerInternalRestAsync(ctoken, source); //passing the same completion which will be completed by the Callee.
}
}, _connection.GetOpenTdsConnection());
return resultTask;
}
}
catch(Exception ex) {
if (source != null) {
source.TrySetException(ex);
}
else {
throw;
}
}
return resultTask;
}
}//End of SqlBulkCopy Class