in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/Mover.java [83:98]
private void doMoveBatches() {
ExpiredBatches expiredBatches = accumulator.expiredBatches();
LOGGER.debug(
"Expired batches from accumulator, size={}, remainingMs={}",
expiredBatches.getBatches().size(),
expiredBatches.getRemainingMs());
for (ProducerBatch b : expiredBatches.getBatches()) {
ioThreadPool.submit(createSendProducerBatchTask(b));
}
List<ProducerBatch> expiredRetryBatches =
retryQueue.expiredBatches(expiredBatches.getRemainingMs());
LOGGER.debug("Expired batches from retry queue, size={}", expiredRetryBatches.size());
for (ProducerBatch b : expiredRetryBatches) {
ioThreadPool.submit(createSendProducerBatchTask(b));
}
}