public List&lt;InputSplit&gt; getSplits(JobContext jobContext)

in core/src/main/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java [99:165]


    /**
     * Computes the input splits for the job by delegating to the underlying
     * {@code InputFormat} of each matching partition's storage handler and
     * wrapping every resulting split in an {@link HCatSplit}.
     *
     * @param jobContext the job context holding the configuration, which must
     *                   already contain the serialized {@link InputJobInfo}
     * @return one {@code HCatSplit} per underlying split across all matching
     *         partitions; empty if no partitions match the partition filter
     * @throws IOException if the job info cannot be read from the
     *                     configuration, or the underlying input format fails
     * @throws InterruptedException if split computation is interrupted
     */
    public List<InputSplit> getSplits(JobContext jobContext)
        throws IOException, InterruptedException {
        Configuration conf = jobContext.getConfiguration();

        //Get the job info from the configuration,
        //throws exception if not initialized
        InputJobInfo inputJobInfo;
        try {
            inputJobInfo = getJobInfo(conf);
        } catch (Exception e) {
            throw new IOException(e);
        }

        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
        if (partitionInfoList == null) {
            //No partitions match the specified partition filter
            return splits;
        }

        //The complete column schema (data columns followed by partition
        //columns) depends only on the table info, not on any individual
        //partition, so build it once instead of once per partition.
        HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
        for (HCatFieldSchema field :
            inputJobInfo.getTableInfo().getDataColumns().getFields()) {
            allCols.append(field);
        }
        for (HCatFieldSchema field :
            inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
            allCols.append(field);
        }

        //When the desired number of input splits is missing, use a default
        //number (denoted by zero). Loop-invariant, so read it once.
        //TODO(malewicz): Currently each partition is split independently into
        //a desired number. However, we want the union of all partitions to be
        //split into a desired number while maintaining balanced sizes of input
        //splits.
        int desiredNumSplits =
            conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);

        //For each matching partition, call getSplits on the underlying InputFormat
        for (PartInfo partitionInfo : partitionInfoList) {
            //Fresh JobConf per partition so partition-specific properties and
            //input paths do not leak between iterations.
            JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
            setInputPath(jobConf, partitionInfo.getLocation());
            Map<String, String> jobProperties = partitionInfo.getJobProperties();

            HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

            HCatStorageHandler storageHandler = HCatUtil.getStorageHandler(
                jobConf, partitionInfo);

            //Get the input format declared by this partition's storage handler
            Class inputFormatClass = storageHandler.getInputFormatClass();
            org.apache.hadoop.mapred.InputFormat inputFormat =
                getMapRedInputFormat(jobConf, inputFormatClass);

            //Call getSplits on the InputFormat, create an HCatSplit for each
            //underlying split.
            org.apache.hadoop.mapred.InputSplit[] baseSplits =
                inputFormat.getSplits(jobConf, desiredNumSplits);

            for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
                splits.add(new HCatSplit(partitionInfo, split, allCols));
            }
        }

        return splits;
    }