in tablestore/src/main/java/com/alicloud/openservices/tablestore/writer/handle/RowEventHandler.java [88:148]
public void onEvent(final RowChangeEvent rowChangeEvent, long sequence, boolean endOfBatch) throws Exception {
boolean shouldWaitFlush = false;
CountDownLatch latch = null;
RequestWithGroups requestWithGroups = null;
if (rowChangeEvent.type == RowChangeEvent.EventType.FLUSH) {
logger.debug("FlushSignal with QueueSize: {}", requestManager.getTotalRowsCount());
if (requestManager.getTotalRowsCount() > 0) {
requestWithGroups = requestManager.makeRequest();
}
shouldWaitFlush = true;
latch = rowChangeEvent.latch;
} else {
writerStatistics.totalRowsCount.incrementAndGet();
final RowChange rowChange = rowChangeEvent.rowChange;
final RowChangeWithGroup rowChangeWithGroup = new RowChangeWithGroup(rowChangeEvent.rowChange, rowChangeEvent.group);
boolean succeed = requestManager.appendRowChange(rowChangeWithGroup);
if (!succeed) {
requestWithGroups = requestManager.makeRequest();
succeed = requestManager.appendRowChange(rowChangeWithGroup);
if (!succeed) {
executor.execute(new Runnable() {
@Override
public void run() {
writerStatistics.totalFailedRowsCount.incrementAndGet();
ClientException exception = new ClientException("Can not even append only one row into buffer.");
logger.error("RowChange Failed: ", exception);
rowChangeWithGroup.group.failedOneRow(rowChangeWithGroup.rowChange, exception);
if (callback != null) {
callback.onFailed(rowChange, exception);
}
}
});
}
}
}
if (requestWithGroups != null) {
final RequestWithGroups finalRequestWithGroups = requestWithGroups;
bucketSemaphore.acquire(); // First, block and wait for the bucket semaphore.
callbackSemaphore.acquire(); // Post-block waiting for thread pool signals
executor.execute(new Runnable() {
@Override
public void run() {
writerStatistics.totalRequestCount.incrementAndGet();
requestManager.sendRequest(finalRequestWithGroups);
}
});
}
if (shouldWaitFlush) {
bucketSemaphore.acquire(bucketConcurrency);
bucketSemaphore.release(bucketConcurrency);
logger.debug("Finish bucket waitFlush.");
latch.countDown();
}
}