in SolrAndKafkaIntegrationTest.java [73:123]
/**
 * Brings up the fixtures used by this integration test, in this order:
 * <ol>
 *   <li>an embedded Kafka cluster with {@code NUM_BROKERS} brokers and the {@code TOPIC} topic;</li>
 *   <li>the {@code topicName}, {@code bootstrapServers} and {@code INDEX_UNMIRRORABLE_DOCS}
 *       system properties;</li>
 *   <li>two single-node Solr clusters, each with a {@code COLLECTION} collection built from the
 *       {@code cloud-minimal} config set and selected as the client's default collection;</li>
 *   <li>the cross-DC {@code Consumer}, configured with the Kafka bootstrap servers and the
 *       ZooKeeper address of {@code solrCluster2}.</li>
 * </ol>
 *
 * @throws Exception if any of the clusters, the collections or the consumer fail to start
 */
public void beforeSolrAndKafkaIntegrationTest() throws Exception {
    consumer = new Consumer();

    // The Kafka cluster is handed an empty Properties object, i.e. no overrides are set here.
    Properties config = new Properties();
    kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) {
        // Report 127.0.0.1 instead of localhost in the bootstrap address.
        // NOTE(review): presumably this sidesteps hostname-resolution differences — confirm.
        @Override
        public String bootstrapServers() {
            // Plain literal substitution, so String.replace is used rather than the regex-based replaceAll.
            return super.bootstrapServers().replace("localhost", "127.0.0.1");
        }
    };
    kafkaCluster.start();
    kafkaCluster.createTopic(TOPIC, 1, 1);

    // Read the bootstrap address once and reuse it for the system property and the consumer.
    String bootstrapServers = kafkaCluster.bootstrapServers();

    // These properties are set before the Solr clusters are created.
    // NOTE(review): presumably the Solr-side configuration reads them at startup — confirm.
    System.setProperty("topicName", TOPIC);
    System.setProperty("bootstrapServers", bootstrapServers);
    System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false");

    String configDir = "src/test/resources/configs/cloud-minimal/conf";

    // First Solr cluster: one node, one collection registered as the client's default.
    solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir())
        .addConfig("conf", getFile(configDir).toPath())
        .configure();
    CollectionAdminRequest.Create create =
        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
    solrCluster1.getSolrClient().request(create);
    solrCluster1.waitForActiveCollection(COLLECTION, 1, 1);
    solrCluster1.getSolrClient().setDefaultCollection(COLLECTION);

    // Second Solr cluster: set up identically; its ZooKeeper address is given to the consumer below.
    solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir())
        .addConfig("conf", getFile(configDir).toPath())
        .configure();
    CollectionAdminRequest.Create create2 =
        CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1);
    solrCluster2.getSolrClient().request(create2);
    solrCluster2.waitForActiveCollection(COLLECTION, 1, 1);
    solrCluster2.getSolrClient().setDefaultCollection(COLLECTION);

    log.info("bootstrapServers={}", bootstrapServers);

    // Consumer configuration: Kafka endpoint and topic, plus the ZooKeeper of solrCluster2.
    Map<String, Object> properties = new HashMap<>();
    properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers);
    properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress());
    properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC);
    properties.put(KafkaCrossDcConf.GROUP_ID, "group1");
    properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES);
    consumer.start(properties);
}