in src/NetTxMessageConsumer.cs [69:115]
public override async Task BeforeMessageIsConsumedAsync(MessageDispatch dispatch)
{
if (!IsAutoAcknowledgeBatch)
{
if (this.session.IsTransacted)
{
bool waitForDtcWaitHandle = false;
using (await this.transactionContext.SyncRoot.LockAsync().Await())
{
// In the case where the consumer is operating in concert with a
// distributed TX manager we need to wait whenever the TX is being
// controlled by the DTC as it completes all operations async and
// we cannot start consumption again until all its tasks have completed.)
var currentTransactionId = transactionContext.TransactionId as XATransactionId;
string currentLocalTxId = currentTransactionId != null
? UTF8Encoding.UTF8.GetString(currentTransactionId.GlobalTransactionId)
: "NONE";
if (Transaction.Current != null)
{
waitForDtcWaitHandle = this.transactionContext.InNetTransaction &&
this.transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending ||
currentLocalTxId != Transaction.Current.TransactionInformation.LocalIdentifier;
}
else
{
waitForDtcWaitHandle = this.transactionContext.InNetTransaction &&
this.transactionContext.NetTxState == NetTxTransactionContext.TxState.Pending;
}
}
//if session EnlistMsDtcNativeResource the transaction does not need to wait
if (this.session.EnlistsMsDtcNativeResource)
{
waitForDtcWaitHandle = false;
}
if (waitForDtcWaitHandle)
{
this.transactionContext.DtcWaitHandle.WaitOne();
}
}
}
await base.BeforeMessageIsConsumedAsync(dispatch).Await();
}