protected void setUp()

in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/BridgeJobRunner.java [176:326]


  /**
   * Prepares the job before submission: merges streaming temp-resource aliases,
   * expands input/output table schemas, fills default project names for table and
   * volume inputs/outputs, optionally estimates the reducer count, uploads the
   * generated jobconf jar as a temp resource, and assembles the final resource lists.
   *
   * @throws OdpsException if the alias JSON is malformed or any metadata/resource
   *     operation against the service fails
   */
  protected void setUp() throws OdpsException {
    // Prepare additional config parameters

    mergeStreamTempResourceAlias();

    // For a user-defined partitioner, estimate the reduce count only when the
    // user has not set "odps.stage.reducer.num" explicitly.
    boolean isEstimateReduceNum =
        (job.getPartitionerClass() != null) && (job.get("odps.stage.reducer.num") == null);

    String project = metaExplorer.getDefaultProject();
    expandInputColumns(project, isEstimateReduceNum);
    fillVolumeProjects(project);
    expandOutputColumns(project);

    getProjectModeConf();
    processTempResources();

    uploadJobConfJar();

    if (job.getBoolean("odps.mapred.upload.framework.resources.enable", true)) {
      applyFrameworkResources();
    }

    mergeResourceLists();
  }

  /**
   * Merges streaming-job alias resources (a JSON-encoded {@code Map<String,String>}
   * stored under "stream.temp.resource.alias") into {@link #aliasToTempResource}.
   * No-op when the setting is absent.
   */
  private void mergeStreamTempResourceAlias() throws OdpsException {
    String aliasJson = job.get("stream.temp.resource.alias");
    if (aliasJson == null) {
      return;
    }
    try {
      Gson gson = new GsonBuilder().disableHtmlEscaping().create();
      aliasToTempResource.putAll((Map<String, String>) gson.fromJson(aliasJson,
              new TypeToken<Map<String, String>>() {}.getType()));
    } catch (JsonParseException e) {
      // Keep the cause so the malformed JSON can be diagnosed from the stack trace.
      throw new OdpsException("parse stream temp resource alias json failed!", e);
    }
  }

  /**
   * Expands input table columns: fills in the default project where missing and
   * records each input's schema on the job. When {@code isEstimateReduceNum} is
   * true, also sums input sizes and sets the estimated reducer count.
   */
  private void expandInputColumns(String project, boolean isEstimateReduceNum)
      throws OdpsException {
    long inputSize = 0;
    TableInfo[] infos = InputUtils.getTables(job);
    // With multiple inputs, inner output in the mapper is not allowed.
    if (infos != null && infos.length > 1) {
      job.setMapperInnerOutputEnable(false);
    }
    boolean changed = false;
    if (infos != null) {
      for (TableInfo info : infos) {
        if (info.getProjectName() == null) {
          changed = true;
          info.setProjectName(project);
        }

        Table tbl = metaExplorer.getTable(info.getProjectName(), info.getTableName());
        List<Column> schema = tbl.getSchema().getColumns();
        String[] inputCols = getInputColumnsFromCommandSettings(job, info);
        if (inputCols.length == 0 && info.getCols() == null) {
          // No explicit column selection anywhere: use the full table schema.
          changed = true;
          Column[] columns = schema.toArray(new Column[0]);
          job.setInputSchema(info, columns);
          info.setCols(SchemaUtils.getNames(columns));
        } else {
          // Command-line settings take precedence over columns on the TableInfo.
          if (inputCols.length == 0) {
            inputCols = info.getCols();
          }
          Column[] columns = new Column[inputCols.length];
          for (int k = 0; k < inputCols.length; k++) {
            String colName = inputCols[k];
            for (Column c : schema) {
              if (c.getName().equalsIgnoreCase(colName)) {
                columns[k] = c;
                break;
              }
            }
            // NOTE(review): if colName is not found in the schema, columns[k]
            // stays null and is passed through to setInputSchema — TODO confirm
            // whether an early, explicit error would be preferable here.
          }
          job.setInputSchema(info, columns);
        }
        if (isEstimateReduceNum) {
          PartitionSpec part = info.getPartitionSpec();
          if (!part.isEmpty()) {
            // Partitioned input: count only the selected partition's size.
            inputSize += tbl.getPartition(part).getSize();
          } else {
            inputSize += tbl.getSize();
          }
        }
      }
    }
    if (changed) {
      InputUtils.setTables(infos, job);
    }
    if (isEstimateReduceNum) {
      job.setNumReduceTasks(estimateReduceNum(inputSize, job));
    }
  }

  /**
   * Fills in the default project name for input and output volumes that do not
   * specify one, writing the arrays back to the job only when something changed.
   */
  private void fillVolumeProjects(String project) throws OdpsException {
    VolumeInfo[] volumeInfos = InputUtils.getVolumes(job);
    if (fillVolumeProjectNames(volumeInfos, project)) {
      InputUtils.setVolumes(volumeInfos, job);
    }
    volumeInfos = OutputUtils.getVolumes(job);
    if (fillVolumeProjectNames(volumeInfos, project)) {
      OutputUtils.setVolumes(volumeInfos, job);
    }
  }

  /** Sets {@code project} on each volume lacking one; returns whether any changed. */
  private static boolean fillVolumeProjectNames(VolumeInfo[] volumeInfos, String project) {
    boolean changed = false;
    if (volumeInfos != null) {
      for (VolumeInfo volume : volumeInfos) {
        if (volume.getProjectName() == null) {
          changed = true;
          volume.setProjectName(project);
        }
      }
    }
    return changed;
  }

  /**
   * Expands output table columns from the table metadata and records each output
   * schema on the job. With no declared outputs, registers a single dummy STRING
   * column under the default label.
   */
  private void expandOutputColumns(String project) throws OdpsException {
    TableInfo[] infos = OutputUtils.getTables(job);
    if (infos == null) {
      // No output tables: downstream stages still expect a non-empty schema.
      job.setOutputSchema(new Column[]{new Column("nil", OdpsType.STRING)},
                          TableInfo.DEFAULT_LABEL);
      return;
    }
    for (TableInfo info : infos) {
      if (info.getProjectName() == null) {
        info.setProjectName(project);
      }
      List<Column> schema = metaExplorer.getTable(info.getProjectName(), info.getTableName())
          .getSchema().getColumns();
      Column[] schemaArray = schema.toArray(new Column[0]);
      info.setCols(SchemaUtils.getNames(schemaArray));
      job.setOutputSchema(schemaArray, info.getLabel());
    }
    OutputUtils.setTables(infos, job);
  }

  /**
   * Builds the jobconf jar in memory, uploads it as a temp JAR resource named
   * "&lt;jobId&gt;.jar", and registers it under the alias "jobconf.jar".
   */
  private void uploadJobConfJar() throws OdpsException {
    ByteArrayOutputStream jarOut;
    try {
      jarOut = createJarArchive();
      jarOut.close();
    } catch (IOException e) {
      // Building an in-memory jar should not fail under normal conditions;
      // treat it as a programming/environment error.
      throw new RuntimeException(e);
    }
    String resName = metaExplorer.addTempResourceWithRetry(
        new ByteArrayInputStream(jarOut.toByteArray()), jobId + ".jar", Resource.Type.JAR);
    aliasToTempResource.put("jobconf.jar", resName);
  }

  /**
   * Combines user-declared resources with all temp-resource aliases into the
   * job's resource list, and sets function resources to the real temp names.
   */
  private void mergeResourceLists() {
    List<String> totalRes = new ArrayList<String>();
    String[] resources = job.getResources();
    if (resources != null) {
      Collections.addAll(totalRes, resources);
    }
    totalRes.addAll(aliasToTempResource.keySet());
    job.setResources(StringUtils.join(totalRes, ","));
    job.setFunctionResources(StringUtils.join(aliasToTempResource.values(), ","));
  }