in source/benchmark-sample/tpcds-gen/src/main/java/org/notmysock/tpcds/GenTable.java [33:111]
/**
 * Generates TPC-DS data by running {@code dsdgen} inside a map-only MapReduce job.
 *
 * <p>Recognized options (after generic Hadoop options are consumed):
 * {@code -s/--scale} (required), {@code -d/--dir} output directory (required),
 * {@code -t/--table} table to generate (default {@code "all"}), and
 * {@code -p/--parallel} parallelism passed to {@code genInput} (default: the scale value).
 *
 * <p>The temporary input file produced by {@code genInput} and the staged {@code dsdgen}
 * jar are deleted when the method finishes, whether the job succeeds, fails, or throws.
 *
 * @param args command-line arguments
 * @return 0 if the job completed successfully; 1 on invalid usage or job failure
 * @throws Exception if argument parsing, job setup, or job execution fails
 */
public int run(String[] args) throws Exception {
  String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs();
  CommandLineParser parser = new BasicParser();
  // NOTE(review): very small sort buffer, presumably because the job is map-only
  // (zero reducers below) and never sorts map output — confirm.
  getConf().setInt("io.sort.mb", 4);
  org.apache.commons.cli.Options options = new org.apache.commons.cli.Options();
  options.addOption("s", "scale", true, "scale");
  options.addOption("t", "table", true, "table");
  options.addOption("d", "dir", true, "dir");
  options.addOption("p", "parallel", true, "parallel");
  CommandLine line = parser.parse(options, remainingArgs);
  if (!(line.hasOption("scale") && line.hasOption("dir"))) {
    HelpFormatter f = new HelpFormatter();
    f.printHelp("GenTable", options);
    return 1;
  }
  int scale = Integer.parseInt(line.getOptionValue("scale"));
  String table = "all";
  if (line.hasOption("table")) {
    table = line.getOptionValue("table");
  }
  Path out = new Path(line.getOptionValue("dir"));
  int parallel = scale;
  if (line.hasOption("parallel")) {
    parallel = Integer.parseInt(line.getOptionValue("parallel"));
  }
  if (parallel == 1 || scale == 1) {
    System.err.println("The MR task does not work for scale=1 or parallel=1");
    return 1;
  }

  Path in = genInput(table, scale, parallel);
  Path dsdgen = null;
  try {
    dsdgen = copyJar(new File("target/lib/dsdgen.jar"));
    URI dsuri = dsdgen.toUri();
    // Same location as the staged jar, with fragment "dsdgen" so the distributed-cache
    // archive is exposed to tasks under that symlink name.
    URI link = new URI(dsuri.getScheme(),
        dsuri.getUserInfo(), dsuri.getHost(),
        dsuri.getPort(), dsuri.getPath(),
        dsuri.getQuery(), "dsdgen");
    Configuration conf = getConf();
    // A timeout of 0 disables the task timeout (old and new property names).
    conf.setInt("mapred.task.timeout", 0);
    conf.setInt("mapreduce.task.timeout", 0);
    conf.setBoolean("mapreduce.map.output.compress", true);
    conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
    DistributedCache.addCacheArchive(link, conf);
    DistributedCache.createSymlink(conf);

    Job job = new Job(conf, "GenTable+" + table + "_" + scale);
    job.setJarByClass(getClass());
    job.setNumReduceTasks(0);
    job.setMapperClass(DSDGen.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    // One input line per split, i.e. one map task per line of the generated input file.
    job.setInputFormatClass(NLineInputFormat.class);
    NLineInputFormat.setNumLinesPerSplit(job, 1);
    FileInputFormat.addInputPath(job, in);
    FileOutputFormat.setOutputPath(job, out);
    // use multiple output to only write the named files
    LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
    MultipleOutputs.addNamedOutput(job, "text",
        TextOutputFormat.class, LongWritable.class, Text.class);

    boolean success = job.waitForCompletion(true);
    // Report job failure to the caller instead of unconditionally returning success.
    return success ? 0 : 1;
  } finally {
    // cleanup: remove temporary artifacts even when setup or the job itself fails
    deleteTempPath(in);
    if (dsdgen != null) {
      deleteTempPath(dsdgen);
    }
  }
}

/**
 * Best-effort, non-recursive delete of a temporary path on the filesystem that owns it.
 * Failures are reported on stderr but never thrown, so cleanup cannot mask the job's
 * result or the exception that aborted it.
 */
private void deleteTempPath(Path path) {
  try {
    path.getFileSystem(getConf()).delete(path, false);
  } catch (java.io.IOException e) {
    System.err.println("Failed to delete temporary path " + path + ": " + e);
  }
}