in core/src/main/java/org/apache/hcatalog/mapreduce/HCatBaseInputFormat.java [99:165]
/**
 * Computes the input splits for the job. For each partition that matched
 * the partition filter, delegates to the {@code getSplits} of the
 * partition's underlying mapred {@code InputFormat} (obtained from its
 * storage handler) and wraps every base split in an {@link HCatSplit}.
 *
 * @param jobContext the job context carrying the configuration, which must
 *        already contain the serialized {@link InputJobInfo}
 * @return the list of {@link HCatSplit}s; empty when no partition matches
 *         the specified partition filter
 * @throws IOException if the job info cannot be read from the
 *         configuration, or the underlying InputFormat fails to split
 * @throws InterruptedException if split computation is interrupted
 */
public List<InputSplit> getSplits(JobContext jobContext)
    throws IOException, InterruptedException {
    Configuration conf = jobContext.getConfiguration();

    // Get the job info from the configuration; throws if not initialized.
    InputJobInfo inputJobInfo;
    try {
        inputJobInfo = getJobInfo(conf);
    } catch (Exception e) {
        throw new IOException(e);
    }

    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<PartInfo> partitionInfoList = inputJobInfo.getPartitions();
    if (partitionInfoList == null) {
        // No partitions match the specified partition filter.
        return splits;
    }

    // The combined schema (data columns followed by partition columns) is
    // the same for every partition, so build it once instead of once per
    // loop iteration as before.
    HCatSchema allCols = new HCatSchema(new LinkedList<HCatFieldSchema>());
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getDataColumns().getFields()) {
        allCols.append(field);
    }
    for (HCatFieldSchema field :
        inputJobInfo.getTableInfo().getPartitionColumns().getFields()) {
        allCols.append(field);
    }

    // Desired number of input splits; zero means "use the InputFormat's
    // default". Loop-invariant, so read it once.
    // TODO(malewicz): Currently each partition is split independently into
    // a desired number. However, we want the union of all partitions to be
    // split into a desired number while maintaining balanced sizes of input
    // splits.
    int desiredNumSplits =
        conf.getInt(HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, 0);

    // For each matching partition, call getSplits on the underlying
    // InputFormat. Each partition gets a fresh JobConf because the
    // partition's job properties and input path are written into it.
    for (PartInfo partitionInfo : partitionInfoList) {
        JobConf jobConf = HCatUtil.getJobConfFromContext(jobContext);
        setInputPath(jobConf, partitionInfo.getLocation());
        Map<String, String> jobProperties = partitionInfo.getJobProperties();
        HCatUtil.copyJobPropertiesToJobConf(jobProperties, jobConf);

        HCatStorageHandler storageHandler =
            HCatUtil.getStorageHandler(jobConf, partitionInfo);

        // Resolve the partition's underlying mapred InputFormat.
        Class inputFormatClass = storageHandler.getInputFormatClass();
        org.apache.hadoop.mapred.InputFormat inputFormat =
            getMapRedInputFormat(jobConf, inputFormatClass);

        // Wrap each base split in an HCatSplit so the partition metadata
        // and the full column schema travel with it.
        org.apache.hadoop.mapred.InputSplit[] baseSplits =
            inputFormat.getSplits(jobConf, desiredNumSplits);
        for (org.apache.hadoop.mapred.InputSplit split : baseSplits) {
            splits.add(new HCatSplit(partitionInfo, split, allCols));
        }
    }
    return splits;
}