in tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java [120:152]
/**
 * Walks every event held by the batch carried in {@code replyBatchEvent} and sends the
 * response that corresponds to the event's type, then calls {@code publish()} on the
 * event's {@code MonCtx}. Once all events have been handled, the batch is handed back
 * to {@code batchPool}.
 *
 * <p>Note: the batch is returned to the pool only when the whole loop completes; if any
 * step throws, {@code batchPool.returnObject} is not reached.
 *
 * @param replyBatchEvent wrapper from which the batch to reply to is obtained
 * @throws IllegalStateException if an event is of type {@code COMMIT_RETRY} or of any
 *                               type not handled by this method
 * @throws Exception             propagated from the response-sending helpers or the pool
 */
void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {
    final Batch pendingBatch = replyBatchEvent.getBatch();

    int position = 0;
    // The event count is re-read on every iteration, exactly as an index-based for loop would.
    while (position < pendingBatch.getNumEvents()) {
        final PersistEvent current = pendingBatch.get(position);

        switch (current.getType()) {
            case TIMESTAMP:
                sendTimestampResponse(
                        current.getStartTimestamp(),
                        current.getChannel(),
                        current.getMonCtx());
                break;
            case COMMIT:
                sendCommitResponse(
                        current.getStartTimestamp(),
                        current.getCommitTimestamp(),
                        current.getChannel(),
                        current.getMonCtx(),
                        current.getNewLowWatermark());
                break;
            case ABORT:
                sendAbortResponse(
                        current.getStartTimestamp(),
                        current.getChannel(),
                        current.getMonCtx());
                break;
            case FENCE:
                sendFenceResponse(
                        current.getStartTimestamp(),
                        current.getCommitTimestamp(),
                        current.getChannel(),
                        current.getMonCtx());
                break;
            case COMMIT_RETRY:
                // Reaching this point with a retry event is treated as an illegal state.
                throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + current);
            default:
                throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + current);
        }

        // Only reached when a response was sent (the throwing branches never get here).
        current.getMonCtx().publish();
        position++;
    }

    batchPool.returnObject(pendingBatch);
}