in emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/importformat/ImportInputFormat.java [167:207]
/**
 * Reads the S3 file entries from an export manifest and packs them into input splits.
 *
 * <p>Files are grouped into at most {@code MAX_NUM_SPLITS} splits, with
 * {@code ceil(numFiles / numGroups)} files per split; the final split absorbs the remainder.
 *
 * @param reader a {@link JsonReader} positioned at the manifest's entries array
 * @param job the job configuration used to construct each {@link CombineFileSplit}
 * @return one split per group of manifest files, or an empty list if the manifest has no entries
 * @throws IOException if reading the manifest stream fails
 */
private List<InputSplit> readEntries(JsonReader reader, JobConf job) throws IOException {
  List<Path> paths = new ArrayList<>();
  Gson gson = DynamoDBUtil.getGson();
  reader.beginArray();
  while (reader.hasNext()) {
    ExportManifestEntry entry = gson.fromJson(reader, ExportManifestEntry.class);
    paths.add(new Path(entry.url));
  }
  reader.endArray();
  log.info("Number of S3 files: " + paths.size());

  if (paths.isEmpty()) {
    return Collections.emptyList();
  }

  // Exact integer ceiling division (both operands are positive ints) — avoids the
  // double cast/Math.ceil round-trip of the original while producing identical values.
  int numGroups = Math.min(MAX_NUM_SPLITS, paths.size());
  int filesPerSplit = (paths.size() + numGroups - 1) / numGroups;
  int numSplits = (paths.size() + filesPerSplit - 1) / filesPerSplit;

  // Sentinel offsets/lengths shared by every split; the lengths are "max" placeholders,
  // presumably because downstream readers consume whole files — confirm against the reader.
  // NOTE(review): for the last split these arrays may be longer than its path count.
  // CombineFileSplit tolerates that, but its reported length is slightly inflated;
  // confirm this is acceptable before trimming them per split.
  long[] fileMaxLengths = new long[filesPerSplit];
  Arrays.fill(fileMaxLengths, Long.MAX_VALUE / filesPerSplit);
  long[] fileStarts = new long[filesPerSplit]; // new long[] is already zero-filled

  List<InputSplit> splits = new ArrayList<>(numSplits);
  for (int i = 0; i < numSplits; i++) {
    int start = filesPerSplit * i;
    // The final split takes whatever files remain (possibly fewer than filesPerSplit).
    int end = (i == numSplits - 1) ? paths.size() : filesPerSplit * (i + 1);
    Path[] pathsInOneSplit = paths.subList(start, end).toArray(new Path[0]);
    splits.add(new CombineFileSplit(job, pathsInOneSplit, fileStarts, fileMaxLengths,
        new String[0]));
  }
  return splits;
}