public FutureCallbackHolder()

in gobblin-modules/gobblin-elasticsearch/src/main/java/org/apache/gobblin/elasticsearch/writer/FutureCallbackHolder.java [53:173]


  /**
   * Builds the {@code future} / {@code actionListener} pair held by this object.
   *
   * <p>The listener translates the outcome of an Elasticsearch bulk request into a
   * {@code Pair<WriteResponse, Throwable>} that it places on {@code writeResponseQueue}; the future's
   * {@code get} methods read that queue and hand the pair to {@code getWriteResponseorThrow}.
   *
   * <p>Outcome rules applied by the listener:
   * <ul>
   *   <li>no item failed: success;</li>
   *   <li>at least one failed item is not recognised as a malformed-document error: failure;</li>
   *   <li>all failed items are malformed-document errors: failure when the policy is {@code FAIL},
   *       success otherwise (each item is logged at debug level for {@code IGNORE} and at warn level
   *       for {@code WARN}).</li>
   * </ul>
   *
   * @param callback notified with {@code onSuccess} / {@code onFailure} after the outcome is queued;
   *        may be {@code null}
   * @param exceptionLogger receives every exception reported through the listener's {@code onFailure};
   *        skipped when {@code null}
   * @param malformedDocPolicy decides how malformed-document failures are logged and whether they fail
   *        the batch
   */
  public FutureCallbackHolder(final @Nullable WriteCallback callback,
      ExceptionLogger exceptionLogger,
      final MalformedDocPolicy malformedDocPolicy) {
    this.future = new Future<WriteResponse>() {
      /** Cancellation is not supported; always reports that the attempt failed. */
      @Override
      public boolean cancel(boolean mayInterruptIfRunning) {
        return false;
      }

      /** Always {@code false}, since {@link #cancel(boolean)} never succeeds. */
      @Override
      public boolean isCancelled() {
        return false;
      }

      /** Reflects the enclosing object's {@code done} flag, which is not modified in this constructor. */
      @Override
      public boolean isDone() {
        return done.get();
      }

      /**
       * Blocks until the listener has queued an outcome, then returns it or throws.
       *
       * <p>NOTE(review): this removes the outcome from the queue, so a second call waits for another
       * outcome to be queued - confirm callers invoke {@code get} at most once.
       */
      @Override
      public WriteResponse get()
          throws InterruptedException, ExecutionException {
        Pair<WriteResponse, Throwable> outcome = writeResponseQueue.take();
        return getWriteResponseorThrow(outcome);
      }

      /**
       * Waits up to the given timeout for the listener to queue an outcome.
       *
       * @throws TimeoutException if no outcome was queued within the timeout
       */
      @Override
      public WriteResponse get(long timeout, TimeUnit unit)
          throws InterruptedException, ExecutionException, TimeoutException {
        Pair<WriteResponse, Throwable> outcome = writeResponseQueue.poll(timeout, unit);
        if (outcome == null) {
          throw new TimeoutException("Timeout exceeded while waiting for future to be done");
        }
        return getWriteResponseorThrow(outcome);
      }
    };

    this.actionListener = new ActionListener<BulkResponse>() {
      /**
       * Classifies the bulk response as a success or a failure (see the constructor documentation for
       * the rules) and reports it through the queue and the callback.
       */
      @Override
      public void onResponse(BulkResponse bulkItemResponses) {
        if (!bulkItemResponses.hasFailures()) {
          completeSuccessfully(bulkItemResponses);
          return;
        }

        boolean logicalErrors = false;
        boolean serverErrors = false;
        for (BulkItemResponse bulkItemResponse : bulkItemResponses) {
          if (!bulkItemResponse.isFailed()) {
            continue;
          }
          // A failure is either permanent (logical: the document itself is bad) or transient (server).
          if (isLogicalError(bulkItemResponse)) {
            logMalformedDocument(bulkItemResponse);
            logicalErrors = true;
          } else {
            serverErrors = true;
          }
        }

        if (serverErrors) {
          // Any server-side failure fails the batch regardless of the malformed-document policy.
          onFailure(new RuntimeException("Partial failures in the batch: " + bulkItemResponses.buildFailureMessage()));
        } else if (logicalErrors) {
          // Every failure found was logical; only the FAIL policy turns that into a batch failure.
          if (malformedDocPolicy == MalformedDocPolicy.FAIL) {
            // Nothing was logged per document under FAIL, so carry the failure details in the exception.
            onFailure(new RuntimeException("Partial non-recoverable failures in the batch. To ignore these, set "
                + ElasticsearchWriterConfigurationKeys.ELASTICSEARCH_WRITER_MALFORMED_DOC_POLICY + " to "
                + MalformedDocPolicy.IGNORE.name()
                + ". Failures: " + bulkItemResponses.buildFailureMessage()));
          } else {
            completeSuccessfully(bulkItemResponses);
          }
        }
      }

      /** Queues a successful outcome wrapping the bulk response, then notifies the callback if present. */
      private void completeSuccessfully(BulkResponse bulkItemResponses) {
        WriteResponse writeResponse = new GenericWriteResponse<BulkResponse>(bulkItemResponses);
        writeResponseQueue.add(new Pair<WriteResponse, Throwable>(writeResponse, null));
        if (callback != null) {
          callback.onSuccess(writeResponse);
        }
      }

      /** Logs a malformed document at the level chosen by the policy; other policies log nothing here. */
      private void logMalformedDocument(BulkItemResponse bulkItemResponse) {
        switch (malformedDocPolicy) {
          case IGNORE: {
            log.debug("Document id {} was malformed with error {}",
                bulkItemResponse.getId(),
                bulkItemResponse.getFailureMessage());
            break;
          }
          case WARN: {
            log.warn("Document id {} was malformed with error {}",
                bulkItemResponse.getId(),
                bulkItemResponse.getFailureMessage());
            break;
          }
          default: {
            // Pass through
          }
        }
      }

      /**
       * Returns whether the item's failure message names an illegal-argument or mapper-parsing
       * exception, which this listener treats as a malformed document.
       */
      private boolean isLogicalError(BulkItemResponse bulkItemResponse) {
        String failureMessage = bulkItemResponse.getFailureMessage();
        if (failureMessage == null) {
          // Nothing to classify; report as non-logical so the batch takes the server-error path
          // instead of throwing from inside the listener and leaving the future incomplete.
          return false;
        }
        return failureMessage.contains("IllegalArgumentException")
            || failureMessage.contains("illegal_argument_exception")
            || failureMessage.contains("MapperParsingException")
            || failureMessage.contains("mapper_parsing_exception");
      }

      /**
       * Queues the exception as the outcome, then passes it to the exception logger and the callback
       * when each is present.
       */
      @Override
      public void onFailure(Exception exception) {
        writeResponseQueue.add(new Pair<WriteResponse, Throwable>(null, exception));
        if (exceptionLogger != null) {
          exceptionLogger.log(exception);
        }
        if (callback != null) {
          callback.onFailure(exception);
        }
      }
    };
  }