in transaction-client/src/main/java/org/apache/omid/transaction/AbstractTransactionManager.java [395:469]
private void commitRegularTransaction(AbstractTransaction<? extends CellId> tx)
throws RollbackException, TransactionException
{
try {
long commitTs = tsoClient.commit(tx.getStartTimestamp(), tx.getWriteSet(), tx.getConflictFreeWriteSet()).get();
certifyCommitForTx(tx, commitTs);
updateShadowCellsAndRemoveCommitTableEntry(tx, postCommitter);
} catch (ExecutionException e) {
if (e.getCause() instanceof AbortException) { // TSO reports Tx conflicts as AbortExceptions in the future
rollback(tx);
rolledbackTxsCounter.inc();
throw new RollbackException(tx.getStartTimestamp() + ": Conflicts detected in writeset", e.getCause());
}
if (e.getCause() instanceof ServiceUnavailableException || e.getCause() instanceof ConnectionException) {
errorTxsCounter.inc();
try {
LOG.warn("Can't contact the TSO for receiving outcome for Tx {}. Checking Commit Table...", tx.getStartTimestamp());
// Check the commit table to find if the target TSO woke up in the meantime and added the commit
// TODO: Decide what we should we do if we can not contact the commit table
Optional<CommitTimestamp> commitTimestamp =
commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
if (commitTimestamp.isPresent()) {
if (commitTimestamp.get().isValid()) {
LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
certifyCommitForTx(tx, commitTimestamp.get().getValue());
postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
} else { // Probably another Tx in a new TSO Server invalidated this transaction
LOG.warn("{}: Invalidated commit TS found in Commit Table. Rolling-back...", tx.getStartTimestamp());
rollback(tx);
throw new RollbackException(tx.getStartTimestamp() + " invalidated by other Tx started", e.getCause());
}
} else {
LOG.warn("{}: Trying to invalidate Tx proactively in Commit Table...", tx.getStartTimestamp());
boolean invalidated = commitTableClient.tryInvalidateTransaction(tx.getStartTimestamp()).get();
if (invalidated) {
LOG.warn("{}: Invalidated proactively in Commit Table. Rolling-back Tx...", tx.getStartTimestamp());
invalidatedTxsCounter.inc();
rollback(tx); // Rollback proactively cause it's likely that a new TSOServer is now master
throw new RollbackException(tx.getStartTimestamp() + " rolled-back precautionary", e.getCause());
} else {
LOG.warn("{}: Invalidation could NOT be completed. Re-checking Commit Table...", tx.getStartTimestamp());
// TODO: Decide what we should we do if we can not contact the commit table
commitTimestamp = commitTableClient.getCommitTimestamp(tx.getStartTimestamp()).get();
if (commitTimestamp.isPresent() && commitTimestamp.get().isValid()) {
LOG.warn("{}: Valid commit TS found in Commit Table. Committing Tx...", tx.getStartTimestamp());
certifyCommitForTx(tx, commitTimestamp.get().getValue());
postCommitter.updateShadowCells(tx); // But do NOT remove transaction from commit table
} else {
LOG.error("{}: Can't determine Transaction outcome", tx.getStartTimestamp());
throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome");
}
}
}
} catch (ExecutionException e1) {
throw new TransactionException(tx.getStartTimestamp() + ": problem reading commitTS from Commit Table", e1);
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
throw new TransactionException(tx.getStartTimestamp() + ": interrupted while reading commitTS from Commit Table", e1);
}
} else {
throw new TransactionException(tx.getStartTimestamp() + ": cannot determine Tx outcome", e.getCause());
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new TransactionException(tx.getStartTimestamp() + ": interrupted during commit", ie);
}
}