public void inform()

in crossdc-producer/src/main/java/org/apache/solr/update/processor/MirroringUpdateRequestProcessorFactory.java [127:190]


    /**
     * Sets up request mirroring for this factory once the core is available.
     *
     * <p>When mirroring is disabled this only logs and returns. Otherwise it looks for the CrossDC
     * properties node in Zookeeper; if the node exists its content is parsed as a
     * {@link Properties} file and handed, together with the local {@code properties}, to
     * {@code KafkaCrossDcConf.readZkProps}. The bootstrap servers and topic name must then be
     * present in {@code properties}. Finally the Kafka configuration, the mirroring sink, a close
     * hook wrapping the sink, and the mirroring handler are created.
     *
     * @param core the core being initialized; its core container supplies the Zookeeper client and
     *     the core receives the close hook
     * @throws SolrException with {@code SERVICE_UNAVAILABLE} if the thread is interrupted while
     *     talking to Zookeeper; with {@code SERVER_ERROR} if the Zookeeper node has no data, if
     *     reading or parsing it fails, or if a required property is missing
     */
    public void inform(SolrCore core) {
        log.info("KafkaRequestMirroringHandler inform enabled={}", this.enabled);

        if (!enabled) {
            return;
        }

        log.info("Producer startup config properties before adding additional properties from Zookeeper={}", properties);

        // Resolve the Zookeeper path once so that the existence check and the read
        // are guaranteed to target the same node.
        final String zkPropsPath = System.getProperty(CrossDcConf.ZK_CROSSDC_PROPS_PATH,
            CrossDcConf.CROSSDC_PROPERTIES);

        Properties zkProps = null;
        try {
            if (core.getCoreContainer().getZkController().getZkClient().exists(zkPropsPath, true)) {
                byte[] data = core.getCoreContainer().getZkController().getZkClient()
                    .getData(zkPropsPath, null, null, true);

                if (data == null) {
                    log.error("{} file in Zookeeper has no data", CrossDcConf.CROSSDC_PROPERTIES);
                    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, CrossDcConf.CROSSDC_PROPERTIES
                        + " file in Zookeeper has no data");
                }

                zkProps = new Properties();
                zkProps.load(new ByteArrayInputStream(data));

                KafkaCrossDcConf.readZkProps(properties, zkProps);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before translating to a SolrException.
            Thread.currentThread().interrupt();
            log.error("Interrupted looking for CrossDC configuration in Zookeeper", e);
            throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE, e);
        } catch (SolrException e) {
            // A SolrException raised inside the block (e.g. the "no data" case above) already
            // carries a specific message and error code; pass it through instead of letting the
            // generic handler below log it a second time and bury it under a generic wrapper.
            throw e;
        } catch (Exception e) {
            log.error("Exception looking for CrossDC configuration in Zookeeper", e);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Exception looking for CrossDC configuration in Zookeeper", e);
        }

        if (properties.get(BOOTSTRAP_SERVERS) == null) {
            log.error(
                "bootstrapServers not specified for producer in CrossDC configuration props={} zkProps={}",
                properties, zkProps);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "bootstrapServers not specified for producer");
        }

        if (properties.get(TOPIC_NAME) == null) {
            log.error(
                "topicName not specified for producer in CrossDC configuration props={} zkProps={}",
                properties, zkProps);
            throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "topicName not specified for producer");
        }

        conf = new KafkaCrossDcConf(properties);

        // Build the Kafka-backed sink and register a close hook for it on the core
        // before exposing it through the mirroring handler.
        KafkaMirroringSink sink = new KafkaMirroringSink(conf);

        Closer closer = new Closer(sink);
        core.addCloseHook(new MyCloseHook(closer));

        mirroringHandler = new KafkaRequestMirroringHandler(sink);
    }