public INcCollection build()

in hyracks/hyracks-hdfs/hyracks-hdfs-core/src/main/java/org/apache/hyracks/hdfs/scheduler/IPProximityNcCollectionBuilder.java [40:147]


    /**
     * Builds an {@link INcCollection} that places input splits on node controllers (NCs) by IP proximity.
     * <p>
     * Every NC whose current workload is below {@code slotLimit} contributes its free slots
     * ({@code slotLimit - workloads[i]}) to a sorted map keyed by the NC's raw IP address; NCs that share an IP
     * have their free slots summed under one key.
     *
     * @param ncNameToNcInfos
     *            NC name to NC info; used to look up the IP address of each NC in {@code NCs}
     * @param ipToNcMapping
     *            host-address string to the names of the NCs running on that address
     * @param ncNameToIndex
     *            NC name to its index in {@code workloads}
     * @param NCs
     *            NC names, indexed in parallel with {@code workloads}
     * @param workloads
     *            number of slots already in use per NC
     * @param slotLimit
     *            maximum number of slots per NC
     * @return a collection that hands out the nearest NC with a free slot for a given split
     * @throws RuntimeException
     *             if the IP address of an NC with free slots cannot be resolved
     */
    public INcCollection build(Map<String, NodeControllerInfo> ncNameToNcInfos,
            final Map<String, List<String>> ipToNcMapping, final Map<String, Integer> ncNameToIndex, String[] NCs,
            final int[] workloads, final int slotLimit) {
        final TreeMap<BytesWritable, IntWritable> availableIpsToSlots = new TreeMap<BytesWritable, IntWritable>();
        for (int i = 0; i < workloads.length; i++) {
            if (workloads[i] >= slotLimit) {
                // this NC is already full
                continue;
            }
            byte[] rawip;
            try {
                rawip = ncNameToNcInfos.get(NCs[i]).getNetworkAddress().lookupIpAddress();
            } catch (UnknownHostException e) {
                throw new RuntimeException("Cannot resolve the IP address of node controller " + NCs[i], e);
            }
            int freeSlots = slotLimit - workloads[i];
            BytesWritable ip = new BytesWritable(rawip);
            IntWritable availableSlot = availableIpsToSlots.get(ip);
            if (availableSlot == null) {
                availableIpsToSlots.put(ip, new IntWritable(freeSlots));
            } else {
                // several NCs share this IP: accumulate their free slots
                availableSlot.set(freeSlots + availableSlot.get());
            }
        }
        return new INcCollection() {

            /**
             * Picks an NC for the split. When the split reports locations, the IP with free slots that is
             * numerically closest to any of the split's addresses is chosen; otherwise the first IP with a
             * free slot is used. One slot of the chosen IP is consumed.
             *
             * @return the name of an NC on the chosen IP whose workload is below the slot limit, or
             *         {@code null} if the split could not be scheduled
             * @throws IllegalStateException
             *             wrapping any exception raised while resolving addresses or looking up NCs
             */
            @Override
            public String findNearestAvailableSlot(InputSplit split) {
                try {
                    String[] locs = split.getLocations();
                    BytesWritable currentCandidateIp = null;
                    // A null location array is treated like an empty one (no locality information).
                    if (locs != null && locs.length > 0) {
                        long minDistance = Long.MAX_VALUE;
                        for (String loc : locs) {
                            /**
                             * get all the IP addresses from the name
                             */
                            for (InetAddress ip : InetAddress.getAllByName(loc)) {
                                BytesWritable splitIp = new BytesWritable(ip.getAddress());
                                long splitValue = unsignedPrefix(splitIp.getBytes());
                                /**
                                 * The nearest key is either the closest one at-or-below or the closest one
                                 * at-or-above the split's address, so both neighbours are examined. The floor
                                 * key is checked first and wins ties.
                                 */
                                BytesWritable[] neighbours = { availableIpsToSlots.floorKey(splitIp),
                                        availableIpsToSlots.ceilingKey(splitIp) };
                                for (BytesWritable candidateNcIp : neighbours) {
                                    if (candidateNcIp == null
                                            || availableIpsToSlots.get(candidateNcIp).get() <= 0) {
                                        continue;
                                    }
                                    long distance = Math.abs(unsignedPrefix(candidateNcIp.getBytes()) - splitValue);
                                    if (distance < minDistance) {
                                        minDistance = distance;
                                        currentCandidateIp = candidateNcIp;
                                    }
                                }
                            }
                        }
                    } else {
                        /**
                         * no locality information: take the first IP that still has a free slot
                         */
                        for (Entry<BytesWritable, IntWritable> entry : availableIpsToSlots.entrySet()) {
                            if (entry.getValue().get() > 0) {
                                currentCandidateIp = entry.getKey();
                                break;
                            }
                        }
                    }

                    if (currentCandidateIp != null) {
                        /**
                         * Update the entry of the selected IP
                         */
                        IntWritable availableSlot = availableIpsToSlots.get(currentCandidateIp);
                        availableSlot.set(availableSlot.get() - 1);
                        if (availableSlot.get() == 0) {
                            availableIpsToSlots.remove(currentCandidateIp);
                        }
                        /**
                         * Pick an NC on the selected IP that is still below the slot limit.
                         * NOTE(review): workloads is only read here, never incremented; presumably the caller
                         * updates it after a successful assignment -- confirm against the scheduler.
                         */
                        List<String> dataLocations = ipToNcMapping.get(InetAddress.getByAddress(
                                currentCandidateIp.getBytes()).getHostAddress());
                        for (String nc : dataLocations) {
                            int ncIndex = ncNameToIndex.get(nc);
                            if (workloads[ncIndex] < slotLimit) {
                                return nc;
                            }
                        }
                    }
                    /** not scheduled */
                    return null;
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
            }

            /**
             * Returns the number of distinct IP addresses that still have at least one free slot. This is the
             * size of the IP map (entries are removed once their slot count reaches zero), not the total
             * number of free slots.
             */
            @Override
            public int numAvailableSlots() {
                return availableIpsToSlots.size();
            }

            /**
             * Interprets the first four bytes of an address as an unsigned 32-bit value held in a long, so
             * that differences between addresses cannot overflow or turn negative. Only the first four bytes
             * are considered, as in the original distance computation.
             */
            private long unsignedPrefix(byte[] address) {
                return ((address[0] & 0xFFL) << 24) | ((address[1] & 0xFFL) << 16) | ((address[2] & 0xFFL) << 8)
                        | (address[3] & 0xFFL);
            }

        };
    }