in odps-sqoop/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java [268:456]
private void getMoreSplits(JobContext job, Path[] paths,
long maxSize, long minSizeNode, long minSizeRack,
List<InputSplit> splits)
throws IOException {
Configuration conf = job.getConfiguration();
// all blocks for all the files in input set
OneFileInfo[] files;
// mapping from a rack name to the list of blocks it has
HashMap<String, List<OneBlockInfo>> rackToBlocks =
new HashMap<String, List<OneBlockInfo>>();
// mapping from a block to the nodes on which it has replicas
HashMap<OneBlockInfo, String[]> blockToNodes =
new HashMap<OneBlockInfo, String[]>();
// mapping from a node to the list of blocks that it contains
HashMap<String, List<OneBlockInfo>> nodeToBlocks =
new HashMap<String, List<OneBlockInfo>>();
files = new OneFileInfo[paths.length];
if (paths.length == 0) {
return;
}
// populate all the blocks for all files
long totLength = 0;
for (int i = 0; i < paths.length; i++) {
files[i] = new OneFileInfo(paths[i], conf, isSplitable(job, paths[i]),
rackToBlocks, blockToNodes, nodeToBlocks,
rackToNodes, maxSize);
totLength += files[i].getLength();
}
ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
Set<String> nodes = new HashSet<String>();
long curSplitSize = 0;
// process all nodes and create splits that are local
// to a node.
for (Iterator<Map.Entry<String,
List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator();
iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
nodes.add(one.getKey());
List<OneBlockInfo> blocksInNode = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
for (OneBlockInfo oneblock : blocksInNode) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, nodes, validBlocks);
curSplitSize = 0;
validBlocks.clear();
}
}
}
// if there were any blocks left over and their combined size is
// larger than minSplitNode, then combine them into one split.
// Otherwise add them back to the unprocessed pool. It is likely
// that they will be combined with other blocks from the
// same rack later on.
if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
// create an input split and add it to the splits array
addCreatedSplit(splits, nodes, validBlocks);
} else {
for (OneBlockInfo oneblock : validBlocks) {
blockToNodes.put(oneblock, oneblock.hosts);
}
}
validBlocks.clear();
nodes.clear();
curSplitSize = 0;
}
// if blocks in a rack are below the specified minimum size, then keep them
// in 'overflow'. After the processing of all racks is complete, these
// overflow blocks will be combined into splits.
ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
Set<String> racks = new HashSet<String>();
// Process all racks over and over again until there is no more work to do.
while (blockToNodes.size() > 0) {
// Create one split for this rack before moving over to the next rack.
// Come back to this rack after creating a single split for each of the
// remaining racks.
// Process one rack location at a time, Combine all possible blocks that
// reside on this rack as one split. (constrained by minimum and maximum
// split size).
// iterate over all racks
for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
rackToBlocks.entrySet().iterator(); iter.hasNext();) {
Map.Entry<String, List<OneBlockInfo>> one = iter.next();
racks.add(one.getKey());
List<OneBlockInfo> blocks = one.getValue();
// for each block, copy it into validBlocks. Delete it from
// blockToNodes so that the same block does not appear in
// two different splits.
boolean createdSplit = false;
for (OneBlockInfo oneblock : blocks) {
if (blockToNodes.containsKey(oneblock)) {
validBlocks.add(oneblock);
blockToNodes.remove(oneblock);
curSplitSize += oneblock.length;
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
createdSplit = true;
break;
}
}
}
// if we created a split, then just go to the next rack
if (createdSplit) {
curSplitSize = 0;
validBlocks.clear();
racks.clear();
continue;
}
if (!validBlocks.isEmpty()) {
if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
// if there is a minimum size specified, then create a single split
// otherwise, store these blocks into overflow data structure
addCreatedSplit(splits, getHosts(racks), validBlocks);
} else {
// There were a few blocks in this rack that
// remained to be processed. Keep them in 'overflow' block list.
// These will be combined later.
overflowBlocks.addAll(validBlocks);
}
}
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
assert blockToNodes.isEmpty();
assert curSplitSize == 0;
assert validBlocks.isEmpty();
assert racks.isEmpty();
// Process all overflow blocks
for (OneBlockInfo oneblock : overflowBlocks) {
validBlocks.add(oneblock);
curSplitSize += oneblock.length;
// This might cause an exiting rack location to be re-added,
// but it should be ok.
for (int i = 0; i < oneblock.racks.length; i++) {
racks.add(oneblock.racks[i]);
}
// if the accumulated split size exceeds the maximum, then
// create this split.
if (maxSize != 0 && curSplitSize >= maxSize) {
// create an input split and add it to the splits array
addCreatedSplit(splits, getHosts(racks), validBlocks);
curSplitSize = 0;
validBlocks.clear();
racks.clear();
}
}
// Process any remaining blocks, if any.
if (!validBlocks.isEmpty()) {
addCreatedSplit(splits, getHosts(racks), validBlocks);
}
}