in odps-sqoop/src/java/org/apache/sqoop/mapreduce/CombineFileInputFormat.java [484:611]
  public abstract RecordReader<K, V> createRecordReader(InputSplit split,
      TaskAttemptContext context) throws IOException;
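
  // A minimal sketch (not part of this class) of how a concrete subclass
  // might implement createRecordReader. MyRecordReader is a hypothetical
  // per-file reader with the (CombineFileSplit, TaskAttemptContext,
  // Integer) constructor that Hadoop's CombineFileRecordReader expects:
  //
  //   @Override
  //   public RecordReader<K, V> createRecordReader(InputSplit split,
  //       TaskAttemptContext context) throws IOException {
  //     return new CombineFileRecordReader<K, V>((CombineFileSplit) split,
  //         context, MyRecordReader.class);
  //   }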

  /**
   * Information about one file from the file system.
   */
  private static class OneFileInfo {
    private long fileSize;          // size of the file
    private OneBlockInfo[] blocks;  // all blocks in this file

    OneFileInfo(Path path, Configuration conf,
                boolean isSplitable,
                HashMap<String, List<OneBlockInfo>> rackToBlocks,
                HashMap<OneBlockInfo, String[]> blockToNodes,
                HashMap<String, List<OneBlockInfo>> nodeToBlocks,
                HashMap<String, Set<String>> rackToNodes,
                long maxSize)
        throws IOException {
      this.fileSize = 0;

      // get block locations from the file system
      FileSystem fs = path.getFileSystem(conf);
      FileStatus stat = fs.getFileStatus(path);
      BlockLocation[] locations = fs.getFileBlockLocations(stat, 0,
          stat.getLen());

      // create a list of all blocks and their locations
      if (locations == null) {
        blocks = new OneBlockInfo[0];
      } else {
        if (locations.length == 0) {
          locations = new BlockLocation[] { new BlockLocation() };
        }
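
        // A zero-length file reports no block locations; the single dummy
        // BlockLocation created above (no hosts, no topology paths) lets
        // the code below still emit one OneBlockInfo for the file, which
        // then falls into the default-rack case further down.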

        if (!isSplitable) {
          // if the file is not splitable, just create one block with the
          // full file length
          blocks = new OneBlockInfo[1];
          fileSize = stat.getLen();
          blocks[0] = new OneBlockInfo(path, 0, fileSize,
              locations[0].getHosts(), locations[0].getTopologyPaths());
        } else {
          ArrayList<OneBlockInfo> blocksList =
              new ArrayList<OneBlockInfo>(locations.length);
          for (int i = 0; i < locations.length; i++) {
            fileSize += locations[i].getLength();

            // each split can be a maximum of maxSize
            long left = locations[i].getLength();
            long myOffset = locations[i].getOffset();
            long myLength = 0;
            do {
              if (maxSize == 0) {
                myLength = left;
              } else {
                if (left > maxSize && left < 2 * maxSize) {
                  // if the remainder is between maxSize and 2 * maxSize,
                  // then instead of creating splits of size maxSize and
                  // left - maxSize, create two splits of size left / 2
                  // each. This is a heuristic that avoids creating really
                  // small splits.
                  myLength = left / 2;
                } else {
                  myLength = Math.min(maxSize, left);
                }
              }
              OneBlockInfo oneblock = new OneBlockInfo(path, myOffset,
                  myLength, locations[i].getHosts(),
                  locations[i].getTopologyPaths());
              left -= myLength;
              myOffset += myLength;
              blocksList.add(oneblock);
            } while (left > 0);
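
            // Worked example (hypothetical numbers): with maxSize = 100
            // and a 250-byte block, the loop above first takes
            // min(100, 250) = 100, leaving 150; since 100 < 150 < 200,
            // the heuristic splits the remainder into 75 + 75, yielding
            // sub-blocks of 100, 75 and 75 bytes instead of 100, 100 and
            // a tiny 50-byte leftover.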
          }
          blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
        }
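
        // Populate the shared lookup tables used later when forming
        // combined splits: blockToNodes maps each block to its replica
        // hosts, nodeToBlocks and rackToBlocks are the reverse indexes,
        // and rackToNodes records which hosts have been seen on each rack.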

        for (OneBlockInfo oneblock : blocks) {
          // add this block to the block --> node locations map
          blockToNodes.put(oneblock, oneblock.hosts);

          // for blocks that do not have host/rack information,
          // assign them to the default rack
          String[] racks = null;
          if (oneblock.hosts.length == 0) {
            racks = new String[] { NetworkTopology.DEFAULT_RACK };
          } else {
            racks = oneblock.racks;
          }
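
          // Note: OneBlockInfo (not shown in this excerpt) builds racks[]
          // from the block's topology paths, one entry per host, so index
          // j below refers to the same replica in both racks[] and
          // hosts[]; the default-rack case never reaches the hosts[j]
          // lookup because of the guard below.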

          // add this block to the rack --> block map
          for (int j = 0; j < racks.length; j++) {
            String rack = racks[j];
            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
            if (blklist == null) {
              blklist = new ArrayList<OneBlockInfo>();
              rackToBlocks.put(rack, blklist);
            }
            blklist.add(oneblock);
            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
              // add this host to the rackToNodes map
              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
            }
          }

          // add this block to the node --> block map
          for (int j = 0; j < oneblock.hosts.length; j++) {
            String node = oneblock.hosts[j];
            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
            if (blklist == null) {
              blklist = new ArrayList<OneBlockInfo>();
              nodeToBlocks.put(node, blklist);
            }
            blklist.add(oneblock);
          }
        }
      }
    }

    long getLength() {
      return fileSize;
    }

    OneBlockInfo[] getBlocks() {
      return blocks;
    }
  }
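
  // Usage sketch (hypothetical, for illustration): the split-construction
  // logic in this input format creates one OneFileInfo per input file and
  // lets the constructor populate the shared maps as a side effect:
  //
  //   HashMap<String, List<OneBlockInfo>> rackToBlocks =
  //       new HashMap<String, List<OneBlockInfo>>();
  //   HashMap<OneBlockInfo, String[]> blockToNodes =
  //       new HashMap<OneBlockInfo, String[]>();
  //   HashMap<String, List<OneBlockInfo>> nodeToBlocks =
  //       new HashMap<String, List<OneBlockInfo>>();
  //   HashMap<String, Set<String>> rackToNodes =
  //       new HashMap<String, Set<String>>();
  //   long totLength = 0;
  //   for (Path p : paths) {
  //     OneFileInfo info = new OneFileInfo(p, conf,
  //         isSplitable(context, p), rackToBlocks, blockToNodes,
  //         nodeToBlocks, rackToNodes, maxSize);
  //     totLength += info.getLength();
  //   }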