in twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java [209:243]
private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
List<ByteBuffer> logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());
for (String json : Iterables.consumingIterable(buffer)) {
logs.add(Charsets.UTF_8.encode(json));
}
long backOffTime = timeoutUnit.toNanos(timeout) / 10;
if (backOffTime <= 0) {
backOffTime = 1;
}
try {
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
long publishTimeout = timeout;
do {
try {
int published = doPublishLogs(logs).get(publishTimeout, timeoutUnit);
bufferedSize.addAndGet(-published);
return published;
} catch (ExecutionException e) {
LOG.error("Failed to publish logs to Kafka.", e);
TimeUnit.NANOSECONDS.sleep(backOffTime);
publishTimeout -= stopwatch.elapsedTime(timeoutUnit);
stopwatch.reset();
stopwatch.start();
}
} while (publishTimeout > 0);
} catch (InterruptedException e) {
LOG.warn("Logs publish to Kafka interrupted.", e);
}
return 0;
}