private void sendProducerBatch()

in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java [74:143]


  /**
   * Sends the current {@code batch} through the client resolved for the batch's project and
   * routes the batch to the success, retry, or failure queue depending on the outcome.
   *
   * <p>Every path appends an {@code Attempt} to the batch before queuing it:
   *
   * <ul>
   *   <li>no client for the project: a failed attempt with {@code Errors.PROJECT_CONFIG_NOT_EXIST},
   *       then the failure queue;
   *   <li>{@code PutLogs} (or request construction) throws: an attempt built from the exception,
   *       then the failure queue if {@code meetFailureCondition} holds, otherwise the retry queue
   *       with a computed backoff;
   *   <li>success: a successful attempt carrying the response's request id, then the success
   *       queue.
   * </ul>
   *
   * @param nowMs timestamp, in milliseconds, recorded on the {@code Attempt} appended to the batch
   * @throws InterruptedException declared by this method; presumably raised by a blocking
   *     {@code put} on one of the result queues (the queue types are not visible here)
   */
  private void sendProducerBatch(long nowMs) throws InterruptedException {
    LOGGER.trace("Prepare to send producer batch, batch={}", batch);
    String project = batch.getProject();
    Client client = getClient(project);
    if (client == null) {
      // Without a client the batch can never be sent, so fail it immediately instead of retrying.
      LOGGER.error("Failed to get client, project={}", project);
      Attempt attempt =
          new Attempt(
              false,
              "",
              Errors.PROJECT_CONFIG_NOT_EXIST,
              "Cannot get the projectConfig for project " + project,
              nowMs);
      batch.appendAttempt(attempt);
      failureQueue.put(batch);
      return;
    }

    PutLogsResponse response;
    try {
      PutLogsRequest request = buildPutLogsRequest(batch);
      // Only override the request's compress type when one is configured and it is not NONE.
      if (producerConfig.getCompressType() != null) {
        Consts.CompressType compressType =
            Consts.CompressType.fromString(producerConfig.getCompressType());
        if (compressType != null && compressType != Consts.CompressType.NONE) {
          request.setCompressType(compressType);
        }
      }
      response = client.PutLogs(request);
    } catch (Exception e) {
      // The trailing throwable argument is logged as the exception, not as a placeholder value.
      LOGGER.error(
          "Failed to put logs, project={}, logStore={}, e=",
          batch.getProject(),
          batch.getLogStore(),
          e);
      Attempt attempt = buildAttempt(e, nowMs);
      batch.appendAttempt(attempt);
      if (meetFailureCondition(e)) {
        LOGGER.debug("Prepare to put batch to the failure queue");
        failureQueue.put(batch);
      } else {
        LOGGER.debug("Prepare to put batch to the retry queue");
        long retryBackoffMs = calculateRetryBackoffMs();
        LOGGER.debug("Calculate the retryBackoffMs successfully, retryBackoffMs={}", retryBackoffMs);
        // The retry deadline is based on the current wall clock, not on nowMs.
        batch.setNextRetryMs(System.currentTimeMillis() + retryBackoffMs);
        try {
          retryQueue.put(batch);
        } catch (IllegalStateException retryQueueError) {
          // Log the retry-queue failure itself; the PutLogs exception was already logged above.
          LOGGER.error(
              "Failed to put batch to the retry queue, project={}, logStore={}, e=",
              batch.getProject(),
              batch.getLogStore(),
              retryQueueError);
          if (retryQueue.isClosed()) {
            LOGGER.info(
                "Prepare to put batch to the failure queue since the retry queue was closed");
            failureQueue.put(batch);
          }
          // NOTE(review): if the retry queue rejected the batch but is not closed, the batch is
          // not placed on any queue here - confirm this is intended.
        }
      }
      return;
    }

    Attempt attempt = new Attempt(true, response.GetRequestId(), "", "", nowMs);
    batch.appendAttempt(attempt);
    successQueue.put(batch);
    LOGGER.trace("Send producer batch successfully, batch={}", batch);
  }