in crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java [127:190]
/**
 * Finishes producer-side setup once the core is available.
 *
 * <p>Does nothing beyond logging when this factory is not enabled. Otherwise it:
 * <ol>
 *   <li>looks for a CrossDC properties node in ZooKeeper (path taken from the
 *       {@code CrossDcConf.ZK_CROSSDC_PROPS_PATH} system property, defaulting to
 *       {@code CrossDcConf.CROSSDC_PROPERTIES}) and, if it exists, loads it and hands it to
 *       {@code KafkaCrossDcConf.readZkProps} together with the local {@code properties};</li>
 *   <li>verifies that {@code BOOTSTRAP_SERVERS} and {@code TOPIC_NAME} are present;</li>
 *   <li>builds the {@link KafkaCrossDcConf}, the {@link KafkaMirroringSink}, registers a close
 *       hook for the sink on the core, and creates the mirroring handler.</li>
 * </ol>
 *
 * @param core the core this factory is being attached to
 * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the thread is interrupted while
 *     talking to ZooKeeper; with {@code SERVER_ERROR} if the ZooKeeper node exists but has no
 *     data, if reading it fails, or if a required property is missing
 */
public void inform(SolrCore core) {
  log.info("KafkaRequestMirroringHandler inform enabled={}", this.enabled);
  if (!enabled) {
    return;
  }

  log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);

  // Resolve the ZooKeeper path once so the exists() and getData() calls always target the same node.
  final String zkPropsPath =
      System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH, CrossDcConf.CROSSDC_PROPERTIES);

  // Kept outside the try block so it can be included in the diagnostics below (null if no node exists).
  Properties zkProps = null;
  try {
    if (core.getCoreContainer().getZkController().getZkClient().exists(zkPropsPath, true)) {
      byte[] data = core.getCoreContainer().getZkController().getZkClient()
          .getData(zkPropsPath, null, null, true);
      if (data == null) {
        log.error(CrossDcConf.CROSSDC_PROPERTIES + " file in Zookeeper has no data");
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, CrossDcConf.CROSSDC_PROPERTIES
            + " file in Zookeeper has no data");
      }
      zkProps = new Properties();
      zkProps.load(new ByteArrayInputStream(data));
      KafkaCrossDcConf.readZkProps(properties, zkProps);
    }
  } catch (InterruptedException e) {
    // Restore the interrupt flag before translating the exception.
    Thread.currentThread().interrupt();
    log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
    throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
  } catch (SolrException e) {
    // Already logged and already carries a specific message (e.g. the "no data" case above);
    // rethrow unchanged instead of letting the generic handler log it again and re-wrap it.
    throw e;
  } catch (Exception e) {
    log.error("Exception looking for CrossDC configuration in Zookeeper", e);
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
  }

  // Required settings: fail fast with both property sources in the log for diagnosis.
  if (properties.get(BOOTSTRAP_SERVERS) == null) {
    log.error(
        "bootstrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
        properties, zkProps);
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bootstrapServers not specified for producer");
  }
  if (properties.get(TOPIC_NAME) == null) {
    log.error(
        "topicName not specified for producer in CrossDC configuration props={} zkProps={}",
        properties, zkProps);
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
  }

  // Build the configuration and the Kafka-backed mirroring sink.
  conf = new KafkaCrossDcConf(properties);
  KafkaMirroringSink sink = new KafkaMirroringSink(conf);

  // Register a close hook wrapping a Closer for the sink
  // (presumably so the sink is released when the core closes - see MyCloseHook/Closer).
  Closer closer = new Closer(sink);
  core.addCloseHook(new MyCloseHook(closer));

  mirroringHandler = new KafkaRequestMirroringHandler(sink);
}