in tso-server/src/main/java/org/apache/omid/tso/AbstractRequestProcessor.java [219:267]
private void handleCommit(RequestEvent event) throws Exception {
long startTimestamp = event.getStartTimestamp();
Iterable<Long> writeSet = event.writeSet();
Collection<Long> tableIdSet = event.getTableIdSet();
boolean isCommitRetry = event.isCommitRetry();
Channel c = event.getChannel();
boolean nonEmptyWriteSet = writeSet.iterator().hasNext();
// If the transaction started before the low watermark, or
// it started before a fence and modified the table the fence created for, or
// it has a write-write conflict with a transaction committed after it started
// Then it should abort. Otherwise, it can commit.
if (startTimestamp > lowWatermark &&
!hasConflictsWithFences(startTimestamp, tableIdSet) &&
!hasConflictsWithCommittedTransactions(startTimestamp, writeSet)) {
long commitTimestamp = timestampOracle.next();
Optional<Long> forwardNewWaterMark = Optional.absent();
if (nonEmptyWriteSet) {
long newLowWatermark = lowWatermark;
for (long r : writeSet) {
long removed = hashmap.putLatestWriteForCell(r, commitTimestamp);
newLowWatermark = Math.max(removed, newLowWatermark);
}
if (newLowWatermark != lowWatermark) {
LOG.trace("Setting new low Watermark to {}", newLowWatermark);
lowWatermark = newLowWatermark;
forwardNewWaterMark = Optional.of(lowWatermark);
}
}
event.getMonCtx().timerStop("request.processor.commit.latency");
forwardCommit(startTimestamp, commitTimestamp, c, event.getMonCtx(), forwardNewWaterMark);
} else {
event.getMonCtx().timerStop("request.processor.commit.latency");
if (isCommitRetry) { // Re-check if it was already committed but the client retried due to a lag replying
forwardCommitRetry(startTimestamp, c, event.getMonCtx());
} else {
forwardAbort(startTimestamp, c, event.getMonCtx());
}
}
}