private static RestRepository initSingleIndex()

in mr/src/main/java/org/elasticsearch/hadoop/rest/RestService.java [651:728]


    private static RestRepository initSingleIndex(Settings settings, long currentInstance, Resource resource, Log log) {
        if (log.isDebugEnabled()) {
            log.debug(String.format("Resource [%s] resolves as a single index", resource));
        }

        RestRepository repository = new RestRepository(settings);
        // create the index if needed
        if (repository.touch()) {
            if (repository.waitForYellow()) {
                log.warn(String.format("Timed out waiting for index [%s] to reach yellow health", resource));
            }
        }

        // if WAN mode is used, use an already selected node
        if (settings.getNodesWANOnly()) {
            String node = SettingsUtils.getPinnedNode(settings);
            if (log.isDebugEnabled()) {
                log.debug(String.format("Partition writer instance [%s] assigned to [%s]", currentInstance, node));
            }

            return repository;
        }

        // if client-nodes are used, simply use the underlying nodes
        if (settings.getNodesClientOnly()) {
            String clientNode = repository.getRestClient().getCurrentNode();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Client-node routing detected; partition writer instance [%s] assigned to [%s]",
                        currentInstance, clientNode));
            }

            return repository;
        }

        // no routing necessary; select the relevant target shard/node
        Map<ShardInfo, NodeInfo> targetShards = repository.getWriteTargetPrimaryShards(settings.getNodesClientOnly());
        repository.close();
        String nodeAddress;
        if (targetShards.isEmpty()) {
            List<String> nodes = SettingsUtils.discoveredOrDeclaredNodes(settings);
            nodeAddress = nodes.get(new Random().nextInt(nodes.size()));
            if (log.isDebugEnabled()) {
                log.debug(String.format("Shards not found for partition writer instance [%s], so node [%s] has been randomly selected",
                        currentInstance, nodeAddress));
            }
        } else {
            List<ShardInfo> orderedShards = new ArrayList<ShardInfo>(targetShards.keySet());
            // make sure the order is strict
            Collections.sort(orderedShards);
            if (log.isTraceEnabled()) {
                log.trace(String.format("Partition writer instance [%s] discovered [%s] primary shards %s", currentInstance, orderedShards.size(), orderedShards));
            }

            // if there's no task info, just pick a random bucket
            if (currentInstance <= 0) {
                currentInstance = new Random().nextInt(targetShards.size()) + 1;
            }
            int bucket = (int) (currentInstance % targetShards.size());
            ShardInfo chosenShard = orderedShards.get(bucket);
            NodeInfo targetNode = targetShards.get(chosenShard);
            nodeAddress = targetNode.getPublishAddress();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Partition writer instance [%s] assigned using primary shard [%s] to choose node [%s]",
                        currentInstance, chosenShard.getName(), nodeAddress));
            }
        }

        // pin settings
        SettingsUtils.pinNode(settings, nodeAddress);
        String node = SettingsUtils.getPinnedNode(settings);
        repository = new RestRepository(settings);

        if (log.isDebugEnabled()) {
            log.debug(String.format("Partition writer instance [%s] assigned to address [%s]", currentInstance, node));
        }

        return repository;
    }