public void register()

in samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java [100:165]


  /**
   * Registers {@code source} with this producer.
   *
   * <p>Builds a {@link BulkProcessor.Listener} that inspects the outcome of every bulk request,
   * obtains a bulk processor for it from {@code bulkProcessorFactory}, and stores that processor
   * in {@code sourceBulkProcessor} under {@code source}.
   *
   * @param source name of the source being registered; used as the map key and in log output
   */
  public void register(final String source) {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
          // Nothing to do.
        }

        /**
         * Handles a bulk request that produced a response. Item failures with status
         * {@code CONFLICT} are logged at info level and tolerated; any other item failure
         * marks the send as failed. Success metrics are updated only when no fatal item
         * failure was seen.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
          boolean hasFatalError = false;
          // Do not consider version conflicts to be errors. Ignore old versions.
          if (response.hasFailures()) {
            for (BulkItemResponse itemResp : response.getItems()) {
              if (itemResp.isFailed()) {
                if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
                  LOGGER.info("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
                } else {
                  hasFatalError = true;
                  LOGGER.error("Failed to index document in Elasticsearch: " + itemResp.getFailureMessage());
                }
              }
            }
          }
          if (hasFatalError) {
            sendFailed.set(true);
          } else {
            updateSuccessMetrics(response);
          }
        }

        /**
         * Handles a bulk request that failed as a whole: logs the failure, records it in
         * {@code thrown} if no earlier failure is already stored there, and marks the send
         * as failed.
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
          // Pass the throwable itself so the stack trace and cause chain reach the log;
          // logging only getMessage() loses them and may print a bare "null".
          LOGGER.error(failure.getMessage(), failure);
          thrown.compareAndSet(null, failure);
          sendFailed.set(true);
        }

        /**
         * Updates the metrics for a bulk response without fatal failures: counts the bulk send,
         * each conflict, and each index response as either an insert or an update, then logs
         * how many index responses were seen.
         */
        private void updateSuccessMetrics(BulkResponse response) {
          metrics.bulkSendSuccess.inc();
          int writes = 0;
          for (BulkItemResponse itemResp: response.getItems()) {
            if (itemResp.isFailed()) {
              // Failed items other than conflicts are not counted here.
              if (itemResp.getFailure().getStatus().equals(RestStatus.CONFLICT)) {
                metrics.conflicts.inc();
              }
            } else {
              ActionResponse resp = itemResp.getResponse();
              if (resp instanceof IndexResponse) {
                writes += 1;
                if (((IndexResponse) resp).isCreated()) {
                  metrics.inserts.inc();
                } else {
                  metrics.updates.inc();
                }
              } else {
                LOGGER.error("Unexpected Elasticsearch action response type: " + resp.getClass().getSimpleName());
              }
            }
          }
          LOGGER.info(String.format("Wrote %s messages from %s to %s.",
                  writes, source, system));
        }
    };

    sourceBulkProcessor.put(source, bulkProcessorFactory.getBulkProcessor(client, listener));
  }