in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/LogAccumulator.java [90:139]
private ListenableFuture<Result> doAppend(
String project,
String logStore,
String topic,
String source,
String shardHash,
List<LogItem> logItems,
Callback callback)
throws InterruptedException, ProducerException {
if (closed) {
throw new IllegalStateException("cannot append after the log accumulator was closed");
}
int sizeInBytes = LogSizeCalculator.calculate(logItems);
ensureValidLogSize(sizeInBytes);
long maxBlockMs = producerConfig.getMaxBlockMs();
LOGGER.trace(
"Prepare to acquire bytes, sizeInBytes={}, maxBlockMs={}, project={}, logStore={}",
sizeInBytes,
maxBlockMs,
project,
logStore);
if (maxBlockMs >= 0) {
boolean acquired =
memoryController.tryAcquire(sizeInBytes, maxBlockMs, TimeUnit.MILLISECONDS);
if (!acquired) {
LOGGER.warn(
"Failed to acquire memory within the configured max blocking time {} ms, "
+ "requiredSizeInBytes={}, availableSizeInBytes={}",
producerConfig.getMaxBlockMs(),
sizeInBytes,
memoryController.availablePermits());
throw new TimeoutException(
"failed to acquire memory within the configured max blocking time "
+ producerConfig.getMaxBlockMs()
+ " ms");
}
} else {
memoryController.acquire(sizeInBytes);
}
try {
GroupKey groupKey = new GroupKey(project, logStore, topic, source, shardHash);
ProducerBatchHolder holder = getOrCreateProducerBatchHolder(groupKey);
synchronized (holder) {
return appendToHolder(groupKey, logItems, callback, sizeInBytes, holder);
}
} catch (Exception e) {
memoryController.release(sizeInBytes);
throw new ProducerException(e);
}
}