in src/State/ConnectionStateTracker.cs [133:205]
private void DoRestoreTransactions(ITransport transport, ConnectionState connectionState)
{
AtomicCollection<TransactionState> transactionStates = connectionState.TransactionStates;
List<TransactionInfo> toRollback = new List<TransactionInfo>();
foreach(TransactionState transactionState in transactionStates)
{
// rollback any completed transactions - no way to know if commit got there
// or if reply went missing
if (transactionState.Commands.Count != 0)
{
Command lastCommand = transactionState.Commands[transactionState.Commands.Count - 1];
if (lastCommand.IsTransactionInfo)
{
TransactionInfo transactionInfo = lastCommand as TransactionInfo;
if (transactionInfo.Type == TransactionInfo.COMMIT_ONE_PHASE)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("rolling back potentially completed tx: " +
transactionState.Id);
}
toRollback.Add(transactionInfo);
continue;
}
}
}
// replay the add and remove of short lived producers that may have been
// involved in the transaction
foreach (ProducerState producerState in transactionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx replay producer :" + producerState.Info);
}
transport.Oneway(producerState.Info);
}
foreach (Command command in transactionState.Commands)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx replay: " + command);
}
transport.Oneway(command);
}
foreach (ProducerState producerState in transactionState.ProducerStates)
{
if (Tracer.IsDebugEnabled)
{
Tracer.Debug("tx remove replayed producer :" + producerState.Info);
}
RemoveInfo producerRemove = new RemoveInfo();
producerRemove.ObjectId = producerState.Info.ProducerId;
transport.Oneway(producerRemove);
}
}
foreach (TransactionInfo command in toRollback)
{
// respond to the outstanding commit
ExceptionResponse response = new ExceptionResponse();
response.Exception = new BrokerError();
response.Exception.Message =
"Transaction completion in doubt due to failover. Forcing rollback of " + command.TransactionId;
response.Exception.ExceptionClass = (new TransactionRolledBackException()).GetType().FullName;
response.CorrelationId = command.CommandId;
transport.CommandAsync(transport, response).GetAsyncResult();
}
}