boolean pollAndProcessRequests()

in solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java [230:411]


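  /**
   * Polls Kafka once and processes the returned records: consecutive UPDATE requests are
   * collapsed per partition according to the configured collapse policy and submitted via
   * sendBatch, while non-UPDATE requests are forwarded immediately.
   *
   * @return true if polling should continue, false if the consumer should shut down
   */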
  boolean pollAndProcessRequests() {
    log.trace("Entered pollAndProcessRequests loop");
    try {
      try {
        partitionManager.checkOffsetUpdates();
      } catch (Throwable e) {
        log.error("Error while checking offset updates, shutting down", e);
        return false;
      }

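      // Poll Kafka for the next batch of mirrored requests; an empty result simply means
      // nothing new arrived within the poll timeout.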
      ConsumerRecords<String, MirroredSolrRequest<?>> records =
          kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));

      if (log.isTraceEnabled()) {
        log.trace("poll return {} records", records.count());
      }

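      // Batching state: consecutive UPDATE requests with identical params are collapsed into a
      // single UpdateRequest, up to maxCollapseRecords, depending on the collapse policy.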
      UpdateRequest updateReqBatch = null;
      int currentCollapsed = 0;

      ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;

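      // Process the poll results partition by partition so offsets are tracked per partition.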
      for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
            records.records(partition);

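        // Register a work unit for this batch; nextOffset, derived from the partition's records,
        // tells the partition manager how far this batch reaches.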
        PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
        PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
        workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
        partitionWork.partitionQueue.add(workUnit);
        try {
          ModifiableSolrParams lastUpdateParams = null;
          for (ConsumerRecord<String, MirroredSolrRequest<?>> requestRecord : partitionRecords) {
            if (log.isTraceEnabled()) {
              log.trace(
                  "Fetched record from topic={} partition={} key={} value={}",
                  requestRecord.topic(),
                  requestRecord.partition(),
                  requestRecord.key(),
                  requestRecord.value());
            }

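            // Unwrap the mirrored Solr request and count it in the per-type "input" metric.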
            lastRecord = requestRecord;
            MirroredSolrRequest<?> req = requestRecord.value();
            SolrRequest<?> solrReq = req.getSolrRequest();
            MirroredSolrRequest.Type type = req.getType();
            metrics.counter(MetricRegistry.name(type.name(), "input")).inc();
            ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
            if (log.isTraceEnabled()) {
              log.trace("-- picked type={}, params={}", req.getType(), params);
            }

            // determine if it's an UPDATE with deletes, or if the existing batch has deletes
            boolean hasDeletes = false;
            if (type == MirroredSolrRequest.Type.UPDATE) {
              UpdateRequest ureq = (UpdateRequest) solrReq;
              hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch);
            }

            // flush point: this UPDATE can't be collapsed into the current batch (different
            // params, collapsing disabled, deletes under partial collapsing, or the batch is full)
            if (type == MirroredSolrRequest.Type.UPDATE
                && (
                // different params
                (lastUpdateParams != null && !lastUpdateParams.equals(params))
                    ||
                    // no collapsing
                    (collapseUpdates == CrossDcConf.CollapseUpdates.NONE)
                    ||
                    // partial collapsing but has deletes
                    (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes)
                    ||
                    // too many collapsed - emit
                    currentCollapsed >= maxCollapseRecords)) {
              if (log.isTraceEnabled()) {
                log.trace("Starting new UpdateRequest, params={}", params);
              }
              // send previous batch, if any
              if (updateReqBatch != null) {
                sendBatch(updateReqBatch, type, lastRecord, workUnit);
              }
              updateReqBatch = null;
              currentCollapsed = 0;
              workUnit = new PartitionManager.WorkUnit(partition);
              workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
              partitionWork.partitionQueue.add(workUnit);
            }

            lastUpdateParams = params;
            if (type == MirroredSolrRequest.Type.UPDATE) {
              if (updateReqBatch == null) {
                // just initialize
                updateReqBatch = new UpdateRequest();
              } else {
                if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
                  throw new RuntimeException("Can't collapse requests.");
                }
                if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) {
                  throw new RuntimeException("Can't collapse requests with deletions.");
                }
                metrics.counter(MetricRegistry.name(type.name(), "collapsed")).inc();
                currentCollapsed++;
              }
              UpdateRequest update = (UpdateRequest) solrReq;
              MirroredSolrRequest.setParams(updateReqBatch, params);

              // merge this request's documents and deletes into the batch
              List<SolrInputDocument> docs = update.getDocuments();
              if (docs != null) {
                updateReqBatch.add(docs);
                metrics.counter(MetricRegistry.name(type.name(), "add")).inc(docs.size());
              }
              List<String> deletes = update.getDeleteById();
              if (deletes != null) {
                updateReqBatch.deleteById(deletes);
                metrics.counter(MetricRegistry.name(type.name(), "dbi")).inc(deletes.size());
              }
              List<String> deleteByQuery = update.getDeleteQuery();
              if (deleteByQuery != null) {
                for (String delByQuery : deleteByQuery) {
                  updateReqBatch.deleteByQuery(delByQuery);
                }
                metrics.counter(MetricRegistry.name(type.name(), "dbq")).inc(deleteByQuery.size());
              }
            } else {
              // non-update requests should be sent immediately
              sendBatch(req.getSolrRequest(), type, lastRecord, workUnit);
            }
          }

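          // Flush any UPDATE batch still open at the end of this partition's records.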
          if (updateReqBatch != null) {
            sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
          }
          try {
            partitionManager.checkForOffsetUpdates(partition);
          } catch (Throwable e) {
            log.error("Error while checking offset updates, shutting down", e);
            return false;
          }

          // handleItem sets the thread interrupt flag; exit if it has been set
          if (Thread.currentThread().isInterrupted()) {
            log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
            return false;
          }
        } catch (WakeupException e) {
          log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
          return false;
        } catch (Exception e) {
          // If handleItem threw an exception, don't advance the offset.

          if (e instanceof ClassCastException
              || e instanceof SerializationException) { // TODO: optional
            log.error("Non retryable error", e);
            return false;
          }
          log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
          return false;
        }
      }

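      // Final offset check after all partitions from this poll have been processed.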
      try {
        partitionManager.checkOffsetUpdates();
      } catch (Throwable e) {
        log.error("Error while checking offset updates, shutting down", e);
        return false;
      }

    } catch (WakeupException e) {
      log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
      return false;
    } catch (Exception e) {

      if (e instanceof ClassCastException
          || e instanceof SerializationException) { // TODO: optional
        log.error("Non retryable error", e);
        return false;
      }

      log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
    }
    return true;
  }