in tablestore/src/main/java/com/alicloud/openservices/tablestore/timeserieswriter/handle/TimeseriesRowEventHandler.java [83:165]
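/**
 * Consumes TimeseriesRowEvents from the writer's ring buffer. A FLUSH event
 * drains every table's buffer into requests and waits until all in-flight
 * bucket requests complete; a row event appends the row to its table's
 * buffer, turning the buffer into a request whenever it fills up.
 */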
public void onEvent(TimeseriesRowEvent timeseriesRowEvent, long sequence, boolean endOfBatch) throws Exception {
    boolean shouldWaitFlush = false;
    CountDownLatch latch = null;
    Map<String, TimeseriesRequestWithGroups> timeseriesRequestWithGroupsMap = new HashMap<String, TimeseriesRequestWithGroups>();
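    // Requests that become ready during this event, keyed by table name;
    // they are dispatched to the executor after the branch below.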
    if (timeseriesRowEvent.type == TimeseriesRowEvent.EventType.FLUSH) {
        int totalRowsCount = 0;
        for (Map.Entry<String, TimeseriesRequestManager> entry : map.entrySet()) {
            // Count the buffered rows before makeRequest(), which drains the buffer.
            int bufferedRows = entry.getValue().getTotalRowsCount();
            if (bufferedRows > 0) {
                totalRowsCount += bufferedRows;
                timeseriesRequestWithGroupsMap.put(entry.getKey(), entry.getValue().makeRequest(entry.getKey()));
            }
        }
        logger.debug("FlushSignal with QueueSize: {}", totalRowsCount);
        shouldWaitFlush = true;
        latch = timeseriesRowEvent.latch;
    } else {
        timeseriesWriterHandleStatistics.totalRowsCount.incrementAndGet();
        final TimeseriesTableRow timeseriesTableRow = timeseriesRowEvent.timeseriesTableRow;
        final TimeseriesRowWithGroup timeseriesRowWithGroup = new TimeseriesRowWithGroup(timeseriesTableRow, timeseriesRowEvent.timeseriesGroup);
        if (!map.containsKey(timeseriesTableRow.getTableName())) {
            map.put(timeseriesTableRow.getTableName(), new TimeseriesBatchRequestManager(this.ots, timeseriesWriterConfig, this.timeseriesBucketConfig, executor,
                    timeseriesWriterHandleStatistics,
                    callback,
                    callbackSemaphore, bucketSemaphore));
        }
        TimeseriesRequestManager requestManager = map.get(timeseriesTableRow.getTableName());
        boolean succeed = requestManager.appendTimeseriesRow(timeseriesRowWithGroup);
        // The first append may fail because:
        // 1. the buffered row count has reached its limit,
        // 2. the buffered data size has reached its limit,
        // 3. the timeseries key duplicates one already in the cache, or
        // 4. the row itself is malformed.
        if (!succeed) {
            // For cases 1-3, turn the buffer into a request and retry the append.
            TimeseriesRequestWithGroups timeseriesRequestWithGroups = requestManager.makeRequest(timeseriesTableRow.getTableName());
            if (timeseriesRequestWithGroups != null) {
                timeseriesRequestWithGroupsMap.put(timeseriesTableRow.getTableName(), timeseriesRequestWithGroups);
            }
            // The retry may still fail because:
            // 1. this single row alone exceeds the size limit, or
            // 2. the row itself is malformed.
            succeed = requestManager.appendTimeseriesRow(timeseriesRowWithGroup);
            if (!succeed) {
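                // Report the final failure asynchronously so user callbacks
                // never run on, or block, the event-handler thread.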
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        timeseriesWriterHandleStatistics.totalFailedRowsCount.incrementAndGet();
                        ClientException exception = new ClientException("Failed to append timeseries row into buffer.");
                        logger.error("RowChange Failed: ", exception);
                        timeseriesRowWithGroup.timeseriesGroup.failedOneRow(timeseriesRowWithGroup.timeseriesTableRow, exception);
                        if (callback != null) {
                            callback.onFailed(timeseriesTableRow, exception);
                        }
                    }
                });
            }
        }
    }
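    // Send every request built above, throttled by the two semaphores.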
    if (!timeseriesRequestWithGroupsMap.isEmpty()) {
        for (final Map.Entry<String, TimeseriesRequestWithGroups> entry : timeseriesRequestWithGroupsMap.entrySet()) {
            bucketSemaphore.acquire();   // Block first until a bucket permit is free.
            callbackSemaphore.acquire(); // Then block until the thread pool can accept another request.
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    timeseriesWriterHandleStatistics.totalRequestCount.incrementAndGet();
                    map.get(entry.getKey()).sendRequest(entry.getValue());
                }
            });
        }
    }
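    // Barrier: block until every in-flight bucket request has completed
    // (see the sketch after this method).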
    if (shouldWaitFlush) {
        bucketSemaphore.acquire(bucketConcurrency);
        bucketSemaphore.release(bucketConcurrency);
        logger.debug("Finish bucket waitFlush.");
        latch.countDown();
    }
}
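// A minimal sketch of the acquire-all/release-all barrier used for waitFlush
// above (names here are illustrative, not part of the SDK):
//
//   Semaphore permits = new Semaphore(bucketConcurrency);
//   // every dispatched request does: permits.acquire(); ... permits.release();
//   permits.acquire(bucketConcurrency); // returns only once nothing is in flight
//   permits.release(bucketConcurrency); // restore full send capacity
//   latch.countDown();                  // wake the caller waiting on the flush latch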