private List&lt;InputSplit&gt; readEntries(JsonReader reader, JobConf job)

in emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/importformat/ImportInputFormat.java [167:207]


  /**
   * Reads the export manifest's JSON entry array and groups the referenced S3
   * files into at most {@code MAX_NUM_SPLITS} combine splits.
   *
   * @param reader positioned at the manifest's entry array; this method consumes
   *     exactly the array (beginArray through endArray)
   * @param job the job configuration passed through to each split
   * @return one {@link CombineFileSplit} per group of files, or an empty list
   *     when the manifest contains no entries
   * @throws IOException if reading the manifest fails
   */
  private List<InputSplit> readEntries(JsonReader reader, JobConf job) throws IOException {
    List<Path> paths = new ArrayList<Path>();
    Gson gson = DynamoDBUtil.getGson();

    reader.beginArray();
    while (reader.hasNext()) {
      ExportManifestEntry entry = gson.fromJson(reader, ExportManifestEntry.class);
      paths.add(new Path(entry.url));
    }
    reader.endArray();
    log.info("Number of S3 files: " + paths.size());

    if (paths.isEmpty()) {
      return Collections.emptyList();
    }

    // Pack files into groups so the number of splits never exceeds MAX_NUM_SPLITS.
    int filesPerSplit = (int) Math.ceil((double) (paths.size()) / Math.min(MAX_NUM_SPLITS, paths
        .size()));
    int numSplits = (int) Math.ceil((double) (paths.size()) / filesPerSplit);

    // File sizes are unknown here; use a large per-file length so the record
    // reader consumes each file to EOF. Divide by filesPerSplit so the values
    // cannot overflow when CombineFileSplit sums them into a total length.
    long[] fileMaxLengths = new long[filesPerSplit];
    Arrays.fill(fileMaxLengths, Long.MAX_VALUE / filesPerSplit);

    // Every file is read from offset 0 (arrays are zero-initialized in Java).
    long[] fileStarts = new long[filesPerSplit];

    List<InputSplit> splits = new ArrayList<InputSplit>(numSplits);
    for (int i = 0; i < numSplits; i++) {
      int start = filesPerSplit * i;
      int end = Math.min(filesPerSplit * (i + 1), paths.size());
      int count = end - start;
      // The last split may hold fewer than filesPerSplit files. Trim the
      // start/length arrays to match so they stay parallel to the path array;
      // otherwise CombineFileSplit.getLength() would sum unused length slots.
      long[] starts = (count == filesPerSplit) ? fileStarts : new long[count];
      long[] lengths = (count == filesPerSplit)
          ? fileMaxLengths : Arrays.copyOf(fileMaxLengths, count);
      Path[] pathsInOneSplit = paths.subList(start, end).toArray(new Path[count]);
      splits.add(new CombineFileSplit(job, pathsInOneSplit, starts, lengths, new String[0]));
    }

    return splits;
  }