private List generateSplits()

in phoenix5-hive4/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java [121:193]

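The method walks the scan batches produced by the Phoenix QueryPlan and wraps them as Hive InputSplits. Depending on the SPLIT_BY_STATS flag it emits either one split per individual Scan (the statistics-derived chunks) or one split per region's whole scan batch; each split carries its region's location and size so Hive can schedule map tasks close to the data.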

    private List<InputSplit> generateSplits(final JobConf jobConf, final QueryPlan qplan,
                                            final List<KeyRange> splits, String query)
            throws IOException {
        if (qplan == null) {
            throw new NullPointerException("qplan must not be null");
        }
        if (splits == null) {
            throw new NullPointerException("splits must not be null");
        }
        final List<InputSplit> psplits = new ArrayList<>(splits.size());

        Path[] tablePaths = FileInputFormat.getInputPaths(ShimLoader.getHadoopShims()
                .newJobContext(new Job(jobConf)));
        boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                false);

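        // Apply the configured scan cache size before the scans are packaged into splits.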
        setScanCacheSize(jobConf);

        // Resolve region locality and size so each split can be scheduled near its data.
        try (org.apache.hadoop.hbase.client.Connection connection = ConnectionFactory
                .createConnection(PhoenixConnectionUtil.getConfiguration(jobConf));
             org.apache.hadoop.hbase.client.Admin admin = connection.getAdmin()) {
            RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(qplan
                    .getTableRef().getTable().getPhysicalName().toString()));
            // Build the size calculator once: constructing it inside the loop would re-fetch
            // region metrics for every scan batch and leak an unclosed Admin handle.
            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, admin);

            for (List<Scan> scans : qplan.getScans()) {
                PhoenixInputSplit inputSplit;

                HRegionLocation location = regionLocator.getRegionLocation(
                        scans.get(0).getStartRow(), false);
                long regionSize = sizeCalculator.getRegionSize(
                        location.getRegionInfo().getRegionName());
                String regionLocation = PhoenixStorageHandlerUtil.getRegionLocation(location, LOG);

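                // Two strategies: split-by-stats emits one split per Scan for finer parallelism;
                // the default packs the region's whole scan batch into a single split.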
                if (splitByStats) {
                    for (Scan aScan : scans) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Split for scan : " + aScan + " with scanAttribute : " + aScan
                                    .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" +
                                    aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan
                                    .getBatch() + "] and regionLocation : " + regionLocation);
                        }

                        inputSplit = new PhoenixInputSplit(new ArrayList<>(Arrays.asList(aScan)),
                                tablePaths[0], regionLocation, regionSize);
                        inputSplit.setQuery(query);
                        psplits.add(inputSplit);
                    }
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans
                                .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans
                                .size() - 1).getStopRow()));
                        LOG.debug("First scan : " + scans.get(0) + " with scanAttribute : " + scans
                                .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
                                "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
                                + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
                                regionLocation);

                        for (int i = 0, limit = scans.size(); i < limit; i++) {
                            LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
                                    .toStringBinary(scans.get(i).getAttribute(
                                            BaseScannerRegionObserverConstants.EXPECTED_UPPER_REGION_KEY)));
                        }
                    }

                    inputSplit = new PhoenixInputSplit(scans, tablePaths[0], regionLocation,
                            regionSize);
                    inputSplit.setQuery(query);
                    psplits.add(inputSplit);
                }
            }
        }

        return psplits;
    }
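
For context, a minimal sketch of how the branch above can be toggled from the job side. PhoenixStorageHandlerConstants.SPLIT_BY_STATS is the same key generateSplits() reads via jobConf.getBoolean(...); the class name and the import path for the constants class are assumptions for illustration, not taken from this file:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.phoenix.hive.constants.PhoenixStorageHandlerConstants; // assumed package

    public class SplitByStatsToggle {
        public static void main(String[] args) {
            JobConf jobConf = new JobConf();
            // Default (false): one PhoenixInputSplit per region's batch of scans.
            // true: one PhoenixInputSplit per individual Scan, trading more map
            // tasks for finer-grained, statistics-driven parallelism.
            jobConf.setBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, true);
        }
    }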