private int publishLogs()

in twill-core/src/main/java/org/apache/twill/internal/logging/KafkaAppender.java [209:243]


  /**
   * Drains all currently buffered log messages and publishes them to Kafka, retrying on
   * publish failure until the given timeout budget is used up.
   *
   * <p>Every message is removed from {@code buffer} up front and encoded as UTF-8. On a
   * successful publish, {@code bufferedSize} is decremented by the number of messages reported
   * as published.
   *
   * @param timeout maximum total time to spend publishing, including retries
   * @param timeoutUnit unit of {@code timeout}
   * @return the number of messages published, or {@code 0} if no attempt succeeded within the
   *         timeout or the calling thread was interrupted
   * @throws TimeoutException if a publish attempt does not complete within the remaining time
   */
  private int publishLogs(long timeout, TimeUnit timeoutUnit) throws TimeoutException {
    List<ByteBuffer> logs = Lists.newArrayListWithExpectedSize(bufferedSize.get());

    // consumingIterable removes each element from the buffer as it is iterated.
    // NOTE(review): if publishing ultimately fails, the drained messages appear to be dropped
    // while bufferedSize is left unchanged -- confirm whether that is intended.
    for (String json : Iterables.consumingIterable(buffer)) {
      logs.add(Charsets.UTF_8.encode(json));
    }

    // Track the budget in nanoseconds. Measuring elapsed time in the caller's (possibly coarse)
    // unit truncates sub-unit intervals to zero, which would keep the remaining timeout from
    // ever shrinking and make the retry loop spin forever while Kafka keeps failing.
    final long timeoutNanos = timeoutUnit.toNanos(timeout);

    // Wait a tenth of the total timeout between retries, but always at least one nanosecond.
    long backOffNanos = timeoutNanos / 10;
    if (backOffNanos <= 0) {
      backOffNanos = 1;
    }

    final long startNanos = System.nanoTime();
    long remainingNanos = timeoutNanos;

    try {
      do {
        try {
          int published = doPublishLogs(logs).get(remainingNanos, TimeUnit.NANOSECONDS);
          bufferedSize.addAndGet(-published);
          return published;
        } catch (ExecutionException e) {
          LOG.error("Failed to publish logs to Kafka.", e);
          TimeUnit.NANOSECONDS.sleep(backOffNanos);
          // Recompute from the fixed start time so the attempt and the back-off both count.
          remainingNanos = timeoutNanos - (System.nanoTime() - startNanos);
        }
      } while (remainingNanos > 0);
    } catch (InterruptedException e) {
      LOG.warn("Logs publish to Kafka interrupted.", e);
      // Restore the interrupt status so callers further up the stack can observe it.
      Thread.currentThread().interrupt();
    }
    return 0;
  }