in crossdc-consumer/src/main/java/org/apache/solr/crossdc/consumer/KafkaCrossDcConsumer.java [167:295]
boolean pollAndProcessRequests() {
log.trace("Entered pollAndProcessRequests loop");
try {
try {
partitionManager.checkOffsetUpdates();
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
ConsumerRecords<String,MirroredSolrRequest> records = kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
if (log.isTraceEnabled()) {
log.trace("poll return {} records", records.count());
}
UpdateRequest solrReqBatch = null;
ConsumerRecord<String,MirroredSolrRequest> lastRecord = null;
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String,MirroredSolrRequest>> partitionRecords = records.records(partition);
PartitionManager.PartitionWork partitionWork = partitionManager.getPartitionWork(partition);
PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
try {
ModifiableSolrParams lastParams = null;
NamedList lastParamsAsNamedList = null;
solrReqBatch = new UpdateRequest();
for (ConsumerRecord<String,MirroredSolrRequest> requestRecord : partitionRecords) {
if (log.isTraceEnabled()) {
log.trace("Fetched record from topic={} partition={} key={} value={}", requestRecord.topic(), requestRecord.partition(), requestRecord.key(),
requestRecord.value());
}
lastRecord = requestRecord;
MirroredSolrRequest req = requestRecord.value();
UpdateRequest solrReq = (UpdateRequest) req.getSolrRequest();
ModifiableSolrParams params = solrReq.getParams();
if (log.isTraceEnabled()) {
log.trace("params={}", params);
}
if (lastParams != null && !lastParams.toNamedList().equals(params.toNamedList())) {
if (log.isTraceEnabled()) {
log.trace("SolrParams have changed, starting new UpdateRequest, params={}", params);
}
lastParamsAsNamedList = null;
sendBatch(solrReqBatch, lastRecord, workUnit);
solrReqBatch = new UpdateRequest();
workUnit = new PartitionManager.WorkUnit(partition);
workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
partitionWork.partitionQueue.add(workUnit);
}
lastParams = solrReq.getParams();
solrReqBatch.setParams(params);
if (lastParamsAsNamedList == null) {
lastParamsAsNamedList = lastParams.toNamedList();
}
List<SolrInputDocument> docs = solrReq.getDocuments();
if (docs != null) {
solrReqBatch.add(docs);
}
List<String> deletes = solrReq.getDeleteById();
if (deletes != null) {
solrReqBatch.deleteById(deletes);
}
List<String> deleteByQuery = solrReq.getDeleteQuery();
if (deleteByQuery != null) {
for (String delByQuery : deleteByQuery) {
solrReqBatch.deleteByQuery(delByQuery);
}
}
}
sendBatch(solrReqBatch, lastRecord, workUnit);
try {
partitionManager.checkForOffsetUpdates(partition);
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
// handleItem sets the thread interrupt, let's exit if there has been an interrupt set
if (Thread.currentThread().isInterrupted()) {
log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
return false;
}
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
return false;
} catch (Exception e) {
// If there is any exception returned by handleItem, don't set the offset.
if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
log.error("Non retryable error", e);
return false;
}
log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
return false;
}
}
try {
partitionManager.checkOffsetUpdates();
} catch (Throwable e) {
log.error("Error while checking offset updates, shutting down", e);
return false;
}
} catch (WakeupException e) {
log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
return false;
} catch (Exception e) {
if (e instanceof ClassCastException || e instanceof SerializationException) { // TODO: optional
log.error("Non retryable error", e);
return false;
}
log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
}
return true;
}