in tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java [94:155]
public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {
    // Handles one batch of persist events in four ordered steps:
    //   1. Walk the batch: hand COMMIT and FENCE entries to the writer and stop the
    //      persistence timers of the event types that are not written.
    //   2. Call flush() with the number of entries handed to the writer.
    //   3. Filter/disambiguate client commit retries in the batch.
    //   4. Update the per-event timers for the reply stage and pass the batch, together
    //      with its sequence, to the reply processor.
    // Throws IllegalStateException when an event type is not handled here, or when a
    // COMMIT_RETRY event is still in the batch after the filtering step.
    final Batch eventBatch = batchEvent.getBatch();
    final int batchedCount = eventBatch.getNumEvents();
    batchSizeHistogram.update(batchedCount);

    // Step 1: count how many entries were added to the writer so flush() can be told.
    int pendingCommitWrites = 0;
    int idx = 0;
    while (idx < batchedCount) {
        final PersistEvent current = eventBatch.get(idx);
        idx++;
        switch (current.getType()) {
            case COMMIT:
                writer.addCommittedTransaction(current.getStartTimestamp(), current.getCommitTimestamp());
                ++pendingCommitWrites;
                break;
            case FENCE:
                // A fence is persisted with its identifier used as both start and commit timestamp.
                writer.addCommittedTransaction(current.getCommitTimestamp(), current.getCommitTimestamp());
                ++pendingCommitWrites;
                break;
            case TIMESTAMP:
                current.getMonCtx().timerStop("persistence.processor.timestamp.latency");
                break;
            case COMMIT_RETRY:
                current.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
                break;
            case ABORT:
                current.getMonCtx().timerStop("persistence.processor.abort.latency");
                break;
            default:
                throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + current);
        }
    }

    // Steps 2 and 3. WARNING: the order matters. Commit retries have to be filtered out and
    // disambiguated before any response for this batch is sent back to the client.
    flush(pendingCommitWrites);
    filterAndDissambiguateClientRetries(eventBatch);

    // Step 4: statistics only. The batch size is re-read on every pass rather than reusing
    // batchedCount (presumably because the filtering step can change the batch contents).
    int pos = 0;
    while (pos < eventBatch.getNumEvents()) {
        final PersistEvent remaining = eventBatch.get(pos);
        pos++;
        switch (remaining.getType()) {
            case COMMIT_RETRY:
                throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + remaining);
            case COMMIT:
                // The commit persistence timer is only stopped here, after the flush call.
                remaining.getMonCtx().timerStop("persistence.processor.commit.latency");
                remaining.getMonCtx().timerStart("reply.processor.commit.latency");
                break;
            case FENCE:
                // Same for the fence persistence timer.
                remaining.getMonCtx().timerStop("persistence.processor.fence.latency");
                remaining.getMonCtx().timerStart("reply.processor.fence.latency");
                break;
            case TIMESTAMP:
                remaining.getMonCtx().timerStart("reply.processor.timestamp.latency");
                break;
            case ABORT:
                remaining.getMonCtx().timerStart("reply.processor.abort.latency");
                break;
            default:
                throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + remaining);
        }
    }

    replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), eventBatch);
}