in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/RetryQueue.java [38:67]
public List<ProducerBatch> expiredBatches(long timeoutMs) {
long deadline = System.currentTimeMillis() + timeoutMs;
List<ProducerBatch> expiredBatches = new ArrayList<ProducerBatch>();
retryBatches.drainTo(expiredBatches);
if (!expiredBatches.isEmpty()) {
return expiredBatches;
}
while (true) {
if (timeoutMs < 0) {
break;
}
ProducerBatch batch;
try {
batch = retryBatches.poll(timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.info("Interrupted when poll batch from the retry batches");
break;
}
if (batch == null) {
break;
}
expiredBatches.add(batch);
retryBatches.drainTo(expiredBatches);
if (!expiredBatches.isEmpty()) {
break;
}
timeoutMs = deadline - System.currentTimeMillis();
}
return expiredBatches;
}