in tablestore/src/main/java/com/alicloud/openservices/tablestore/reader/ReaderEventHandler.java [47:95]
public void onEvent(ReaderEvent readerEvent, long l, boolean b) throws Exception {
    // Handles one reader event:
    //   - FLUSH: drains the queued primary keys into a request, dispatches it, waits until
    //            every bucket permit is available again, then counts down the event's latch.
    //   - SEND:  drains the queued primary keys into a request and dispatches it.
    //   - other: appends the event's primary key to the pending request; if the append is
    //            refused, the pending request is dispatched first and the append is retried.
    //
    // Parameters:
    //   readerEvent - the event to process; its type, pkWithTable, readerGroup and latch
    //                 fields are read here.
    //   l, b        - not used by this handler.
    //
    // Throws InterruptedException if the thread is interrupted while waiting for a permit,
    // and propagates any rejection raised by the executor.
    final boolean isFlush = readerEvent.type == ReaderEvent.EventType.FLUSH;
    final CountDownLatch latch = isFlush ? readerEvent.latch : null;
    try {
        ReqWithGroups reqWithGroups = null;
        if (isFlush || readerEvent.type == ReaderEvent.EventType.SEND) {
            logger.debug(isFlush ? "FlushSignal with QueueSize: {}" : "SendSignal with QueueSize: {}",
                    requestManager.getTotalPksCount());
            if (requestManager.getTotalPksCount() > 0) {
                reqWithGroups = requestManager.makeRequest();
            }
        } else {
            statistics.totalRowsCount.incrementAndGet();
            final PrimaryKeyWithTable primaryKeyWithTable = readerEvent.pkWithTable;
            final PkWithGroup pkWithGroup = new PkWithGroup(primaryKeyWithTable, readerEvent.readerGroup);
            if (!requestManager.appendPrimaryKey(pkWithGroup)) {
                // Indicates that the request size has reached the upper limit: cut the pending
                // request and retry the append against the next one.
                reqWithGroups = requestManager.makeRequest();
                if (!requestManager.appendPrimaryKey(pkWithGroup)) {
                    // Previously this result was silently ignored; surface it so a lost row
                    // is at least visible in the logs.
                    logger.warn("Failed to append primary key after cutting the pending request.");
                }
            }
        }

        if (reqWithGroups != null) {
            final ReqWithGroups finalRequest = reqWithGroups;
            bucketSemaphore.acquire(); // First, block and wait for the bucket semaphore.
            try {
                callbackSemaphore.acquire(); // Then block waiting for the thread pool signal.
            } catch (InterruptedException e) {
                // The request will never be submitted, so give the bucket permit back;
                // otherwise a later flush could wait for it forever.
                bucketSemaphore.release();
                throw e;
            }
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        statistics.totalRequestCount.incrementAndGet();
                        requestManager.sendRequest(finalRequest);
                    }
                });
            } catch (java.util.concurrent.RejectedExecutionException e) {
                // The task was never accepted, so nothing downstream can return these permits.
                // NOTE(review): assumes permits are normally released on the request-completion
                // path, which is not visible in this method -- confirm.
                callbackSemaphore.release();
                bucketSemaphore.release();
                throw e;
            }
        }

        if (isFlush) {
            // Taking all bucket permits at once succeeds only when no permit is still held,
            // i.e. when every dispatched request has given its permit back.
            bucketSemaphore.acquire(bucketConcurrency);
            bucketSemaphore.release(bucketConcurrency);
            logger.debug("Finish bucket waitFlush.");
        }
    } finally {
        // Always wake the flush waiter, even on failure, so it cannot block forever on the
        // latch. The null guard protects against FLUSH events created without a latch.
        if (latch != null) {
            latch.countDown();
        }
    }
}