in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/streaming/StreamJob.java [211:359]
/**
 * Parses the command line held in {@code argv_} and copies the recognised options into this
 * job's fields and into {@code config_}.
 *
 * <p>Processing order:
 * <ol>
 *   <li>Parse {@code argv_} against {@code allOptions}; on failure log the parser message,
 *       call {@code exitUsage} and return.</li>
 *   <li>Reject any left-over (non-option) arguments through {@code fail}.</li>
 *   <li>If {@code -help} or {@code -info} is present, set {@code printUsage} and return without
 *       reading any other option.</li>
 *   <li>Read the simple flag/value options (verbose, background, debug, output, combiner,
 *       reducer, lazyOutput, mapper, ...).</li>
 *   <li>For every {@code -file}, call {@code MetaExplorer.addFileResourceWithRetry} and record
 *       the returned name under the file's base name; the resulting map is stored as JSON in
 *       {@code stream.temp.resource.alias} and its keys are appended to the job resources.</li>
 *   <li>Walk the parsed options to apply {@code -jobconf}, {@code -columns} and {@code -input}.
 *       A {@code -columns} value applies to the {@code -input} options visited after it in
 *       that walk; {@code ALL} clears the column selection.</li>
 *   <li>Store the per-table input configuration as JSON in {@code stream.map.input.configs}.</li>
 * </ol>
 *
 * @throws RuntimeException if registering a {@code -file} resource raises an
 *         {@code OdpsException}, or if the input configuration cannot be stored; the original
 *         exception is attached as the cause in both cases
 */
void parseArgv() {
  CommandLine cmdLine = null;
  try {
    cmdLine = parser.parse(allOptions, argv_);
  } catch (Exception oe) {
    // Only log here: usage is reported once by the null check below. Calling exitUsage in
    // both places would print the usage twice whenever exitUsage returns normally.
    LOG.error(oe.getMessage());
  }
  if (cmdLine == null) {
    exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
    return;
  }

  @SuppressWarnings("unchecked")
  List<String> args = cmdLine.getArgList();
  if (args != null && args.size() > 0) {
    fail("Found " + args.size() + " unexpected arguments on the " +
         "command line " + args);
  }

  detailedUsage_ = cmdLine.hasOption("info");
  if (cmdLine.hasOption("help") || detailedUsage_) {
    // Usage was requested: nothing else on the command line is interpreted.
    printUsage = true;
    return;
  }

  verbose_ = cmdLine.hasOption("verbose");
  background_ = cmdLine.hasOption("background");
  debug_ = cmdLine.hasOption("debug") ? debug_ + 1 : debug_;

  output_ = cmdLine.getOptionValue("output");

  comCmd_ = cmdLine.getOptionValue("combiner");
  redCmd_ = cmdLine.getOptionValue("reducer");

  lazyOutput_ = cmdLine.hasOption("lazyOutput");

  // -file: register each local file and remember which registered name stands for which
  // base file name.
  String[] files = cmdLine.getOptionValues("file");
  SessionState ss = SessionState.get();
  MetaExplorer metaExplorer = new MetaExplorerImpl(ss.getOdps());
  Map<String, String> aliasToTempResource = new HashMap<String, String>();
  // One random suffix is generated per invocation and passed to every registration call.
  String padding = "_" + UUID.randomUUID().toString();
  if (files != null && files.length > 0) {
    for (String file : files) {
      packageFiles_.add(file);
      try {
        aliasToTempResource.put(FilenameUtils.getName(file),
                                metaExplorer.addFileResourceWithRetry(file, Resource.Type.FILE,
                                                                      padding, true));
      } catch (OdpsException e) {
        throw new RuntimeException(e);
      }
    }

    config_.set("stream.temp.resource.alias",
                new GsonBuilder().disableHtmlEscaping().create().toJson(aliasToTempResource));

    // Append the base file names to whatever resources are already configured.
    String[] res = config_.getResources();
    Set<String> resources = aliasToTempResource.keySet();
    if (res != null) {
      config_.setResources(StringUtils.join(res, ",") + "," + StringUtils.join(resources, ","));
    } else {
      config_.setResources(StringUtils.join(resources, ","));
    }
  }

  additionalConfSpec_ = cmdLine.getOptionValue("additionalconfspec");
  numReduceTasksSpec_ = cmdLine.getOptionValue("numReduceTasks");
  partitionerSpec_ = cmdLine.getOptionValue("partitioner");
  mapDebugSpec_ = cmdLine.getOptionValue("mapdebug");
  reduceDebugSpec_ = cmdLine.getOptionValue("reducedebug");
  ioSpec_ = cmdLine.getOptionValue("io");

  // -cacheArchive / -cacheFile are rejected with a pointer to -resources.
  String[] car = cmdLine.getOptionValues("cacheArchive");
  if (null != car) {
    fail("no -cacheArchive option any more, please use -resources instead.");
  }

  String[] caf = cmdLine.getOptionValues("cacheFile");
  if (null != caf) {
    fail("no -cacheFile option any more, please use -resources instead.");
  }

  mapCmd_ = cmdLine.getOptionValue("mapper");

  // -cmdenv: all values are accumulated into one space-separated string.
  String[] cmd = cmdLine.getOptionValues("cmdenv");
  if (null != cmd && cmd.length > 0) {
    for (String s : cmd) {
      if (addTaskEnvironment_.length() > 0) {
        addTaskEnvironment_ += " ";
      }
      addTaskEnvironment_ += s;
    }
  }

  // per table input config
  Map<String, Map<String, String>> inputConfigs =
      new HashMap<String, Map<String, String>>();
  // Column selection currently in effect; null means no restriction is applied to the table.
  String[] columns = null;

  for (Option opt : cmdLine.getOptions()) {
    if ("jobconf".equals(opt.getOpt())) {
      String[] jobconf = opt.getValues();
      if (null != jobconf && jobconf.length > 0) {
        for (String s : jobconf) {
          String[] parts = s.split("=", 2);
          if (parts.length < 2) {
            // Without '=' there is no parts[1]; report the bad value instead of letting an
            // ArrayIndexOutOfBoundsException escape.
            fail("Invalid -jobconf value '" + s + "', expected <name>=<value>");
            // Guard in case fail() returns normally — TODO confirm it always throws.
            continue;
          }
          config_.set(parts[0], parts[1]);
        }
      }
    } else if ("columns".equals(opt.getOpt())) {
      String columnsValue = opt.getValue();
      if (columnsValue.equals("ALL")) {
        columns = null;
      } else {
        columns = columnsValue.split(",");
      }
    } else if ("input".equals(opt.getOpt())) {
      String[] inputs = opt.getValues();
      if (inputs != null && inputs.length > 0) {
        for (String input : inputs) {
          TableInfo ti = parseTableInfo(input);
          if (columns != null) {
            ti.setCols(columns);
          }
          inputSpecs_.add(ti);

          String inputKey = (ti.getProjectName() + "." + ti.getTableName()).toLowerCase();
          // XXX only apply once per table
          if (inputConfigs.containsKey(inputKey)) {
            continue;
          }
          Map<String, String> inputConfig = new HashMap<String, String>();
          // The separator is read from config_ as it stands at this point of the walk.
          inputConfig.put("stream.map.input.field.separator",
                          config_.get("stream.map.input.field.separator", "\t"));
          // TODO other per table input config: cols, etc.
          inputConfigs.put(inputKey, inputConfig);
        }
      }
    }
  }

  try {
    config_.set("stream.map.input.configs",
                new GsonBuilder().disableHtmlEscaping().create().toJson(inputConfigs));
  } catch (Exception e) {
    // Keep the original exception as the cause so the failure can be diagnosed.
    throw new RuntimeException("fail to set input configs", e);
  }
}