boolean pollAndProcessRequests()

in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java [167:295]


  boolean pollAndProcessRequests() {
    log.trace("Entered pollAndProcessRequests loop");
    try {
      try {
        partitionManager.checkOffsetUpdates();
      } catch (Throwable e) {
        log.error("Error while checking offset updates, shutting down", e);
        return false;
      }

      ConsumerRecords<String,MirroredSolrRequest> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));

      if (log.isTraceEnabled()) {
        log.trace("poll return {} records", records.count());
      }

      UpdateRequest solrReqBatch = null;

      ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;

      for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = records.records(partition);

        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
        PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
        workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
        partitionWork.partitionQueue.add(workUnit);
        try {
          ModifiableSolrParams lastParams = null;
          NamedList lastParamsAsNamedList = null;
          solrReqBatch = new UpdateRequest();
          for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : partitionRecords) {
            if (log.isTraceEnabled()) {
              log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(),
                  requestRecord.value());
            }

            lastRecord = requestRecord;
            MirroredSolrRequest req = requestRecord.value();
            UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
            ModifiableSolrParams params = solrReq.getParams();
            if (log.isTraceEnabled()) {
              log.trace("params={}", params);
            }

            if (lastParams != null && !lastParams.toNamedList().equals(params.toNamedList())) {
              if (log.isTraceEnabled()) {
                log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
              }
              lastParamsAsNamedList = null;
              sendBatch(solrReqBatch, lastRecord, workUnit);
              solrReqBatch = new UpdateRequest();
              workUnit = new PartitionManager.WorkUnit(partition);
              workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
              partitionWork.partitionQueue.add(workUnit);
            }

            lastParams = solrReq.getParams();
            solrReqBatch.setParams(params);
            if (lastParamsAsNamedList == null) {
              lastParamsAsNamedList = lastParams.toNamedList();
            }

            List<SolrInputDocument> docs = solrReq.getDocuments();
            if (docs != null) {
              solrReqBatch.add(docs);
            }
            List<String> deletes = solrReq.getDeleteById();
            if (deletes != null) {
              solrReqBatch.deleteById(deletes);
            }
            List<String> deleteByQuery = solrReq.getDeleteQuery();
            if (deleteByQuery != null) {
              for (String delByQuery : deleteByQuery) {
                solrReqBatch.deleteByQuery(delByQuery);
              }
            }

          }

          sendBatch(solrReqBatch, lastRecord, workUnit);
          try {
            partitionManager.checkForOffsetUpdates(partition);
          } catch (Throwable e) {
            log.error("Error while checking offset updates, shutting down", e);
            return false;
          }

          // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
          if (Thread.currentThread().isInterrupted()) {
            log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
            return false;
          }
        } catch (WakeupException e) {
          log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
          return false;
        } catch (Exception e) {
          // If there is any exception returned by handleItem, don't set the offset.

          if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
            log.error("Non retryable error", e);
            return false;
          }
          log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
          return false;
        }
      }

      try {
        partitionManager.checkOffsetUpdates();
      } catch (Throwable e) {
        log.error("Error while checking offset updates, shutting down", e);
        return false;
      }

    } catch (WakeupException e) {
      log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
      return false;
    } catch (Exception e) {

      if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
        log.error("Non retryable error", e);
        return false;
      }

      log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
    }
    return true;
  }