in mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java [651:728]
/**
 * Creates the {@link RestRepository} used by a partition writer when the target resource
 * resolves to a single index.
 * <p>
 * The method first opens a repository on the given settings and touches the index; when
 * {@code touch()} returns {@code true} it waits for yellow health and logs a warning if
 * {@code waitForYellow()} reports {@code true}. It then picks one of three paths:
 * <ul>
 * <li>WAN-only mode: the initial repository is returned as is (the pinned node is only logged);</li>
 * <li>client-only mode: the initial repository is returned as is (its current node is only logged);</li>
 * <li>otherwise: the primary shards of the write target are looked up, the initial repository is
 * closed, a node address is chosen, pinned into {@code settings}, and a new repository is created
 * on the updated settings.</li>
 * </ul>
 * In the last path the node is chosen as follows: when no shards are reported, a random entry of
 * {@code SettingsUtils.discoveredOrDeclaredNodes(settings)} is used; otherwise the shards are sorted
 * and the one at index {@code currentInstance % shardCount} is used, with a random instance number
 * substituted when {@code currentInstance <= 0}.
 *
 * @param settings        connector settings; modified via {@code SettingsUtils.pinNode} when a node is selected
 * @param currentInstance writer instance number used to pick a shard bucket; values {@code <= 0} are
 *                        replaced by a random value in {@code [1, shardCount]}
 * @param resource        the target resource; used here only in log messages
 * @param log             logger receiving the debug/trace/warn output
 * @return the repository the caller should write through; the caller is responsible for closing it
 */
private static RestRepository initSingleIndex(Settings settings, long currentInstance, Resource resource, Log log) {
    if (log.isDebugEnabled()) {
        log.debug(String.format("Resource [%s] resolves as a single index", resource));
    }

    RestRepository repository = new RestRepository(settings);
    // create the index if needed
    if (repository.touch()) {
        if (repository.waitForYellow()) {
            log.warn(String.format("Timed out waiting for index [%s] to reach yellow health", resource));
        }
    }

    // if WAN mode is used, use an already selected node
    if (settings.getNodesWANOnly()) {
        String node = SettingsUtils.getPinnedNode(settings);
        if (log.isDebugEnabled()) {
            log.debug(String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
        }
        return repository;
    }

    // if client-nodes are used, simply use the underlying nodes
    if (settings.getNodesClientOnly()) {
        String clientNode = repository.getRestClient().getCurrentNode();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]",
                    currentInstance, clientNode));
        }
        return repository;
    }

    // no routing necessary; select the relevant target shard/node
    Map<ShardInfo, NodeInfo> targetShards;
    try {
        targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
    } finally {
        // the bootstrap repository is replaced below by one bound to the pinned node, so release it
        // here whether or not the shard lookup succeeded (previously it leaked when the lookup threw)
        repository.close();
    }

    String nodeAddress;
    if (targetShards.isEmpty()) {
        // nothing to route by; fall back to a random known node
        List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
        nodeAddress = nodes.get(new Random().nextInt(nodes.size()));
        if (log.isDebugEnabled()) {
            log.debug(String.format("Shards not found for partition writer instance [%s], so node [%s] has been randomly selected",
                    currentInstance, nodeAddress));
        }
    } else {
        List<ShardInfo> orderedShards = new ArrayList<ShardInfo>(targetShards.keySet());
        // make sure the order is strict
        Collections.sort(orderedShards);
        if (log.isTraceEnabled()) {
            log.trace(String.format("Partition writer instance [%s] discovered [%s] primary shards %s", currentInstance, orderedShards.size(), orderedShards));
        }
        // if there's no task info, just pick a random bucket
        if (currentInstance <= 0) {
            currentInstance = new Random().nextInt(targetShards.size()) + 1;
        }
        // currentInstance is positive here, so the remainder is a valid list index
        int bucket = (int) (currentInstance % targetShards.size());
        ShardInfo chosenShard = orderedShards.get(bucket);
        NodeInfo targetNode = targetShards.get(chosenShard);
        nodeAddress = targetNode.getPublishAddress();
        if (log.isDebugEnabled()) {
            log.debug(String.format("Partition writer instance [%s] assigned using primary shard [%s] to choose node [%s]",
                    currentInstance, chosenShard.getName(), nodeAddress));
        }
    }

    // pin settings
    SettingsUtils.pinNode(settings, nodeAddress);
    String node = SettingsUtils.getPinnedNode(settings);
    // the new repository is built from the settings after pinning
    repository = new RestRepository(settings);
    if (log.isDebugEnabled()) {
        log.debug(String.format("Partition writer instance [%s] assigned to address [%s]", currentInstance, node));
    }
    return repository;
}