public void onEvent()

in tablestore/src/main/java/com/alicloud/openservices/tablestore/timeserieswriter/handle/TimeseriesRowEventHandler.java [83:165]


    public void onEvent(TimeseriesRowEvent timeseriesRowEvent, long l, boolean b) throws Exception {
        boolean shouldWaitFlush = false;
        CountDownLatch latch = null;
        Map<String, TimeseriesRequestWithGroups> timeseriesRequestWithGroupsMap = new HashMap<String, TimeseriesRequestWithGroups>();

        if (timeseriesRowEvent.type == TimeseriesRowEvent.EventType.FLUSH) {
            int totalRowsCount = 0;
            for (Map.Entry<String, TimeseriesRequestManager> entry : map.entrySet()) {
                if (entry.getValue().getTotalRowsCount() > 0) {
                    timeseriesRequestWithGroupsMap.put(entry.getKey(), entry.getValue().makeRequest(entry.getKey()));
                    totalRowsCount += entry.getValue().getTotalRowsCount();
                }
            }

            logger.debug("FlushSignal with QueueSize: {}", totalRowsCount);
            shouldWaitFlush = true;
            latch = timeseriesRowEvent.latch;
        } else {
            timeseriesWriterHandleStatistics.totalRowsCount.incrementAndGet();
            final TimeseriesTableRow timeseriesTableRow = timeseriesRowEvent.timeseriesTableRow;
            final TimeseriesRowWithGroup timeseriesRowWithGroup = new TimeseriesRowWithGroup(timeseriesRowEvent.timeseriesTableRow, timeseriesRowEvent.timeseriesGroup);
            if (!map.containsKey(timeseriesTableRow.getTableName())) {
                map.put(timeseriesTableRow.getTableName(), new TimeseriesBatchRequestManager(this.ots, timeseriesWriterConfig, this.timeseriesBucketConfig, executor,
                        timeseriesWriterHandleStatistics,
                        callback,
                        callbackSemaphore, bucketSemaphore));
            }
            boolean succeed = map.get(timeseriesTableRow.getTableName()).appendTimeseriesRow(timeseriesRowWithGroup);
            // The first failure may be due to:
            // 1. The number of rows has reached the maximum limit
            // 2. The size of rows has reached the maximum limit
            // 3. Duplicate timeseries key in cache
            // 4. The format of the row is incorrect
            if (!succeed) {
                // For the case of 1, 2, 3 above, make request and try to add row to the cache again
                TimeseriesRequestWithGroups timeseriesRequestWithGroups = map.get(timeseriesTableRow.getTableName()).makeRequest(timeseriesTableRow.getTableName());
                if (timeseriesRequestWithGroups != null) {
                    timeseriesRequestWithGroupsMap.put(timeseriesTableRow.getTableName(), timeseriesRequestWithGroups);
                }
                // The second failure may be due to:
                // 1. Only one row size reaches the maximum limit
                // 2. The format of the row is incorrect
                succeed = map.get(timeseriesTableRow.getTableName()).appendTimeseriesRow(timeseriesRowWithGroup);
                if (!succeed) {
                    executor.execute(new Runnable() {
                        @Override
                        public void run() {
                            timeseriesWriterHandleStatistics.totalFailedRowsCount.incrementAndGet();
                            ClientException exception = new ClientException("Failed to append timeseries row into buffer.");
                            logger.error("RowChange Failed: ", exception);
                            timeseriesRowWithGroup.timeseriesGroup.failedOneRow(timeseriesRowWithGroup.timeseriesTableRow, exception);
                            if (callback != null) {
                                callback.onFailed(timeseriesTableRow, exception);
                            }
                        }
                    });
                }
            }
        }

        if (!timeseriesRequestWithGroupsMap.isEmpty()) {

            for (final Map.Entry<String, TimeseriesRequestWithGroups> entry : timeseriesRequestWithGroupsMap.entrySet()) {

                bucketSemaphore.acquire();      // First, block and wait for the bucket semaphore.
                callbackSemaphore.acquire();    // Post-block waiting for thread pool signals
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        timeseriesWriterHandleStatistics.totalRequestCount.incrementAndGet();
                        map.get(entry.getKey()).sendRequest(entry.getValue());
                    }
                });
            }
        }

        if (shouldWaitFlush) {
            bucketSemaphore.acquire(bucketConcurrency);
            bucketSemaphore.release(bucketConcurrency);
            logger.debug("Finish bucket waitFlush.");
            latch.countDown();
        }
    }