SolrAndKafkaIntegrationTest.java (183 lines of code) (raw):

package org.apache.solr.crossdc; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakAction; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import com.carrotsearch.randomizedtesting.annotations.ThreadLeakLingering; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.lucene.util.QuickPatchThreadsFilter; import org.apache.solr.SolrIgnoredThreadsFilter; import org.apache.solr.SolrTestCaseJ4; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.impl.BaseCloudSolrClient; import org.apache.solr.client.solrj.impl.BaseHttpSolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.request.UpdateRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.cloud.SolrCloudTestCase; import org.apache.solr.common.SolrInputDocument; import org.apache.solr.common.util.ObjectReleaseTracker; import org.apache.solr.crossdc.common.KafkaCrossDcConf; import org.apache.solr.crossdc.common.MirroredSolrRequest; import org.apache.solr.crossdc.common.MirroredSolrRequestSerializer; import org.apache.solr.crossdc.consumer.Consumer; import org.junit.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.sys.Prop; import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import static org.apache.solr.crossdc.common.KafkaCrossDcConf.DEFAULT_MAX_REQUEST_SIZE; import static org.apache.solr.crossdc.common.KafkaCrossDcConf.INDEX_UNMIRRORABLE_DOCS; import static org.mockito.Mockito.spy; @ThreadLeakFilters(defaultFilters = true, filters = { SolrIgnoredThreadsFilter.class, QuickPatchThreadsFilter.class, SolrKafkaTestsIgnoredThreadsFilter.class }) @ThreadLeakLingering(linger = 5000) public class SolrAndKafkaIntegrationTest extends SolrTestCaseJ4 { private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final int MAX_DOC_SIZE_BYTES = Integer.parseInt(DEFAULT_MAX_REQUEST_SIZE); static final String VERSION_FIELD = "_version_"; private static final int NUM_BROKERS = 1; public EmbeddedKafkaCluster kafkaCluster; protected volatile MiniSolrCloudCluster solrCluster1; protected volatile MiniSolrCloudCluster solrCluster2; protected static volatile Consumer consumer; private static String TOPIC = "topic1"; private static String COLLECTION = "collection1"; private static String ALT_COLLECTION = "collection2"; @Before public void beforeSolrAndKafkaIntegrationTest() throws Exception { consumer = new Consumer(); Properties config = new Properties(); //config.put("unclean.leader.election.enable", "true"); //config.put("enable.partition.eof", "false"); kafkaCluster = new EmbeddedKafkaCluster(NUM_BROKERS, config) { public String bootstrapServers() { return super.bootstrapServers().replaceAll("localhost", "127.0.0.1"); } }; kafkaCluster.start(); kafkaCluster.createTopic(TOPIC, 1, 1); System.setProperty("topicName", TOPIC); System.setProperty("bootstrapServers", kafkaCluster.bootstrapServers()); System.setProperty(INDEX_UNMIRRORABLE_DOCS, "false"); solrCluster1 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure(); CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1); solrCluster1.getSolrClient().request(create); solrCluster1.waitForActiveCollection(COLLECTION, 1, 1); solrCluster1.getSolrClient().setDefaultCollection(COLLECTION); solrCluster2 = new SolrCloudTestCase.Builder(1, createTempDir()).addConfig("conf", getFile("src/test/resources/configs/cloud-minimal/conf").toPath()).configure(); CollectionAdminRequest.Create create2 = CollectionAdminRequest.createCollection(COLLECTION, "conf", 1, 1); solrCluster2.getSolrClient().request(create2); solrCluster2.waitForActiveCollection(COLLECTION, 1, 1); solrCluster2.getSolrClient().setDefaultCollection(COLLECTION); String bootstrapServers = kafkaCluster.bootstrapServers(); log.info("bootstrapServers={}", bootstrapServers); Map<String, Object> properties = new HashMap<>(); properties.put(KafkaCrossDcConf.BOOTSTRAP_SERVERS, bootstrapServers); properties.put(KafkaCrossDcConf.ZK_CONNECT_STRING, solrCluster2.getZkServer().getZkAddress()); properties.put(KafkaCrossDcConf.TOPIC_NAME, TOPIC); properties.put(KafkaCrossDcConf.GROUP_ID, "group1"); properties.put(KafkaCrossDcConf.MAX_REQUEST_SIZE_BYTES, MAX_DOC_SIZE_BYTES); consumer.start(properties); } @After public void afterSolrAndKafkaIntegrationTest() throws Exception { ObjectReleaseTracker.clear(); if (solrCluster1 != null) { solrCluster1.getZkServer().getZkClient().printLayoutToStdOut(); solrCluster1.shutdown(); } if (solrCluster2 != null) { solrCluster2.getZkServer().getZkClient().printLayoutToStdOut(); solrCluster2.shutdown(); } consumer.shutdown(); consumer = null; try { //kafkaCluster.deleteAllTopicsAndWait(10000); kafkaCluster.stop(); kafkaCluster = null; } catch (Exception e) { log.error("Exception stopping Kafka cluster", e); } } // public void testFullCloudToCloud() throws Exception { // CloudSolrClient client = solrCluster1.getSolrClient(); // SolrInputDocument doc = new SolrInputDocument(); // doc.addField("id", String.valueOf(System.currentTimeMillis())); // doc.addField("text", "some test"); // // client.add(doc); // // client.commit(COLLECTION); // // System.out.println("Sent producer record"); // // assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1); // } // private static SolrInputDocument getDoc() { SolrInputDocument doc = new SolrInputDocument(); doc.addField("id", String.valueOf(System.nanoTime())); doc.addField("text", "some test"); return doc; } // // public void testProducerToCloud() throws Exception { // Properties properties = new Properties(); // properties.put("bootstrap.servers", kafkaCluster.bootstrapServers()); // properties.put("acks", "all"); // properties.put("retries", 1); // properties.put("batch.size", 1); // properties.put("buffer.memory", 33554432); // properties.put("linger.ms", 1); // properties.put("key.serializer", StringSerializer.class.getName()); // properties.put("value.serializer", MirroredSolrRequestSerializer.class.getName()); // Producer<String, MirroredSolrRequest> producer = new KafkaProducer(properties); // UpdateRequest updateRequest = new UpdateRequest(); // updateRequest.setParam("shouldMirror", "true"); // updateRequest.add("id", String.valueOf(System.currentTimeMillis()), "text", "test"); // updateRequest.add("id", String.valueOf(System.currentTimeMillis() + 22), "text", "test2"); // updateRequest.setParam("collection", COLLECTION); // MirroredSolrRequest mirroredSolrRequest = new MirroredSolrRequest(updateRequest); // System.out.println("About to send producer record"); // producer.send(new ProducerRecord(TOPIC, mirroredSolrRequest), (metadata, exception) -> { // log.info("Producer finished sending metadata={}, exception={}", metadata, exception); // }); // producer.flush(); // // System.out.println("Sent producer record"); // // solrCluster2.getSolrClient().commit(COLLECTION); // // assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 2); // // producer.close(); // } // @Test // public void testMirroringUpdateProcessor() throws Exception { // final SolrInputDocument tooLargeDoc = new SolrInputDocument(); // tooLargeDoc.addField("id", "tooLarge-" + String.valueOf(System.currentTimeMillis())); // tooLargeDoc.addField("text", new String(new byte[2 * MAX_DOC_SIZE_BYTES])); // final SolrInputDocument normalDoc = new SolrInputDocument(); // normalDoc.addField("id", "normalDoc-" + String.valueOf(System.currentTimeMillis())); // normalDoc.addField("text", "Hello world"); // final List<SolrInputDocument> docsToIndex = new ArrayList<>(); // docsToIndex.add(normalDoc); // docsToIndex.add(tooLargeDoc); // // final CloudSolrClient cluster1Client = solrCluster1.getSolrClient(); // try { // cluster1Client.add(docsToIndex); // } catch (BaseCloudSolrClient.RouteException e) { // // expected // } // cluster1Client.commit(COLLECTION); // // // Primary and secondary should each only index 'normalDoc' // final String normalDocQuery = "id:" + normalDoc.get("id").getFirstValue(); // assertCluster2EventuallyHasDocs(COLLECTION, normalDocQuery, 1); // assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 1); // assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, normalDocQuery, 1); // assertClusterEventuallyHasDocs(cluster1Client, COLLECTION, "*:*", 1); // // // Create new primary+secondary collection where 'tooLarge' docs ARE indexed on the primary // CollectionAdminRequest.Create create = // CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1) // .withProperty("indexUnmirrorableDocs", "true"); // try { // solrCluster1.getSolrClient().request(create); // solrCluster2.getSolrClient().request(create); // solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1); // solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1); // // cluster1Client.add(ALT_COLLECTION, docsToIndex); // cluster1Client.commit(ALT_COLLECTION); // // // Primary should have both 'normal' and 'large' docs; secondary should only have 'normal' doc. // assertClusterEventuallyHasDocs(cluster1Client, ALT_COLLECTION, "*:*", 2); // assertCluster2EventuallyHasDocs(ALT_COLLECTION, normalDocQuery, 1); // assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 1); // } finally { // CollectionAdminRequest.Delete delete = // CollectionAdminRequest.deleteCollection(ALT_COLLECTION); // solrCluster1.getSolrClient().request(delete); // solrCluster2.getSolrClient().request(delete); // } // } private void assertCluster2EventuallyHasDocs(String collection, String query, int expectedNumDocs) throws Exception { assertClusterEventuallyHasDocs(solrCluster2.getSolrClient(), collection, query, expectedNumDocs); } private void createCollection(CloudSolrClient client, CollectionAdminRequest.Create createCmd) throws Exception { final String stashedDefault = client.getDefaultCollection(); try { //client.setDefaultCollection(null); client.request(createCmd); } finally { //client.setDefaultCollection(stashedDefault); } } @Test public void testFullCloudToCloudMultiCollection() throws Exception { CollectionAdminRequest.Create create = CollectionAdminRequest.createCollection(ALT_COLLECTION, "conf", 1, 1); try { solrCluster1.getSolrClient().request(create); solrCluster1.waitForActiveCollection(ALT_COLLECTION, 1, 1); solrCluster2.getSolrClient().request(create); solrCluster2.waitForActiveCollection(ALT_COLLECTION, 1, 1); CloudSolrClient client = solrCluster1.getSolrClient(); SolrInputDocument doc1 = getDoc(); SolrInputDocument doc2 = getDoc(); SolrInputDocument doc3 = getDoc(); SolrInputDocument doc4 = getDoc(); SolrInputDocument doc5 = getDoc(); SolrInputDocument doc6 = getDoc(); SolrInputDocument doc7 = getDoc(); client.add(COLLECTION, doc1); client.add(ALT_COLLECTION, doc2); client.add(COLLECTION, doc3); client.add(COLLECTION, doc4); client.add(ALT_COLLECTION, doc5); client.add(ALT_COLLECTION, doc6); client.add(COLLECTION, doc7); client.commit(COLLECTION); client.commit(ALT_COLLECTION); System.out.println("Sent producer record"); assertCluster2EventuallyHasDocs(ALT_COLLECTION, "*:*", 3); assertCluster2EventuallyHasDocs(COLLECTION, "*:*", 4); } finally { CollectionAdminRequest.Delete delete = CollectionAdminRequest.deleteCollection(ALT_COLLECTION); solrCluster1.getSolrClient().request(delete); solrCluster2.getSolrClient().request(delete); } } private void assertClusterEventuallyHasDocs(SolrClient client, String collection, String query, int expectedNumDocs) throws Exception { QueryResponse results = null; boolean foundUpdates = false; for (int i = 0; i < 100; i++) { client.commit(collection); results = client.query(collection, new SolrQuery(query)); if (results.getResults().getNumFound() == expectedNumDocs) { foundUpdates = true; } else { Thread.sleep(200); } } assertTrue("results=" + results, foundUpdates); } }