public void onEvent()

in tablestore/src/main/java/com/alicloud/openservices/tablestore/writer/handle/RowEventHandler.java [88:148]


    /**
     * Processes a single {@link RowChangeEvent}: either buffers the row change it carries,
     * or — for a {@code FLUSH} event — drains the buffer and signals the event's latch.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Non-flush event: count the row and try to append it to the request buffer. If the
     *       append is refused, cut the current buffer into a request and retry once; a row that
     *       is refused a second time is reported as failed on the executor.</li>
     *   <li>Flush event: cut the buffer into a request if it holds any rows.</li>
     *   <li>If a request was cut, take one bucket permit and one callback permit, then submit
     *       the send to the executor.</li>
     *   <li>Flush event only: acquire and immediately release {@code bucketConcurrency} bucket
     *       permits, then count down the event's latch.</li>
     * </ol>
     *
     * @param rowChangeEvent the event to process; its {@code type}, {@code rowChange},
     *                       {@code group} and {@code latch} fields are read here
     * @param sequence       not used by this method
     * @param endOfBatch     not used by this method
     * @throws Exception if a blocking semaphore acquisition or any collaborator call fails
     */
    public void onEvent(final RowChangeEvent rowChangeEvent, long sequence, boolean endOfBatch) throws Exception {

        final boolean flushRequested = rowChangeEvent.type == RowChangeEvent.EventType.FLUSH;
        CountDownLatch flushLatch = null;
        RequestWithGroups pendingRequest = null;

        if (!flushRequested) {
            writerStatistics.totalRowsCount.incrementAndGet();
            final RowChange rejectedRow = rowChangeEvent.rowChange;
            final RowChangeWithGroup bufferedRow = new RowChangeWithGroup(rowChangeEvent.rowChange, rowChangeEvent.group);

            boolean appended = requestManager.appendRowChange(bufferedRow);
            if (!appended) {
                // Buffer refused the row: cut what is buffered into a request, then retry once.
                pendingRequest = requestManager.makeRequest();
                appended = requestManager.appendRowChange(bufferedRow);
            }

            if (!appended) {
                // Refused even after the buffer was cut: report the failure off this thread.
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        writerStatistics.totalFailedRowsCount.incrementAndGet();
                        ClientException cause = new ClientException("Can not even append only one row into buffer.");
                        logger.error("RowChange Failed: ", cause);
                        bufferedRow.group.failedOneRow(bufferedRow.rowChange, cause);
                        if (callback != null) {
                            callback.onFailed(rejectedRow, cause);
                        }
                    }
                });
            }
        } else {
            logger.debug("FlushSignal with QueueSize: {}", requestManager.getTotalRowsCount());
            if (requestManager.getTotalRowsCount() > 0) {
                pendingRequest = requestManager.makeRequest();
            }
            flushLatch = rowChangeEvent.latch;
        }

        if (pendingRequest != null) {
            final RequestWithGroups outgoing = pendingRequest;
            // Permit order matters: bucket permit first, callback permit second.
            // NOTE(review): neither permit is released in this method — presumably the send
            // path returns them on completion; confirm, and confirm what happens to them if
            // executor.execute() rejects the task.
            bucketSemaphore.acquire();
            callbackSemaphore.acquire();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    writerStatistics.totalRequestCount.incrementAndGet();
                    requestManager.sendRequest(outgoing);
                }
            });
        }

        if (flushRequested) {
            // Assuming bucketConcurrency is the semaphore's total permit count (TODO confirm),
            // taking all permits blocks until every in-flight request has returned its own.
            bucketSemaphore.acquire(bucketConcurrency);
            bucketSemaphore.release(bucketConcurrency);
            logger.debug("Finish bucket waitFlush.");
            flushLatch.countDown();
        }
    }