in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/LogAccumulator.java [141:193]
private ListenableFuture<Result> appendToHolder(
GroupKey groupKey,
List<LogItem> logItems,
Callback callback,
int sizeInBytes,
ProducerBatchHolder holder) {
if (holder.producerBatch != null) {
ListenableFuture<Result> f = holder.producerBatch.tryAppend(logItems, sizeInBytes, callback);
if (f != null) {
if (holder.producerBatch.isMeetSendCondition()) {
holder.transferProducerBatch(
ioThreadPool,
producerConfig,
clientPool,
retryQueue,
successQueue,
failureQueue,
batchCount);
}
return f;
} else {
holder.transferProducerBatch(
ioThreadPool,
producerConfig,
clientPool,
retryQueue,
successQueue,
failureQueue,
batchCount);
}
}
holder.producerBatch =
new ProducerBatch(
groupKey,
Utils.generatePackageId(producerHash, BATCH_ID),
producerConfig.getBatchSizeThresholdInBytes(),
producerConfig.getBatchCountThreshold(),
producerConfig.getMaxReservedAttempts(),
System.currentTimeMillis());
ListenableFuture<Result> f = holder.producerBatch.tryAppend(logItems, sizeInBytes, callback);
batchCount.incrementAndGet();
if (holder.producerBatch.isMeetSendCondition()) {
holder.transferProducerBatch(
ioThreadPool,
producerConfig,
clientPool,
retryQueue,
successQueue,
failureQueue,
batchCount);
}
return f;
}