public void onEvent()

in tablestore/src/main/java/com/alicloud/openservices/tablestore/reader/ReaderEventHandler.java [47:95]


    public void onEvent(ReaderEvent readerEvent, long l, boolean b) throws Exception {
        boolean shouldWaitFlush = false;
        CountDownLatch latch = null;
        ReqWithGroups reqWithGroups = null;

        if (readerEvent.type == ReaderEvent.EventType.FLUSH) {
            logger.debug("FlushSignal with QueueSize: {}", requestManager.getTotalPksCount());
            if (requestManager.getTotalPksCount() > 0) {
                reqWithGroups = requestManager.makeRequest();
            }
            shouldWaitFlush = true;
            latch = readerEvent.latch;
        } else if (readerEvent.type == EventType.SEND) {
            logger.debug("SendSignal with QueueSize: {}", requestManager.getTotalPksCount());
            if (requestManager.getTotalPksCount() > 0) {
                reqWithGroups = requestManager.makeRequest();
            }
        } else {
            statistics.totalRowsCount.incrementAndGet();
            final PrimaryKeyWithTable primaryKeyWithTable = readerEvent.pkWithTable;
            final PkWithGroup pkWithGroup = new PkWithGroup(primaryKeyWithTable, readerEvent.readerGroup);
            boolean succeed = requestManager.appendPrimaryKey(pkWithGroup);
            if (!succeed) {
                // Indicates that the request size has reached the upper limit.
                reqWithGroups = requestManager.makeRequest();
                requestManager.appendPrimaryKey(pkWithGroup);
            }
        }

        if (reqWithGroups != null) {
            final ReqWithGroups finalRequest = reqWithGroups;
            bucketSemaphore.acquire();      // First, block and wait for the bucket semaphore.
            callbackSemaphore.acquire();    // Post-block waiting for thread pool signal
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    statistics.totalRequestCount.incrementAndGet();
                    requestManager.sendRequest(finalRequest);
                }
            });
        }

        if (shouldWaitFlush) {
            bucketSemaphore.acquire(bucketConcurrency);
            bucketSemaphore.release(bucketConcurrency);
            logger.debug("Finish bucket waitFlush.");
            latch.countDown();
        }
    }