void parseArgv()

in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/streaming/StreamJob.java [211:359]


  /**
   * Parses {@code argv_} against {@code allOptions} and copies the recognized options into this
   * job's fields and into {@code config_}.
   *
   * <p>In order, this method:
   * <ul>
   *   <li>calls {@code exitUsage} when parsing fails or yields no command line;</li>
   *   <li>reports leftover positional arguments through {@code fail};</li>
   *   <li>sets {@code printUsage} and returns early for {@code -help} / {@code -info};</li>
   *   <li>registers every {@code -file} value as a temporary file resource and records the
   *       alias-to-resource mapping as JSON under {@code stream.temp.resource.alias};</li>
   *   <li>reports the removed {@code -cacheArchive} / {@code -cacheFile} options through
   *       {@code fail};</li>
   *   <li>walks the options in command-line order so that a {@code -columns} option applies to
   *       the {@code -input} options that follow it, and {@code -jobconf} values set before an
   *       {@code -input} are visible when that input's per-table config is built;</li>
   *   <li>stores the per-table input configs as JSON under {@code stream.map.input.configs}.</li>
   * </ul>
   *
   * @throws RuntimeException if a {@code -file} resource cannot be added, or if the input
   *                          configs cannot be stored (the original failure is kept as the cause)
   */
  void parseArgv() {
    CommandLine cmdLine = null;
    try {
      cmdLine = parser.parse(allOptions, argv_);
    } catch (Exception oe) {
      LOG.error(oe.getMessage());
      exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
    }

    if (cmdLine == null) {
      exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
      return;
    }

    @SuppressWarnings("unchecked")
    List<String> args = cmdLine.getArgList();
    if (args != null && args.size() > 0) {
      fail("Found " + args.size() + " unexpected arguments on the " +
           "command line " + args);
    }

    detailedUsage_ = cmdLine.hasOption("info");
    if (cmdLine.hasOption("help") || detailedUsage_) {
      // Usage was requested: nothing else on the command line matters.
      printUsage = true;
      return;
    }
    verbose_ = cmdLine.hasOption("verbose");
    background_ = cmdLine.hasOption("background");
    debug_ = cmdLine.hasOption("debug") ? debug_ + 1 : debug_;

    output_ = cmdLine.getOptionValue("output");

    comCmd_ = cmdLine.getOptionValue("combiner");
    redCmd_ = cmdLine.getOptionValue("reducer");

    lazyOutput_ = cmdLine.hasOption("lazyOutput");

    String[] files = cmdLine.getOptionValues("file");
    SessionState ss = SessionState.get();
    MetaExplorer metaExplorer = new MetaExplorerImpl(ss.getOdps());
    Map<String, String> aliasToTempResource = new HashMap<String, String>();
    // The same random suffix is passed for every file added by this invocation.
    String padding = "_" + UUID.randomUUID().toString();
    if (files != null && files.length > 0) {
      for (String file : files) {
        packageFiles_.add(file);
        try {
          // Key: bare file name (the alias); value: whatever the explorer returns for the upload.
          aliasToTempResource.put(FilenameUtils.getName(file),
                                  metaExplorer.addFileResourceWithRetry(file, Resource.Type.FILE,
                                                                        padding, true));
        } catch (OdpsException e) {
          throw new RuntimeException(e);
        }
      }

      config_.set("stream.temp.resource.alias",
              new GsonBuilder().disableHtmlEscaping().create().toJson(aliasToTempResource));

      // Append the file aliases to any resources already configured.
      String[] res = config_.getResources();
      Set<String> resources = aliasToTempResource.keySet();
      if (res != null) {
        config_.setResources(StringUtils.join(res, ",") + "," + StringUtils.join(resources, ","));
      } else {
        config_.setResources(StringUtils.join(resources, ","));
      }
    }

    additionalConfSpec_ = cmdLine.getOptionValue("additionalconfspec");
    numReduceTasksSpec_ = cmdLine.getOptionValue("numReduceTasks");
    partitionerSpec_ = cmdLine.getOptionValue("partitioner");
    mapDebugSpec_ = cmdLine.getOptionValue("mapdebug");
    reduceDebugSpec_ = cmdLine.getOptionValue("reducedebug");
    ioSpec_ = cmdLine.getOptionValue("io");

    String[] car = cmdLine.getOptionValues("cacheArchive");
    if (null != car) {
      fail("no -cacheArchive option any more, please use -resources instead.");
    }

    String[] caf = cmdLine.getOptionValues("cacheFile");
    if (null != caf) {
      fail("no -cacheFile option any more, please use -resources instead.");
    }

    mapCmd_ = cmdLine.getOptionValue("mapper");

    // Accumulate all -cmdenv values into one space-separated string.
    String[] cmd = cmdLine.getOptionValues("cmdenv");
    if (null != cmd && cmd.length > 0) {
      for (String s : cmd) {
        if (addTaskEnvironment_.length() > 0) {
          addTaskEnvironment_ += " ";
        }
        addTaskEnvironment_ += s;
      }
    }

    // per table input config
    Map<String, Map<String, String>> inputConfigs =
        new HashMap<String, Map<String, String>>();
    // Column projection currently in effect; null means "no projection".
    String[] columns = null;

    for (Option opt : cmdLine.getOptions()) {
      if ("jobconf".equals(opt.getOpt())) {
        String[] jobconf = opt.getValues();
        if (null != jobconf && jobconf.length > 0) {
          for (String s : jobconf) {
            String[] parts = s.split("=", 2);
            if (parts.length < 2) {
              // No '=' in the value: there is no parts[1] to read, so report it as a usage
              // error instead of failing with ArrayIndexOutOfBoundsException.
              fail("Invalid -jobconf value '" + s + "', expected <name>=<value>");
            } else {
              config_.set(parts[0], parts[1]);
            }
          }
        }
      } else if ("columns".equals(opt.getOpt())) {
        String columnsValue = opt.getValue();
        // A missing value is treated like "ALL" (no projection) rather than dereferenced.
        if (columnsValue == null || "ALL".equals(columnsValue)) {
          columns = null;
        } else {
          columns = columnsValue.split(",");
        }
      } else if ("input".equals(opt.getOpt())) {
        String[] inputs = opt.getValues();
        if (inputs != null && inputs.length > 0) {
          for (String input : inputs) {
            TableInfo ti = parseTableInfo(input);
            if (columns != null) {
              ti.setCols(columns);
            }
            inputSpecs_.add(ti);

            String inputKey = (ti.getProjectName() + "." + ti.getTableName()).toLowerCase();
            // XXX only apply once per table
            if (inputConfigs.get(inputKey) != null) {
              continue;
            }

            Map<String, String> inputConfig = new HashMap<String, String>();
            // Snapshot of the separator as configured at this point in the option walk.
            inputConfig.put("stream.map.input.field.separator",
                            config_.get("stream.map.input.field.separator", "\t"));
            // TODO other per table input config: cols, etc.
            inputConfigs.put(inputKey, inputConfig);
          }
        }
      }
    }
    try {
      config_.set("stream.map.input.configs",
              new GsonBuilder().disableHtmlEscaping().create().toJson(inputConfigs));
    } catch (Exception e) {
      // Keep the original failure as the cause so the stack trace is not lost.
      throw new RuntimeException("fail to set input configs", e);
    }
  }