in lang/java/tools/src/main/java/org/apache/avro/tool/TetherTool.java [58:154]
/**
 * Parses the tether command line, configures a {@link JobConf} from it and
 * runs the resulting tether job.
 *
 * @param ins  standard input of the tool (not used by this method)
 * @param outs standard output of the tool (not used by this method)
 * @param err  stream that receives the error message when the arguments
 *             cannot be parsed or applied
 * @param args command line arguments; {@code in}, {@code out},
 *             {@code program} and {@code outschema} are required
 * @return 0 when help was requested or the job was run, -1 when the
 *         arguments could not be parsed or applied to the job
 * @throws Exception if running the job fails
 */
public int run(InputStream ins, PrintStream outs, PrintStream err, List<String> args) throws Exception {
  String[] argarry = args.toArray(new String[0]);

  // Declare every option understood by the tool.
  Options opts = new Options();
  Option helpopt = OptionBuilder.hasArg(false).withDescription("print this message").create("help");
  Option inopt = OptionBuilder.hasArg().isRequired().withDescription("comma-separated input paths").create("in");
  Option outopt = OptionBuilder.hasArg().isRequired().withDescription("The output path.").create("out");
  Option pargs = OptionBuilder.hasArg().withDescription(
      "A string containing the command line arguments to pass to the tethered process. String should be enclosed in quotes")
      .create("exec_args");
  Option popt = OptionBuilder.hasArg().isRequired().withDescription("executable program, usually in HDFS")
      .create("program");
  Option outscopt = OptionBuilder.withType(File.class).hasArg().isRequired()
      .withDescription("schema file for output of reducer").create("outschema");
  Option outscmapopt = OptionBuilder.withType(File.class).hasArg()
      .withDescription("(optional) map output schema file, if different from outschema").create("outschemamap");
  Option redopt = OptionBuilder.withType(Integer.class).hasArg().withDescription("(optional) number of reducers")
      .create("reduces");
  Option cacheopt = OptionBuilder.withType(Boolean.class).hasArg()
      .withDescription(
          "(optional) boolean indicating whether or not the executable should be distributed via distributed cache")
      .create("exec_cached");
  Option protoopt = OptionBuilder.hasArg()
      .withDescription("(optional) specifies the transport protocol 'http' or 'sasl'").create("protocol");
  opts.addOption(redopt);
  opts.addOption(outscopt);
  opts.addOption(popt);
  opts.addOption(pargs);
  opts.addOption(inopt);
  opts.addOption(outopt);
  opts.addOption(helpopt);
  opts.addOption(outscmapopt);
  opts.addOption(cacheopt);
  opts.addOption(protoopt);

  HelpFormatter formatter = new HelpFormatter();

  // Several options are required, so parsing fails with a missing-option
  // error when "help" is passed on its own. Look for it before parsing so
  // that asking for help prints the usage and succeeds.
  if (args.contains("-help") || args.contains("--help")) {
    formatter.printHelp("tether", opts);
    return 0;
  }

  CommandLineParser parser = new GnuParser();
  JobConf job = new JobConf();
  try {
    CommandLine line = parser.parse(opts, argarry);
    // Fallback for spellings of the help flag not matched by the check above.
    if (line.hasOption("help")) {
      formatter.printHelp("tether", opts);
      return 0;
    }

    FileInputFormat.addInputPaths(job, line.getOptionValue("in"));
    FileOutputFormat.setOutputPath(job, new Path(line.getOptionValue("out")));

    // Arguments for the tethered process are split on single spaces only;
    // quoting inside the string is not interpreted. Stays null when absent.
    List<String> exargs = null;
    if (line.hasOption("exec_args")) {
      String[] splitargs = line.getOptionValue("exec_args").split(" ");
      exargs = new ArrayList<>(Arrays.asList(splitargs));
    }

    boolean cached = false;
    if (line.hasOption("exec_cached")) {
      cached = Boolean.parseBoolean(line.getOptionValue("exec_cached"));
    }
    TetherJob.setExecutable(job, new File(line.getOptionValue("program")), exargs, cached);

    // Use Schema.Parser for both schema files instead of mixing it with the
    // deprecated static Schema.parse(File).
    File outschema = (File) line.getParsedOptionValue("outschema");
    job.set(AvroJob.OUTPUT_SCHEMA, new Schema.Parser().parse(outschema).toString());
    if (line.hasOption("outschemamap")) {
      File mapschema = (File) line.getParsedOptionValue("outschemamap");
      job.set(AvroJob.MAP_OUTPUT_SCHEMA, new Schema.Parser().parse(mapschema).toString());
    }

    if (line.hasOption("reduces")) {
      // Convert the raw value ourselves: whether getParsedOptionValue can
      // produce an Integer depends on the Commons CLI version in use, and a
      // null result would fail when unboxed. A malformed number ends up in
      // the catch block below like any other bad argument.
      job.setNumReduceTasks(Integer.parseInt(line.getOptionValue("reduces").trim()));
    }
    if (line.hasOption("protocol")) {
      TetherJob.setProtocol(job, line.getOptionValue("protocol"));
    }
  } catch (Exception exp) {
    // Report on the error stream handed to the tool rather than System.out.
    err.println("Unexpected exception: " + exp.getMessage());
    formatter.printHelp("tether", opts);
    return -1;
  }

  TetherJob.runJob(job);
  return 0;
}