in solr/cross-dc-manager/src/java/org/apache/solr/crossdc/manager/consumer/KafkaCrossDcConsumer.java [230:411]
boolean pollAndProcessRequests() {
  log.trace("Entered pollAndProcessRequests loop");
  try {
    try {
      partitionManager.checkOffsetUpdates();
    } catch (Throwable e) {
      log.error("Error while checking offset updates, shutting down", e);
      return false;
    }
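    // Pull the next batch of mirrored requests from Kafka; poll() blocks for at most
    // KAFKA_CONSUMER_POLL_TIMEOUT_MS and may return records for several topic partitions.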
    ConsumerRecords<String, MirroredSolrRequest<?>> records =
        kafkaConsumer.poll(Duration.ofMillis(KAFKA_CONSUMER_POLL_TIMEOUT_MS));
    if (log.isTraceEnabled()) {
      log.trace("poll returned {} records", records.count());
    }
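    // Batch state for collapsing consecutive UPDATE requests that share the same params:
    // updateReqBatch accumulates docs and deletes, currentCollapsed counts how many records
    // were folded into it, and lastRecord tracks the most recent record handed to sendBatch().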
    UpdateRequest updateReqBatch = null;
    int currentCollapsed = 0;
    ConsumerRecord<String, MirroredSolrRequest<?>> lastRecord = null;
    for (TopicPartition partition : records.partitions()) {
      List<ConsumerRecord<String, MirroredSolrRequest<?>>> partitionRecords =
          records.records(partition);
      PartitionManager.PartitionWork partitionWork =
          partitionManager.getPartitionWork(partition);
      PartitionManager.WorkUnit workUnit = new PartitionManager.WorkUnit(partition);
      workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
      partitionWork.partitionQueue.add(workUnit);
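      // Each WorkUnit appears to record the next offset to commit for this partition once the
      // corresponding batch has been processed; the bookkeeping itself lives in PartitionManager.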
      try {
        ModifiableSolrParams lastUpdateParams = null;
        for (ConsumerRecord<String, MirroredSolrRequest<?>> requestRecord : partitionRecords) {
          if (log.isTraceEnabled()) {
            log.trace(
                "Fetched record from topic={} partition={} key={} value={}",
                requestRecord.topic(),
                requestRecord.partition(),
                requestRecord.key(),
                requestRecord.value());
          }
          lastRecord = requestRecord;
          MirroredSolrRequest<?> req = requestRecord.value();
          SolrRequest<?> solrReq = req.getSolrRequest();
          MirroredSolrRequest.Type type = req.getType();
          metrics.counter(MetricRegistry.name(type.name(), "input")).inc();
          ModifiableSolrParams params = new ModifiableSolrParams(solrReq.getParams());
          if (log.isTraceEnabled()) {
            log.trace("-- picked type={}, params={}", req.getType(), params);
          }
          // determine if it's an UPDATE with deletes, or if the existing batch has deletes
          boolean hasDeletes = false;
          if (type == MirroredSolrRequest.Type.UPDATE) {
            UpdateRequest ureq = (UpdateRequest) solrReq;
            hasDeletes = hasDeletes(ureq) || hasDeletes(updateReqBatch);
          }
          // when this record is an UPDATE, start a new batch (flushing any previous one) if the
          // params changed, collapsing is disabled, partial collapsing would have to mix in
          // deletes, or the current batch already holds maxCollapseRecords records
          if (type == MirroredSolrRequest.Type.UPDATE
              && (
              // different params
              (lastUpdateParams != null && !lastUpdateParams.equals(params))
                  ||
                  // no collapsing
                  (collapseUpdates == CrossDcConf.CollapseUpdates.NONE)
                  ||
                  // partial collapsing but has deletes
                  (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes)
                  ||
                  // too many collapsed - emit
                  currentCollapsed >= maxCollapseRecords)) {
            if (log.isTraceEnabled()) {
              log.trace("Starting new UpdateRequest, params={}", params);
            }
            // send previous batch, if any
            if (updateReqBatch != null) {
              sendBatch(updateReqBatch, type, lastRecord, workUnit);
            }
            updateReqBatch = null;
            currentCollapsed = 0;
            workUnit = new PartitionManager.WorkUnit(partition);
            workUnit.nextOffset = PartitionManager.getOffsetForPartition(partitionRecords);
            partitionWork.partitionQueue.add(workUnit);
          }
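          // remember this record's params so the next record can be compared against them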
          lastUpdateParams = params;
          if (type == MirroredSolrRequest.Type.UPDATE) {
            if (updateReqBatch == null) {
              // just initialize
              updateReqBatch = new UpdateRequest();
            } else {
              if (collapseUpdates == CrossDcConf.CollapseUpdates.NONE) {
                throw new RuntimeException("Can't collapse requests.");
              }
              if (collapseUpdates == CrossDcConf.CollapseUpdates.PARTIAL && hasDeletes) {
                throw new RuntimeException("Can't collapse requests with deletions.");
              }
              metrics.counter(MetricRegistry.name(type.name(), "collapsed")).inc();
              currentCollapsed++;
            }
            UpdateRequest update = (UpdateRequest) solrReq;
            MirroredSolrRequest.setParams(updateReqBatch, params);
            // merge adds, delete-by-id, and delete-by-query into the accumulated batch
            List<SolrInputDocument> docs = update.getDocuments();
            if (docs != null) {
              updateReqBatch.add(docs);
              metrics.counter(MetricRegistry.name(type.name(), "add")).inc(docs.size());
            }
            List<String> deletes = update.getDeleteById();
            if (deletes != null) {
              updateReqBatch.deleteById(deletes);
              metrics.counter(MetricRegistry.name(type.name(), "dbi")).inc(deletes.size());
            }
            List<String> deleteByQuery = update.getDeleteQuery();
            if (deleteByQuery != null) {
              for (String delByQuery : deleteByQuery) {
                updateReqBatch.deleteByQuery(delByQuery);
              }
              metrics.counter(MetricRegistry.name(type.name(), "dbq")).inc(deleteByQuery.size());
            }
          } else {
            // non-update requests should be sent immediately
            sendBatch(req.getSolrRequest(), type, lastRecord, workUnit);
          }
        }
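        // end of this partition's records: flush whatever is still accumulated in the batch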
        if (updateReqBatch != null) {
          sendBatch(updateReqBatch, MirroredSolrRequest.Type.UPDATE, lastRecord, workUnit);
        }
        try {
          partitionManager.checkForOffsetUpdates(partition);
        } catch (Throwable e) {
          log.error("Error while checking offset updates, shutting down", e);
          return false;
        }
        // handleItem sets the thread interrupt, let's exit if there has been an interrupt set
        if (Thread.currentThread().isInterrupted()) {
          log.info("Kafka Consumer thread interrupted, shutting down Kafka consumer.");
          return false;
        }
      } catch (WakeupException e) {
        log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer.");
        return false;
      } catch (Exception e) {
        // If there is any exception returned by handleItem, don't set the offset.
        if (e instanceof ClassCastException
            || e instanceof SerializationException) { // TODO: optional
          log.error("Non retryable error", e);
          return false;
        }
        log.error("Exception occurred in Kafka consumer thread, stopping the Consumer.", e);
        return false;
      }
    }
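    // all partitions from this poll have been processed; run one more offset-update pass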
    try {
      partitionManager.checkOffsetUpdates();
    } catch (Throwable e) {
      log.error("Error while checking offset updates, shutting down", e);
      return false;
    }
  } catch (WakeupException e) {
    log.info("Caught wakeup exception, shutting down KafkaSolrRequestConsumer");
    return false;
  } catch (Exception e) {
    if (e instanceof ClassCastException
        || e instanceof SerializationException) { // TODO: optional
      log.error("Non retryable error", e);
      return false;
    }
    log.error("Exception occurred in Kafka consumer thread, but we will continue.", e);
  }
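  // returning true presumably tells the caller's consumer loop to poll again;
  // every shutdown path above returns false instead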
  return true;
}