private Task WriteRowSourceToServerAsync()

in System.Data/fx/src/data/System/Data/SqlClient/SqlBulkCopy.cs [1747:2720]


        // Entry point that pushes the current row source to the server.
        //
        // 1. If a connection reconnect task is still running: in async mode the method re-invokes itself from a
        //    continuation of that task and hands back a TaskCompletionSource-backed Task; in sync mode it blocks
        //    in AsyncHelper.WaitForCompletion (BulkCopyTimeout is passed as the timeout argument).
        // 2. Acquires the connection's parser lock, prepares the column mappings (WriteRowSourceToServerCommon)
        //    and calls WriteToServerInternalAsync.
        //
        // columnCount: passed to WriteRowSourceToServerCommon (used there to build the default mapping).
        // ctoken:      cancellation token forwarded to WriteToServerInternalAsync and to the reconnect continuation.
        //
        // Returns null when WriteToServerInternalAsync returned null (work finished synchronously); otherwise a Task
        // that completes after the cleanup continuation (abort transaction, reset flags, release parser lock) ran.
        private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) {
            Task reconnectTask = _connection._currentReconnectionTask;
            if (reconnectTask != null && !reconnectTask.IsCompleted) {
                if (this._isAsyncBulkCopy) {
                    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
                    // NOTE(review): the continuation is registered with ctoken. If the token is cancelled before the
                    // reconnect task finishes, the continuation presumably never runs and tcs is never completed - confirm.
                    // NOTE(review): an exception thrown synchronously by the nested call below is not forwarded to tcs
                    // by any code visible here - confirm this is handled elsewhere.
                    reconnectTask.ContinueWith((t) => {
                        // Retry the whole method now that the reconnect attempt is over.
                        Task writeTask = WriteRowSourceToServerAsync(columnCount, ctoken);
                        if (writeTask == null) {
                            // The retry finished synchronously.
                            tcs.SetResult(null);
                        }
                        else {
                            AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
                        }
                    }, ctoken); // we do not need to propagate exception etc. from reconnect task, we just need to wait for it to finish
                    return tcs.Task;
                }
                else {
                    // Sync mode: block until the reconnect task is done; the timeout callback throws CR_ReconnectTimeout.
                    AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }, rethrowExceptions: false);
                }
            }

            // Stays true unless WriteToServerInternalAsync hands back a pending Task; it decides whether the
            // outer finally block or the task continuation performs the cleanup.
            bool finishedSynchronously = true;
            _isBulkCopyingInProgress = true;

            CreateOrValidateConnection(SQL.WriteToServer);
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();

            Debug.Assert(_parserLock == null, "Previous parser lock not cleaned");
            _parserLock = internalConnection._parserLock;
            // In async mode the lock is taken so that it may be released from another thread (the continuation below).
            _parserLock.Wait(canReleaseFromAnyThread: _isAsyncBulkCopy);
            
            TdsParser bestEffortCleanupTarget = null;
            RuntimeHelpers.PrepareConstrainedRegions();
            try {
#if DEBUG
                TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();

                RuntimeHelpers.PrepareConstrainedRegions();
                try {
                    tdsReliabilitySection.Start();
#else   // !DEBUG
                {
#endif //DEBUG
                    bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(_connection);
                    WriteRowSourceToServerCommon(columnCount); //this is common in both sync and async
                    Task resultTask = WriteToServerInternalAsync(ctoken); // resultTask is null for sync, but Task for async.
                    if (resultTask != null) {
                        // Cleanup is deferred to the continuation; the outer finally must skip it.
                        finishedSynchronously = false;
                        return resultTask.ContinueWith((t) => {
                            try {
                                AbortTransaction(); // if there is one, on success transactions will be committed
                            }
                            finally {
                                _isBulkCopyingInProgress = false;
                                if (_parser != null) {
                                    _parser._asyncWrite = false;
                                }
                                if (_parserLock != null) {
                                    _parserLock.Release();
                                    _parserLock = null;
                                }
                            }
                            // Returning the antecedent and unwrapping it makes the returned Task reflect
                            // the outcome of resultTask itself, after the cleanup above has run.
                            return t;
                        }, TaskScheduler.Default).Unwrap();
                    }
                    return null;
                }

#if DEBUG
                finally {
                    tdsReliabilitySection.Stop();
                }
#endif //DEBUG

            }

            // For these three exception types the connection is aborted before rethrowing.
            catch (System.OutOfMemoryException e) {
                _connection.Abort(e);
                throw;
            }
            catch (System.StackOverflowException e) {
                _connection.Abort(e);
                throw;
            }
            catch (System.Threading.ThreadAbortException e) {
                _connection.Abort(e);
                SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
                throw;
            }
            finally {
                // Undo the ReadOnly = true set by WriteRowSourceToServerCommon.
                _columnMappings.ReadOnly = false;
                if (finishedSynchronously) {
                    // Same cleanup as the continuation above, for the sync / failed-before-pending case.
                    try {
                        AbortTransaction(); // if there is one, on success transactions will be committed
                    }
                    finally {
                        _isBulkCopyingInProgress = false;
                        if (_parser != null) {
                            _parser._asyncWrite = false;
                        }
                        if (_parserLock != null) {
                            _parserLock.Release();
                            _parserLock = null;
                        }
                    }
                }
            }
        }
  

    // Handles the column mapping.
    //    
        private void WriteRowSourceToServerCommon(int columnCount) {
            // Mark the user's mapping collection read-only and use it as the working set.
            _columnMappings.ReadOnly = true;
            _localColumnMappings = _columnMappings;

            // No user mappings: build a default mapping for columnCount columns; nothing to resolve.
            if (_localColumnMappings.Count <= 0) {
                _localColumnMappings = new SqlBulkCopyColumnMappingCollection();
                _localColumnMappings.CreateDefaultMapping(columnCount);
                return;
            }

            _localColumnMappings.ValidateCollection();

            // perf: when every mapping already carries a source ordinal, the name lookup below is skipped.
            bool hasUnresolvedOrdinal = false;
            foreach (SqlBulkCopyColumnMapping mapping in _localColumnMappings) {
                if (mapping._internalSourceColumnOrdinal == -1) {
                    hasUnresolvedOrdinal = true;
                    break;
                }
            }
            if (!hasUnresolvedOrdinal) {
                return;
            }

            // Resolve each source column that was given by name to its ordinal in the row source.
            // The ordinal local intentionally lives outside the loop, exactly as the lookup result always has.
            int ordinal = -1;
            foreach (SqlBulkCopyColumnMapping mapping in _localColumnMappings) {
                if (mapping._internalSourceColumnOrdinal != -1) {
                    continue; // already specified by ordinal
                }

                string sourceName = UnquotedName(mapping.SourceColumn);

                switch (this._rowSourceType) {
                    case ValueSourceType.DataTable:
                        ordinal = ((DataTable)_rowSource).Columns.IndexOf(sourceName);
                        break;
                    case ValueSourceType.RowArray:
                        // The first row's table supplies the column collection.
                        ordinal = ((DataRow[])_rowSource)[0].Table.Columns.IndexOf(sourceName);
                        break;
                    case ValueSourceType.DbDataReader:
                    case ValueSourceType.IDataReader:
                        try {
                            ordinal = ((IDataRecord)this._rowSource).GetOrdinal(sourceName);
                        }
                        catch (IndexOutOfRangeException e) {
                            // An unknown name from the reader is reported as a non-matching column.
                            throw (SQL.BulkLoadNonMatchingColumnName(sourceName, e));
                        }
                        break;
                }

                if (ordinal == -1) {
                    throw (SQL.BulkLoadNonMatchingColumnName(sourceName));
                }
                mapping._internalSourceColumnOrdinal = ordinal;
            }
        }     

        // Forwards a connection-closed notification to the current state object, if there is one.
        internal void OnConnectionClosed() {
            // Copy the field to a local so the null check and the call use the same reference.
            TdsParserStateObject current = _stateObj;
            if (current == null) {
                return;
            }
            current.OnConnectionClosed();
        }

        // Invokes the rows-copied handler (when one is attached) with this instance as the sender.
        private void OnRowsCopied(SqlRowsCopiedEventArgs value) {
            // Copy the delegate field to a local so the null check and the invocation use the same reference.
            SqlRowsCopiedEventHandler subscribers = _rowsCopiedEventHandler;
            if (subscribers == null) {
                return;
            }
            subscribers(this, value);
        }
        // fxcop:
        // Use the .Net Event System whenever appropriate.

        // Raises the rows-copied event for the given running total and reports whether the
        // handler set Abort on the event args.
        private bool FireRowsCopiedEvent(long rowsCopied) {
            SqlInternalConnectionTds tdsConnection = _connection.GetOpenTdsConnection();

            // The parser lock is dropped while user code runs (prevents possible deadlocks);
            // remember how it was held so it can be re-taken the same way afterwards.
            bool releasableFromAnyThread = tdsConnection._parserLock.CanBeReleasedFromAnyThread;
            tdsConnection._parserLock.Release();

            SqlRowsCopiedEventArgs args = new SqlRowsCopiedEventArgs(rowsCopied);
            try {
                _insideRowsCopiedEvent = true;
                OnRowsCopied(args);
            }
            finally {
                // Runs even when the handler throws: clear the flag and take the lock back.
                _insideRowsCopiedEvent = false;
                tdsConnection._parserLock.Wait(canReleaseFromAnyThread: releasableFromAnyThread);
            }

            return args.Abort;
        }

        // Reads a cell and then writes it. 
        // Read may block at this moment since there is no getValueAsync or DownStream async at this moment.
        // When _isAsyncBulkCopy == true: Write will return Task (when async method runs asynchronously) or Null (when async call actually ran synchronously) for performance. 
        // When _isAsyncBulkCopy == false: Writes are purely sync. This method returns null at the end.
        //
        private Task ReadWriteColumnValueAsync(int col) {
            // --- read part: fetch the cell for this column from the current source row ---
            bool sqlTyped;
            bool dataFeed;
            bool nullValue;
            Object cell = GetValueFromSourceRow(col, out sqlTyped, out dataFeed, out nullValue);

            _SqlMetaData columnMetadata = _sortedColumnMappings[col]._metadata;
            if (!dataFeed) {
                cell = ConvertValue(cell, columnMetadata, nullValue, ref sqlTyped, out dataFeed);

                // Encrypt non-null values for columns whose metadata is flagged as encrypted.
                if (!nullValue && columnMetadata.isEncrypted) {
                    Debug.Assert (_parser.ShouldEncryptValuesForBulkCopy());
                    cell = _parser.EncryptColumnValue(cell, columnMetadata, columnMetadata.column, _stateObj, dataFeed, sqlTyped);
                    sqlTyped = false; // the encrypted value is no longer a sql type
                }
            }

            // --- write part ---
            // Most common path: any target type other than sql_variant. Returns Task/null.
            if (columnMetadata.type != SqlDbType.Variant) {
                return _parser.WriteBulkCopyValue(cell, columnMetadata, _stateObj, sqlTyped, dataFeed, nullValue);
            }

            // sql_variant target: it must not be an encrypted column.
            Debug.Assert (!columnMetadata.isEncrypted, "Can't encrypt SQL Variant type");

            // Ask the SqlDataReader source (when there is one and the server is Katmai or newer)
            // how the variant is stored, so DateTime2 / Date get their dedicated writers.
            SqlBuffer.StorageType variantStorage = SqlBuffer.StorageType.Empty;
            if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) {
                variantStorage = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal);
            }

            if (variantStorage == SqlBuffer.StorageType.DateTime2) {
                _parser.WriteSqlVariantDateTime2(((DateTime)cell), _stateObj);
                return null;
            }
            if (variantStorage == SqlBuffer.StorageType.Date) {
                _parser.WriteSqlVariantDate(((DateTime)cell), _stateObj);
                return null;
            }

            return _parser.WriteSqlVariantDataRowValue(cell, _stateObj); // returns Task/null
        }

        // Registers outterTask with the owning connection for close notification, tagging this
        // instance with SqlReferenceCollection.BulkCopyTag. Throws ADP.ClosedConnectionError()
        // when there is no connection.
        private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask) {
            SqlConnection owningConnection = _connection;
            if (owningConnection != null) {
                owningConnection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
                return;
            }

            // No connection
            throw ADP.ClosedConnectionError();
        }

        // Runs a loop to copy all columns of a single row.
        // maintains a state by remembering #columns copied so far (int col)
        // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = null) {
            // rowTask is only non-null when this call created the completion source itself;
            // calls that were handed a source report through it and return null.
            Task rowTask = null;
            Task pendingWrite = null;
            int column = col;
            try {
                // Copy cells one by one until the row is done or one of them pends.
                while (column < _sortedColumnMappings.Count) {
                    pendingWrite = ReadWriteColumnValueAsync(column); // reads, then writes, one cell value
                    if (pendingWrite != null) {
                        break; // a read/write is still in flight for this column
                    }
                    column++;
                }

                if (pendingWrite == null) {
                    // Every remaining column finished synchronously.
                    if (source != null) {
                        source.SetResult(null);
                    }
                }
                else {
                    if (source == null) {
                        source = new TaskCompletionSource<object>();
                        rowTask = source.Task;
                    }
                    // Resume with the following column once the pending write completes.
                    CopyColumnsAsyncSetupContinuation(source, pendingWrite, column);
                    return rowTask; // completes when all columns (i.e. the entire row) are written
                }
            }
            catch (Exception ex) {
                // With a completion source the failure is reported through it; otherwise it is rethrown.
                if (source == null) {
                    throw;
                }
                source.TrySetException(ex);
            }
            return rowTask;
        }

        // This is in its own method to avoid always allocating the lambda in CopyColumnsAsync
        private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i) {
            // When 'task' (the pending write of column i) succeeds, either carry on with column i + 1
            // or, if i was the last column, complete the row's source.
            AsyncHelper.ContinueTask(task, source, () => {
                    int nextColumn = i + 1;
                    if (nextColumn >= _sortedColumnMappings.Count) {
                        source.SetResult(null);
                        return;
                    }
                    CopyColumnsAsync(nextColumn, source);
                },
                _connection.GetOpenTdsConnection());
        }


        // The notification logic. 
        //
        private void CheckAndRaiseNotification() {
            bool userRequestedAbort = false; // set from the value FireRowsCopiedEvent returns
            Exception pendingFailure = null;  // thrown at the end, after the batch has been closed out

            _rowsCopied++;

            // Notifications are enabled only when _notifyAfter > 0. The countdown is decremented
            // once per row and the event fires when it reaches zero.
            if (_notifyAfter > 0 && _rowsUntilNotification > 0 && --_rowsUntilNotification == 0) {
                // Fire event during operation. This is the user's chance to abort the operation
                // (or to cause an exception).
                try {
                    _stateObj.BcpLock = true;
                    userRequestedAbort = FireRowsCopiedEvent(_rowsCopied);
                    Bid.Trace("<sc.SqlBulkCopy.WriteToServerInternal|INFO> \n");

                    // The handler may have closed the target connection.
                    if (_connection.State != ConnectionState.Open) {
                        pendingFailure = ADP.OpenConnectionRequired("CheckAndRaiseNotification", _connection.State);
                    }
                }
                catch (Exception e) {
                    // Catchable exceptions are wrapped as an aborted operation; others are kept as-is.
                    if (ADP.IsCatchableExceptionType(e)) {
                        pendingFailure = OperationAbortedException.Aborted(e);
                    }
                    else {
                        pendingFailure = e;
                    }
                }
                finally {
                    _stateObj.BcpLock = false;
                }

                if (!userRequestedAbort) {
                    _rowsUntilNotification = _notifyAfter; // restart the countdown
                }
            }

            // If the configured interval is now smaller than the remaining countdown, clamp the countdown.
            if (!userRequestedAbort && _rowsUntilNotification > _notifyAfter) {
                _rowsUntilNotification = _notifyAfter;
            }

            if (userRequestedAbort && pendingFailure == null) {
                pendingFailure = OperationAbortedException.Aborted(null);
            }

            if (_connection.State != ConnectionState.Open) {
                throw ADP.OpenConnectionRequired(SQL.WriteToServer, _connection.State);
            }

            if (pendingFailure == null) {
                return;
            }

            // Complete the current batch up to this row synchronously, then surface the failure.
            _parser._asyncWrite = false;
            Task writeTask = _parser.WriteBulkCopyDone(_stateObj);
            Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy");
            RunParser();
            AbortTransaction();
            throw pendingFailure; //this will be caught and put inside the Task's exception.
        }

        // Checks for cancellation. If cancel requested, cancels the task and returns the cancelled task 
        Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<object> tcs) {
            // Nothing to do unless cancellation was requested: null tells the caller to carry on.
            if (!cts.IsCancellationRequested) {
                return null;
            }

            // Cancel the supplied source, or a fresh one when none was given, and hand back its task.
            TaskCompletionSource<object> completion = tcs ?? new TaskCompletionSource<object>();
            completion.SetCanceled();
            return completion.Task;
        }

        // Runs 'action' right away when 'task' is null and returns its result; otherwise schedules
        // 'action' as a continuation of 'task' (reporting through 'source') and returns null.
        private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action) {
            if (task != null) {
                Debug.Assert(source != null, "source should already be initialized if task is not null");
                AsyncHelper.ContinueTask(task, source, () => {
                    TaskCompletionSource<object> created = action();
                    Debug.Assert(created == null, "Shouldn't create a new source when one already exists");
                });
                return null;
            }

            return action();
        }

        // Copies all the rows in a batch
        // maintains state machine with state variable: rowsSoFar
        // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, TaskCompletionSource<object> source = null) {
            // rowsSoFar: index of the row to resume from (0 on the first call, i + 1 on re-entry from a continuation).
            // totalRows: batch size; values <= 0 mean "no limit".
            // source:    completion source carried across re-entrant calls; null on the initial call.
            // Whenever a write or read pends, the method registers a continuation that calls back into
            // CopyRowsAsync and returns immediately.
            Task resultTask = null;
            Task task = null;
            int i;
            try {
                //totalRows is batchsize which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false). 
                for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) {
                    if (_isAsyncBulkCopy == true) {
                        // Cancellation is only checked in async mode; a non-null result is the cancelled task.
                        resultTask = CheckForCancellation(cts, source);
                        if (resultTask != null) {
                            return resultTask; // task got cancelled!
                        }
                    }

                    // Emit the TdsEnums.SQLROW byte ahead of the row's column values.
                    _stateObj.WriteByte(TdsEnums.SQLROW);
                    
                    task = CopyColumnsAsync(0); //copy 1 row
                             
                    if (task == null) { //task is done. 
                        CheckAndRaiseNotification(); //check notification logic after copying the row

                        //now we will read the next row.    
                        Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: more is only valid if the task returns null. Otherwise, we wait for Task.Result
                        if (readTask != null) {
                            if (source == null) {
                                source = new TaskCompletionSource<object>();
                            }
                            resultTask = source.Task;

                            // The lambda captures i; because the method returns right below, i++ is not
                            // executed in this frame, so i + 1 is the index of the next row.
                            AsyncHelper.ContinueTask(readTask, source, () =>  CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
                            return resultTask; //associated task will be completed when all rows are copied to server/exception/cancelled.
                        }
                        // readTask == null: the read finished synchronously, fall through to the next loop iteration.
                    }
                    else { //task != null, we add continuation for it.
                        source = source ?? new TaskCompletionSource<object>();
                        resultTask = source.Task;

                        AsyncHelper.ContinueTask(task, source, onSuccess: () => {
                            CheckAndRaiseNotification(); //check for notification now as the current row copy is done at this moment.

                            Task readTask = ReadFromRowSourceAsync(cts);
                            if (readTask == null) {
                                // Read finished synchronously: resume the row loop directly.
                                CopyRowsAsync(i + 1, totalRows, cts, source);
                            }
                            else {
                                // Read pended: resume the row loop once it completes.
                                AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
                            }
                        }, connectionToDoom: _connection.GetOpenTdsConnection());
                        return resultTask;
                    }
                }

                // The loop ended: batch limit reached or no more rows to copy.
                if (source != null) {
                    source.TrySetResult(null); // this is set only on the last call of async copy. But may not be set if everything runs synchronously.
                } 
            }
            catch (Exception ex) {
                // With a completion source the failure is reported through it; otherwise it is rethrown to the caller.
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
            // Reached only when nothing pended in this call (or after an exception was stored in source);
            // resultTask is null here unless it was assigned above.
            return resultTask;
        }

        // Copies all the batches in a loop. One iteration for one batch.
        // state variable is essentially not needed. (however, _hasMoreRowToCopy might be thought as a state variable)
        // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source = null) {
            // internalResults / updateBulkCommandText / cts are passed through unchanged to the per-batch helpers.
            // source: completion source shared by all re-entrant calls; null until something first pends.
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                while (_hasMoreRowToCopy) {
                    //pre->before every batch: Transaction, BulkCmd and metadata are done.
                    SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();

                    if (IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) { //internal transaction is started prior to each batch if the Option is set.
                        internalConnection.ThreadHasParserLockForClose = true;     // In case of error, tell the connection we already have the parser lock
                        try {
                            _internalTransaction = _connection.BeginTransaction();
                        }
                        finally {
                            internalConnection.ThreadHasParserLockForClose = false;
                        }
                    }

                    Task commandTask = SubmitUpdateBulkCommand(updateBulkCommandText);

                    if (commandTask == null) {
                        // Command was submitted synchronously: write this batch right away.
                        Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
                        if (continuedTask != null) {
                            // Continuation will take care of re-calling CopyBatchesAsync
                            return continuedTask;
                        }
                        // continuedTask == null: the batch finished synchronously, loop on to the next one.
                    }
                    else {
                        Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                        if (source == null) {
                            source = new TaskCompletionSource<object>();
                        }

                        // Command submission pended: write the batch once it completes.
                        AsyncHelper.ContinueTask(commandTask, source, () => {
                            Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
                            if (continuedTask == null) {
                                // Continuation finished sync, recall into CopyBatchesAsync to continue
                                CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
                            }
                        }, _connection.GetOpenTdsConnection());
                        return source.Task;
                    }
                }
            }
            catch (Exception ex) {
                // With a completion source the failure is reported through it; otherwise it is rethrown to the caller.
                if (source != null) {
                    source.TrySetException(ex);
                    return source.Task;
                }
                else {
                    throw;
                }
            }

            // If we are here, then we finished everything
            if (source != null) {
                source.SetResult(null);
                return source.Task;
            }
            else {
                // Nothing ever pended, so no completion source was created.
                return null;
            }            
        }

        // Writes the MetaData and a single batch
        // If this returns null, then the caller is responsible for starting the next stage
        private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                WriteMetaData(internalResults);

                // Load encryption keys now (if needed)
                _parser.LoadColumnEncryptionKeys(
                    internalResults[MetaDataResultId].MetaData, 
                    _connection.DataSource);

                // Copies one batch of rows; _hasMoreRowToCopy is updated along the way.
                Task batchTask = CopyRowsAsync(0, _savedBatchSize, cts);

                //post->after every batch
                if (batchTask == null) {
                    // The batch was written synchronously: finish it inline.
                    return CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
                }

                Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                source = source ?? new TaskCompletionSource<object>(); // created the first time only

                // Finish the batch when the row copy completes; on failure or cancellation run the error cleanup
                // (only the cancellation path asks for parser cleanup).
                AsyncHelper.ContinueTask(batchTask, source, () => {
                    if (CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source) == null) {
                        // Continuation finished sync, recall into CopyBatchesAsync to continue
                        CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
                    }
                }, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));

                return source.Task;
            }
            catch (Exception ex) {
                // Without a completion source the failure is rethrown; otherwise it is reported through it.
                if (source == null) {
                    throw;
                }
                source.TrySetException(ex);
                return source.Task;
            }
        }

        // Takes care of finishing a single batch (write done, run parser, commit transaction)
        // If this returns null, then the caller is responsible for starting the next stage
        private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                Task doneWrite = _parser.WriteBulkCopyDone(_stateObj);

                if (doneWrite != null) {
                    // The "done" write pended: finish the batch from a continuation.
                    Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                    source = source ?? new TaskCompletionSource<object>();

                    AsyncHelper.ContinueTask(doneWrite, source, () => {
                        try {
                            RunParser();
                            CommitTransaction();
                        }
                        catch (Exception) {
                            // Clean up before letting the exception propagate.
                            CopyBatchesAsyncContinuedOnError(cleanupParser: false);
                            throw;
                        }

                        // Always call back into CopyBatchesAsync
                        CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);

                    }, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
                    return source.Task;
                }

                // The write completed synchronously: finish the batch inline. Returning null
                // leaves it to the caller to start the next batch.
                RunParser();
                CommitTransaction();
                return null;
            }
            catch (Exception ex) {
                // Without a completion source the failure is rethrown; otherwise it is reported through it.
                if (source == null) {
                    throw;
                }
                source.TrySetException(ex);
                return source.Task;
            }
        }
        
        // Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails
        // cleanupParser: when true (and both _parser and _stateObj are set) the method first switches off
        // async writes, writes the bulk-copy "done" and runs the parser, before the state object is cleaned up.
        // Callers in this file pass true on cancellation and false on failure.
        // The transaction is aborted at the end unless an exception escapes the try block.
        private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) {
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            RuntimeHelpers.PrepareConstrainedRegions();
            try {
#if DEBUG
                TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
                RuntimeHelpers.PrepareConstrainedRegions();
                try {
                    tdsReliabilitySection.Start();
    #endif //DEBUG
                    if ((cleanupParser) && (_parser != null) && (_stateObj != null)) {
                        // Force synchronous writes so the "done" write below cannot pend.
                        _parser._asyncWrite = false;
                        Task task = _parser.WriteBulkCopyDone(_stateObj);
                        Debug.Assert(task == null, "Write should not pend when error occurs");
                        RunParser();
                    }

                    if (_stateObj != null) {
                        CleanUpStateObjectOnError();
                    }
    #if DEBUG
                }
                finally {
                    tdsReliabilitySection.Stop();
                }
#endif //DEBUG
            }
            // For these three exception types the connection is doomed before rethrowing.
            catch (OutOfMemoryException) {
                internalConnection.DoomThisConnection();
                throw;
            }
            catch (StackOverflowException) {
                internalConnection.DoomThisConnection();
                throw;
            }
            catch (ThreadAbortException) {
                internalConnection.DoomThisConnection();
                throw;
            }

            AbortTransaction();
        }

        // Resets and closes the session held in _stateObj, if there is one, and then drops the reference to it.
        // Despite its name it is called from the successful completion paths as well as from the exception handlers.
        //
        private void CleanUpStateObjectOnError() {
            if (_stateObj == null) {
                return; // no session to clean up
            }

            // The flag is raised only for the duration of the cleanup; the finally block always lowers it again.
            _parser.Connection.ThreadHasParserLockForClose = true;
            try {
                _stateObj.ResetBuffer();
                _stateObj._outputPacketNumber = 1;

                // If _parser is closed, sending attention will raise a debug assertion, so CancelRequest is only
                // called while the parser is in one of the two open states.
                bool parserIsOpen = _parser.State == TdsParserState.OpenNotLoggedIn
                                    || _parser.State == TdsParserState.OpenLoggedIn;
                if (parserIsOpen) {
                    _stateObj.CancelRequest();
                }

                _stateObj._internalTimeout = false;
                _stateObj.CloseSession();
                _stateObj._bulkCopyOpperationInProgress = false;
                _stateObj._bulkCopyWriteTimeout = false;
                _stateObj = null;
            }
            finally {
                _parser.Connection.ThreadHasParserLockForClose = false;
            }
        }

        // Continuation of WriteToServerInternalRestAsync: runs once the results of the initial query are available.
        // Creates the update bulk command from those results and, when there are column mappings to copy, launches the copy.
        // 'source' is the completion source carried on from the caller and is completed here when the whole copy is done.
        // It may be null in case of a sync copy; failures are then rethrown instead of being stored on it.
        //
        private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource<object> source) {
            Task copyTask = null;

            try {
                string bulkCommandText = AnalyzeTargetAndCreateUpdateBulkCommand(internalResults);

                if (_sortedColumnMappings.Count != 0) {
                    _stateObj.SniContext = SniContext.Snix_SendRows;
                    // Snapshot the batch size so that a change made while the copy is running is not picked up.
                    _savedBatchSize = _batchSize;
                    _rowsUntilNotification = _notifyAfter;
                    _rowsCopied = 0;

                    _currentRowMetadata = new SourceColumnMetadata[_sortedColumnMappings.Count];
                    for (int ordinal = 0; ordinal < _currentRowMetadata.Length; ordinal++) {
                        _currentRowMetadata[ordinal] = GetColumnMetadata(ordinal);
                    }

                    copyTask = CopyBatchesAsync(internalResults, bulkCommandText, cts); // launch the BulkCopy
                }

                if (copyTask == null) {
                    // Nothing is pending (no mappings to copy, or CopyBatchesAsync returned null): finish right here.
                    _localColumnMappings = null;

                    try {
                        CleanUpStateObjectOnError();
                    }
                    catch (Exception cleanupEx) {
                        Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
                    }

                    if (source != null) {
                        source.SetResult(null);
                    }
                    return;
                }

                // The copy is still pending, so a completion source is required from here on.
                if (source == null) {
                    source = new TaskCompletionSource<object>();
                }

                AsyncHelper.ContinueTask(copyTask, source, () => {
                    // The bulk copy task is completed at this moment.
                    if (copyTask.IsCanceled) {
                        _localColumnMappings = null;
                        try {
                            CleanUpStateObjectOnError();
                        }
                        finally {
                            source.SetCanceled();
                        }
                        return;
                    }

                    if (copyTask.Exception != null) {
                        // The failure is forwarded as is; no state object cleanup is done in this branch.
                        source.SetException(copyTask.Exception.InnerException);
                        return;
                    }

                    _localColumnMappings = null;
                    try {
                        CleanUpStateObjectOnError();
                    }
                    finally {
                        if (source != null) {
                            // A cancellation request may arrive even after the entire copy has finished.
                            if (cts.IsCancellationRequested) {
                                source.SetCanceled();
                            }
                            else {
                                source.SetResult(null);
                            }
                        }
                    }
                }, _connection.GetOpenTdsConnection());
            }
            catch (Exception ex) {
                _localColumnMappings = null;

                try {
                    CleanUpStateObjectOnError();
                }
                catch (Exception cleanupEx) {
                    Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
                }

                if (source == null) {
                    throw;
                }
                source.TrySetException(ex);
            }
        }

        // Rest of the WriteToServerInternalAsync method, entered once the first read has shown that there are rows to copy.
        // It takes the parser from the connection, waits for (async: chains onto) a reconnect when ValidateAndReconnect
        // returns a task, opens a session, runs the initial query and passes its results on to
        // WriteToServerInternalRestContinuedAsync, either directly or as a continuation of the initial query task.
        //
        // cts    - cancellation token; it can cancel the wait for a reconnect and is passed on to the following stages.
        // source - completion source carried on from WriteToServerInternalAsync. It is null in case of Sync bcp, where
        //          exceptions are thrown to the caller; in case of Async bcp exceptions are stored on source instead.
        //
        private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(_hasMoreRowToCopy, "first time it is true, otherwise this method would not have been called.");
            _hasMoreRowToCopy = true;
            Task<BulkCopySimpleResultSet> internalResultsTask = null;
            // NOTE(review): this instance is replaced by the 'out' argument of CreateAndExecuteInitialQueryAsync below
            // and is not read before that call.
            BulkCopySimpleResultSet internalResults = new BulkCopySimpleResultSet();
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            try {
                _parser = _connection.Parser;
                _parser._asyncWrite = _isAsyncBulkCopy; // very important: the parser's write mode has to follow the bulk copy mode

                Task reconnectTask;
                try {
                    // The callback releases the parser lock held by this bulk copy and clears the field.
                    // When (or whether) ValidateAndReconnect invokes it is not visible from here.
                    reconnectTask = _connection.ValidateAndReconnect(
                        () => {
                            if (_parserLock != null) {
                                _parserLock.Release();
                                _parserLock = null;
                            }
                        }, BulkCopyTimeout);
                }
                catch (SqlException ex) {
                    throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
                }

                if (reconnectTask != null) {                    
                    if (_isAsyncBulkCopy) {                        
                        // cancellableReconnectTS stands in for reconnectTask: besides being completed through the
                        // ContinueTask call on reconnectTask, it is cancelled by the token registration below and is
                        // handed to SetTimeoutException together with BulkCopyTimeout.
                        CancellationTokenRegistration regReconnectCancel = new CancellationTokenRegistration();                       
                        TaskCompletionSource<object> cancellableReconnectTS = new TaskCompletionSource<object>();                        
                        if (cts.CanBeCanceled) {
                            regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled());
                        }
                        AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
                        // no need to cancel timer since SqlBulkCopy creates specific task source for reconnection 
                        AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout, 
                                ()=>{return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout());}, CancellationToken.None);
                        // Success callback: drop the registration, release any parser lock still held, take the lock of
                        // the connection that is open now and start this method over.
                        // The registration is disposed in the failure and cancellation callbacks as well.
                        AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
                            () => {                               
                                regReconnectCancel.Dispose();
                                if (_parserLock != null) {
                                    _parserLock.Release();
                                    _parserLock = null;
                                }
                                _parserLock = _connection.GetOpenTdsConnection()._parserLock;
                                _parserLock.Wait(canReleaseFromAnyThread: true);
                                WriteToServerInternalRestAsync(cts, source);
                            },
                            connectionToAbort: _connection,
                            onFailure: (e) => { regReconnectCancel.Dispose(); },
                            onCancellation: () => { regReconnectCancel.Dispose(); },
                            exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
                        return;
                    }
                    else {
                        // Sync bcp: block until the reconnect has finished (or timed out), then start over.
                        try {
                            AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); } );
                        }
                        catch (SqlException ex) {
                            throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // preserve behavior (throw InvalidOperationException on failure to connect)
                        }
                        // NOTE(review): unlike the async branch above, a _parserLock that is still set is not released
                        // before the field is replaced here - confirm that the ValidateAndReconnect callback has always
                        // run by the time this point is reached.
                        _parserLock = _connection.GetOpenTdsConnection()._parserLock;
                        _parserLock.Wait(canReleaseFromAnyThread: false);
                        WriteToServerInternalRestAsync(cts, source);
                        return;
                    }
                }
                // No reconnect is pending from here on.
                if (_isAsyncBulkCopy) {
                    _connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag);
                }

                internalConnection.ThreadHasParserLockForClose = true;    // In case of error, let the connection know that we already have the parser lock 

                try {
                    _stateObj = _parser.GetSession(this);
                    _stateObj._bulkCopyOpperationInProgress = true;
                    _stateObj.StartSession(ObjectID);
                }
                finally {
                    internalConnection.ThreadHasParserLockForClose = false;
                }

                try {
                    // Returns a task when the initial query is still pending, null when it finished synchronously
                    // (the results are then available in internalResults).
                    internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); //Task/Null
                }
                catch (SqlException ex) {
                    throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
                }

                if(internalResultsTask != null) {
                    // Pending: continue with the task's result once it is available.
                    AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
                }
                else {
                    Debug.Assert(internalResults != null, "Executing initial query finished synchronously, but there were no results");
                    WriteToServerInternalRestContinuedAsync(internalResults, cts, source); //internalResults is valid here.
                }
            }
            catch (Exception ex) {
                // Async bcp: store the failure on the completion source. Sync bcp (no source): let it propagate.
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }            
        }
    
        // Entry point of the copy. Returns a Task in case of Async bcp and null in case of Sync bcp.
        // For Async bcp failures are stored on the returned task; for Sync bcp they are thrown to the caller.
        // The first read from the row source happens here; when there are rows to copy the work is handed over to
        // WriteToServerInternalRestAsync, otherwise the operation completes immediately.
        //
        private Task WriteToServerInternalAsync(CancellationToken ctoken) {
            TaskCompletionSource<object> completion = null;
            Task<object> userTask = null;

            if (_isAsyncBulkCopy) {
                // The completion source behind the task that is handed to the application.
                completion = new TaskCompletionSource<object>();
                userTask = completion.Task;

                RegisterForConnectionCloseNotification(ref userTask);
            }

            if (_destinationTableName == null) {
                // No table to copy to.
                if (completion == null) {
                    throw SQL.BulkLoadMissingDestinationTable();
                }
                completion.SetException(SQL.BulkLoadMissingDestinationTable());
                return userTask;
            }

            try {
                // First read call. _hasMoreRowToCopy is only valid right away when no task is returned.
                Task pendingRead = ReadFromRowSourceAsync(ctoken);

                if (pendingRead != null) {
                    Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
                    AsyncHelper.ContinueTask(pendingRead, completion, () => {
                        if (_hasMoreRowToCopy) {
                            // The same completion source is passed on and will be completed by the callee.
                            WriteToServerInternalRestAsync(ctoken, completion);
                        }
                        else {
                            completion.SetResult(null); // no rows to copy!
                        }
                    }, _connection.GetOpenTdsConnection());
                }
                else if (_hasMoreRowToCopy) {
                    // The read finished synchronously and there are rows: run the rest of the method with the same
                    // completion source; the (possibly still incomplete) task is returned below.
                    WriteToServerInternalRestAsync(ctoken, completion);
                }
                else if (completion != null) {
                    // The read finished synchronously and the source has no rows to copy.
                    completion.SetResult(null);
                }
            }
            catch (Exception ex) {
                if (completion == null) {
                    throw;
                }
                completion.TrySetException(ex);
            }

            return userTask;
        }
    }//End of SqlBulkCopy Class