public void onEvent()

in tso-server/src/main/java/org/apache/omid/tso/PersistenceProcessorHandler.java [94:155]


    public void onEvent(PersistenceProcessorImpl.PersistBatchEvent batchEvent) throws Exception {

        int commitEventsToFlush = 0;
        Batch batch = batchEvent.getBatch();
        int numOfBatchedEvents = batch.getNumEvents();
        batchSizeHistogram.update(numOfBatchedEvents);
        for (int i=0; i < numOfBatchedEvents; i++) {
            PersistEvent event = batch.get(i);
            switch (event.getType()) {
                case TIMESTAMP:
                    event.getMonCtx().timerStop("persistence.processor.timestamp.latency");
                    break;
                case COMMIT:
                    writer.addCommittedTransaction(event.getStartTimestamp(), event.getCommitTimestamp());
                    commitEventsToFlush++;
                    break;
                case COMMIT_RETRY:
                    event.getMonCtx().timerStop("persistence.processor.commit-retry.latency");
                    break;
                case ABORT:
                    event.getMonCtx().timerStop("persistence.processor.abort.latency");
                    break;
                case FENCE:
                    // Persist the fence by using the fence identifier as both the start and commit timestamp.
                    writer.addCommittedTransaction(event.getCommitTimestamp(), event.getCommitTimestamp());
                    commitEventsToFlush++;
                    break;
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
            }
        }

        // Flush and send the responses back to the client. WARNING: Before sending the responses, first we need
        // to filter commit retries in the batch to disambiguate them.
        flush(commitEventsToFlush);
        filterAndDissambiguateClientRetries(batch);
        for (int i=0; i < batch.getNumEvents(); i++) { // Just for statistics
            PersistEvent event = batch.get(i);
            switch (event.getType()) {
                case TIMESTAMP:
                    event.getMonCtx().timerStart("reply.processor.timestamp.latency");
                    break;
                case COMMIT:
                    event.getMonCtx().timerStop("persistence.processor.commit.latency");
                    event.getMonCtx().timerStart("reply.processor.commit.latency");
                    break;
                case COMMIT_RETRY:
                    throw new IllegalStateException("COMMIT_RETRY events must be filtered before this step: " + event);
                case ABORT:
                    event.getMonCtx().timerStart("reply.processor.abort.latency");
                    break;
                case FENCE:
                    event.getMonCtx().timerStop("persistence.processor.fence.latency");
                    event.getMonCtx().timerStart("reply.processor.fence.latency");
                    break;
                default:
                    throw new IllegalStateException("Event not allowed in Persistent Processor Handler: " + event);
            }
        }
        replyProcessor.manageResponsesBatch(batchEvent.getBatchSequence(), batch);

    }