in emr-dynamodb-tools/src/main/java/org/apache/hadoop/dynamodb/tools/DynamoDBImport.java [46:86]
public int run(String[] args) throws Exception {
if (args.length < 2) {
printUsage("Not enough parameters");
return -1;
}
JobConf jobConf = new JobConf(getConf(), DynamoDBImport.class);
jobConf.setJobName("dynamodb-import");
jobConf.setOutputKeyClass(Text.class);
jobConf.setOutputValueClass(DynamoDBItemWritable.class);
jobConf.setMapperClass(ImportMapper.class);
jobConf.setReducerClass(IdentityReducer.class);
jobConf.setInputFormat(ImportInputFormat.class);
jobConf.setOutputFormat(DynamoDBOutputFormat.class);
jobConf.setNumReduceTasks(0);
FileInputFormat.setInputPaths(jobConf, new Path(args[0]));
String tableName = args[1];
Double writeRatio = null;
if (args.length >= 3) {
String val = args[2];
try {
writeRatio = Double.parseDouble(val);
} catch (Exception e) {
printUsage("Could not parse write ratio (value: " + val + ")");
return -1;
}
}
setTableProperties(jobConf, tableName, writeRatio);
Date startTime = new Date();
System.out.println("Job started: " + startTime);
JobClient.runJob(jobConf);
Date endTime = new Date();
System.out.println("Job ended: " + endTime);
System.out.println("The job took " + (endTime.getTime() - startTime.getTime()) / 1000 + " "
+ "seconds.");
return 0;
}