in src/main/java/com/aliyun/openservices/aliyun/log/producer/internals/SendProducerBatchTask.java [74:143]
private void sendProducerBatch(long nowMs) throws InterruptedException {
LOGGER.trace("Prepare to send producer batch, batch={}", batch);
String project = batch.getProject();
Client client = getClient(project);
if (client == null) {
LOGGER.error("Failed to get client, project={}", project);
Attempt attempt =
new Attempt(
false,
"",
Errors.PROJECT_CONFIG_NOT_EXIST,
"Cannot get the projectConfig for project " + project,
nowMs);
batch.appendAttempt(attempt);
failureQueue.put(batch);
} else {
PutLogsResponse response;
try {
PutLogsRequest request = buildPutLogsRequest(batch);
if (producerConfig.getCompressType() != null) {
Consts.CompressType compressType = Consts.CompressType.fromString(producerConfig.getCompressType());
if (compressType != null && compressType != Consts.CompressType.NONE) {
request.setCompressType(compressType);
}
}
response = client.PutLogs(request);
} catch (Exception e) {
LOGGER.error(
"Failed to put logs, project="
+ batch.getProject()
+ ", logStore="
+ batch.getLogStore()
+ ", e=",
e);
Attempt attempt = buildAttempt(e, nowMs);
batch.appendAttempt(attempt);
if (meetFailureCondition(e)) {
LOGGER.debug("Prepare to put batch to the failure queue");
failureQueue.put(batch);
} else {
LOGGER.debug("Prepare to put batch to the retry queue");
long retryBackoffMs = calculateRetryBackoffMs();
LOGGER.debug(
"Calculate the retryBackoffMs successfully, retryBackoffMs=" + retryBackoffMs);
batch.setNextRetryMs(System.currentTimeMillis() + retryBackoffMs);
try {
retryQueue.put(batch);
} catch (IllegalStateException e1) {
LOGGER.error(
"Failed to put batch to the retry queue, project="
+ batch.getProject()
+ ", logStore="
+ batch.getLogStore()
+ ", e=",
e);
if (retryQueue.isClosed()) {
LOGGER.info(
"Prepare to put batch to the failure queue since the retry queue was closed");
failureQueue.put(batch);
}
}
}
return;
}
Attempt attempt = new Attempt(true, response.GetRequestId(), "", "", nowMs);
batch.appendAttempt(attempt);
successQueue.put(batch);
LOGGER.trace("Send producer batch successfully, batch={}", batch);
}
}