void handleReplyBatchEvent()

in tso-server/src/main/java/org/apache/omid/tso/ReplyProcessorImpl.java [120:152]


    /**
     * Sends the reply for every event held in the batch carried by {@code replyBatchEvent}, publishes each
     * event's monitoring context right after its reply has been sent, and finally hands the batch back to
     * {@code batchPool}.
     *
     * <p>If an event of a disallowed type is encountered, the exception propagates immediately: the remaining
     * events are not processed and the batch is not returned to the pool by this method.
     *
     * @param replyBatchEvent the event wrapping the batch whose events must be replied to
     * @throws IllegalStateException if the batch contains a {@code COMMIT_RETRY} event or an event of any
     *                               other type not handled here
     * @throws Exception             declared by the signature; presumably propagated from the send, publish
     *                               or pool calls (their declarations are outside this excerpt)
     */
    void handleReplyBatchEvent(ReplyBatchEvent replyBatchEvent) throws Exception {

        final Batch replies = replyBatchEvent.getBatch();

        // The event count is re-read on every iteration, exactly as the bound of the loop requires.
        for (int idx = 0; idx < replies.getNumEvents(); idx++) {
            final PersistEvent persisted = replies.get(idx);

            switch (persisted.getType()) {
                // Types that must never show up at this stage are rejected first.
                case COMMIT_RETRY:
                    throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + persisted);
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + persisted);

                // Each remaining type maps to exactly one response sender.
                case TIMESTAMP:
                    sendTimestampResponse(
                            persisted.getStartTimestamp(),
                            persisted.getChannel(),
                            persisted.getMonCtx());
                    break;
                case COMMIT:
                    sendCommitResponse(
                            persisted.getStartTimestamp(),
                            persisted.getCommitTimestamp(),
                            persisted.getChannel(),
                            persisted.getMonCtx(),
                            persisted.getNewLowWatermark());
                    break;
                case ABORT:
                    sendAbortResponse(
                            persisted.getStartTimestamp(),
                            persisted.getChannel(),
                            persisted.getMonCtx());
                    break;
                case FENCE:
                    sendFenceResponse(
                            persisted.getStartTimestamp(),
                            persisted.getCommitTimestamp(),
                            persisted.getChannel(),
                            persisted.getMonCtx());
                    break;
            }

            // Only reached when a reply was actually sent for this event.
            persisted.getMonCtx().publish();
        }

        // Reached only when every event in the batch was handled without an exception.
        batchPool.returnObject(replies);
    }