private ListenableFuture doAppend()

in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/LogAccumulator.java [90:139]


  /**
   * Reserves memory for {@code logItems} and appends them to the batch holder of their group.
   *
   * <p>The call is rejected when the accumulator is already closed. Otherwise the payload size is
   * computed with {@code LogSizeCalculator}, checked by {@code ensureValidLogSize}, and that many
   * bytes are reserved from {@code memoryController}: with a non-negative {@code maxBlockMs} the
   * reservation waits at most that long and fails with a {@code TimeoutException}; with a negative
   * value it blocks until the reservation succeeds. The append itself runs while holding the
   * monitor of the group's {@code ProducerBatchHolder}.
   *
   * <p>If the append step does not complete normally, the reserved bytes are handed back to
   * {@code memoryController} before the failure propagates.
   *
   * @param project part of the group key; also included in the trace log
   * @param logStore part of the group key; also included in the trace log
   * @param topic part of the group key
   * @param source part of the group key
   * @param shardHash part of the group key
   * @param logItems the items to size and append
   * @param callback passed through unchanged to {@code appendToHolder}
   * @return the future returned by {@code appendToHolder}
   * @throws IllegalStateException if the accumulator has been closed
   * @throws InterruptedException propagated from the steps that precede the append, e.g. while
   *     waiting on {@code memoryController}
   * @throws ProducerException if memory could not be reserved within {@code maxBlockMs} (reported
   *     through the {@code TimeoutException} thrown here), or wrapping any {@link Exception}
   *     raised by the append step
   */
  private ListenableFuture<Result> doAppend(
      String project,
      String logStore,
      String topic,
      String source,
      String shardHash,
      List<LogItem> logItems,
      Callback callback)
      throws InterruptedException, ProducerException {
    if (closed) {
      throw new IllegalStateException("cannot append after the log accumulator was closed");
    }
    int sizeInBytes = LogSizeCalculator.calculate(logItems);
    ensureValidLogSize(sizeInBytes);
    // Read the limit once so the wait, the warning and the exception message all agree.
    long maxBlockMs = producerConfig.getMaxBlockMs();
    LOGGER.trace(
        "Prepare to acquire bytes, sizeInBytes={}, maxBlockMs={}, project={}, logStore={}",
        sizeInBytes,
        maxBlockMs,
        project,
        logStore);
    if (maxBlockMs >= 0) {
      boolean acquired =
          memoryController.tryAcquire(sizeInBytes, maxBlockMs, TimeUnit.MILLISECONDS);
      if (!acquired) {
        LOGGER.warn(
            "Failed to acquire memory within the configured max blocking time {} ms, "
                + "requiredSizeInBytes={}, availableSizeInBytes={}",
            maxBlockMs,
            sizeInBytes,
            memoryController.availablePermits());
        throw new TimeoutException(
            "failed to acquire memory within the configured max blocking time "
                + maxBlockMs
                + " ms");
      }
    } else {
      // A negative limit means "wait as long as it takes".
      memoryController.acquire(sizeInBytes);
    }
    // From here on the bytes are reserved; they must be returned on every failure path,
    // including Errors, which a plain catch (Exception) would let escape without releasing.
    boolean appended = false;
    try {
      GroupKey groupKey = new GroupKey(project, logStore, topic, source, shardHash);
      ProducerBatchHolder holder = getOrCreateProducerBatchHolder(groupKey);
      synchronized (holder) {
        ListenableFuture<Result> future =
            appendToHolder(groupKey, logItems, callback, sizeInBytes, holder);
        appended = true;
        return future;
      }
    } catch (Exception e) {
      throw new ProducerException(e);
    } finally {
      if (!appended) {
        memoryController.release(sizeInBytes);
      }
    }
  }