in solr/test-framework/src/java/org/apache/solr/cloud/AbstractFullDistribZkTestBase.java [412:655]
/**
 * Creates the (initially core-less) {@code DEFAULT_COLLECTION}, starts {@code numJettys} jetty
 * instances in parallel and then adds one replica of the collection on each of them.
 *
 * <p>The first {@code numJettys - getPullReplicaCount() * sliceCount} jettys receive a TLOG
 * replica when {@link #useTlogReplicas()} returns true and an NRT replica otherwise; every
 * remaining jetty receives a PULL replica. Jetty number {@code i} (1-based) is assigned to {@code
 * "shard" + ((i % sliceCount) + 1)}. The TLOG/NRT ADDREPLICA requests are all processed before
 * any PULL request is sent.
 *
 * <p>Once the expected number of replicas is active and every shard has a leader, the started
 * jettys and their clients are appended to {@code this.jettys} / {@code this.clients} and the
 * {@code shards} string is rebuilt.
 *
 * <p>Jetty start-up and replica creation run on pool threads via {@code execute}; this method
 * does not collect task failures itself, they are rethrown on the pool thread.
 *
 * @param numJettys number of jetty instances to start
 * @return the synchronized list of jettys started by this call; tasks append to it concurrently,
 *     so its order is completion order rather than jetty-number order
 * @throws Exception if collection creation, directory set-up or any of the cluster-state waits
 *     fails
 */
protected List<JettySolrRunner> createJettys(int numJettys) throws Exception {
  // Filled concurrently by the start-up tasks, hence the synchronized wrappers.
  List<JettySolrRunner> jettys = Collections.synchronizedList(new ArrayList<>());
  List<SolrClient> clients = Collections.synchronizedList(new ArrayList<>());
  List<CollectionAdminRequest<CollectionAdminResponse>> createReplicaRequests =
      Collections.synchronizedList(new ArrayList<>());
  List<CollectionAdminRequest<CollectionAdminResponse>> createPullReplicaRequests =
      Collections.synchronizedList(new ArrayList<>());

  // HACK: Don't be fooled by the replication factor of '1'...
  //
  // This CREATE command asks for a repFactor of 1, but uses an empty nodeSet.
  // This allows this method to create a collection with numShards == sliceCount,
  // but no actual cores ... yet. The actual replicas are added later (once the actual
  // jetty instances are started)
  assertEquals(
      0,
      CollectionAdminRequest.createCollection(
              DEFAULT_COLLECTION, "conf1", sliceCount, 1) // not real rep factor!
          .setCreateNodeSet("") // empty node set prevents creation of cores
          .process(cloudClient)
          .getStatus());

  // expect sliceCount active shards, but no active replicas
  ZkStateReader.from(cloudClient)
      .waitForState(
          DEFAULT_COLLECTION,
          30,
          TimeUnit.SECONDS,
          SolrCloudTestCase.activeClusterShape(sliceCount, 0));

  int numOtherReplicas = numJettys - getPullReplicaCount() * sliceCount;
  if (log.isInfoEnabled()) {
    log.info(
        "Creating jetty instances pullReplicaCount={} numOtherReplicas={}",
        getPullReplicaCount(),
        numOtherReplicas);
  }

  int addedReplicas = 0;
  ExecutorService jettyStartPool =
      ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("closeThreadPool"));
  try {
    for (int i = 1; i <= numJettys; i++) {
      int cnt = this.jettyIntCntr.incrementAndGet();
      Path jettyDir = createTempDir("shard-" + i);
      Files.createDirectories(jettyDir);
      setupJettySolrHome(jettyDir);

      // The non-PULL budget is consumed first; whatever is left over becomes PULL.
      final Replica.Type replicaType;
      final List<CollectionAdminRequest<CollectionAdminResponse>> requestSink;
      if (numOtherReplicas > 0) {
        numOtherReplicas--;
        replicaType = useTlogReplicas() ? Replica.Type.TLOG : Replica.Type.NRT;
        requestSink = createReplicaRequests;
      } else {
        replicaType = Replica.Type.PULL;
        requestSink = createPullReplicaRequests;
      }

      if (log.isInfoEnabled()) {
        // The two message formats are kept exactly as they were for the respective types.
        if (replicaType == Replica.Type.TLOG) {
          log.info(
              "create jetty {} in directory {} of type {} in shard {}",
              i,
              jettyDir,
              replicaType,
              ((i % sliceCount) + 1)); // nowarn
        } else {
          log.info(
              "create jetty {} in directory {} of type {} for shard{}",
              i,
              jettyDir,
              replicaType,
              ((i % sliceCount) + 1)); // nowarn
        }
      }

      submitJettyStart(
          jettyStartPool, jettyDir, cnt, i, replicaType, requestSink, jettys, clients);
      addedReplicas++;
    }
  } finally {
    // Acts as the barrier that guarantees all ADDREPLICA requests have been queued, and also
    // makes sure the pool is not leaked when the loop above throws.
    ExecutorUtil.shutdownAndAwaitTermination(jettyStartPool);
  }

  // TLOG/NRT replicas first, PULL replicas only after those requests have completed.
  processReplicaRequests("createReplicaRequests", createReplicaRequests);
  processReplicaRequests("createPullReplicaRequests", createPullReplicaRequests);

  waitForActiveReplicaCount(cloudClient, DEFAULT_COLLECTION, addedReplicas);

  this.jettys.addAll(jettys);
  this.clients.addAll(clients);

  ZkStateReader zkStateReader = ZkStateReader.from(cloudClient);
  // make sure we have a leader for each shard
  for (int i = 1; i <= sliceCount; i++) {
    zkStateReader.getLeaderRetry(DEFAULT_COLLECTION, "shard" + i, 10000);
  }

  if (sliceCount > 0) {
    updateMappingsFromZk(this.jettys, this.clients);
  }

  // build the shard string: comma-separated entries of the form "url|url", pairing
  // this.jettys[i] with this.jettys[i + numJettys / 2 - 1]
  // NOTE(review): indexing starts at 1 and the two index ranges can overlap; presumably this
  // relies on what already sits at the front of this.jettys -- confirm before changing.
  StringBuilder sb = new StringBuilder();
  for (int i = 1; i <= numJettys / 2; i++) {
    JettySolrRunner j = this.jettys.get(i);
    JettySolrRunner j2 = this.jettys.get(i + (numJettys / 2 - 1));
    if (sb.length() > 0) sb.append(',');
    sb.append(buildUrl(j.getLocalPort()));
    sb.append("|").append(buildUrl(j2.getLocalPort()));
  }
  shards = sb.toString();

  return jettys;
}

/**
 * Queues a task on {@code pool} that creates and starts one jetty, waits for it to become a live
 * node, records the ADDREPLICA request for it and registers its clients.
 *
 * @param pool executor the start-up task is handed to
 * @param jettyDir solr home directory prepared for this jetty
 * @param cnt value used to build the per-jetty data directory name when {@code useJettyDataDir}
 *     is set
 * @param jettyIndex 1-based jetty number; determines the target shard
 * @param replicaType type of the replica to request on the new node
 * @param replicaRequests thread-safe list the ADDREPLICA request is appended to
 * @param startedJettys thread-safe list the started jetty is appended to
 * @param newClients thread-safe list the node-level client is appended to
 */
private void submitJettyStart(
    ExecutorService pool,
    Path jettyDir,
    int cnt,
    int jettyIndex,
    Replica.Type replicaType,
    List<CollectionAdminRequest<CollectionAdminResponse>> replicaRequests,
    List<JettySolrRunner> startedJettys,
    List<SolrClient> newClients) {
  // NRT jettys are created without an explicit replica type; only the ADDREPLICA request
  // names NRT.
  Replica.Type jettyReplicaType = replicaType == Replica.Type.NRT ? null : replicaType;
  pool.execute(
      () -> {
        try {
          JettySolrRunner j =
              createJetty(
                  jettyDir,
                  useJettyDataDir ? getDataDir(testDir + "/jetty" + cnt) : null,
                  null,
                  "solrconfig.xml",
                  null,
                  jettyReplicaType);
          j.start();
          startedJettys.add(j);
          waitForLiveNode(j);
          replicaRequests.add(
              CollectionAdminRequest.addReplicaToShard(
                      DEFAULT_COLLECTION, "shard" + ((jettyIndex % sliceCount) + 1))
                  .setNode(j.getNodeName())
                  .setType(replicaType));
          coreClients.add(createNewSolrClient(coreName, j.getLocalPort()));
          SolrClient client = createNewSolrClient(j.getLocalPort());
          newClients.add(client);
        } catch (Exception e) {
          log.error("error creating jetty", e);
          throw new RuntimeException(e);
        }
      });
}

/**
 * Sends every request in {@code requests} through {@code cloudClient} on its own pool thread and
 * returns once all of them have finished.
 *
 * @param threadNamePrefix name handed to the pool's thread factory
 * @param requests ADDREPLICA requests to process; must no longer be modified concurrently
 */
private void processReplicaRequests(
    String threadNamePrefix, List<CollectionAdminRequest<CollectionAdminResponse>> requests) {
  ExecutorService pool =
      ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory(threadNamePrefix));
  try {
    for (CollectionAdminRequest<CollectionAdminResponse> r : requests) {
      pool.execute(
          () -> {
            CollectionAdminResponse response;
            try {
              response = r.process(cloudClient);
            } catch (SolrServerException | IOException e) {
              throw new RuntimeException(e);
            }
            assertTrue(response.isSuccess());
            // A successful ADDREPLICA must report the core it created.
            assertTrue(
                "ADDREPLICA response did not report any core",
                !response.getCollectionCoresStatus().isEmpty());
          });
    }
  } finally {
    ExecutorUtil.shutdownAndAwaitTermination(pool);
  }
}