in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/BridgeJobRunner.java [176:326]
/**
 * Prepares the job for submission: merges streaming temp-resource aliases,
 * expands input/output table schemas, fills in the default project for tables
 * and volumes that omit one, estimates the reducer count for user-defined
 * partitioners, uploads the jobconf jar, and registers all resources on the job.
 *
 * @throws OdpsException if the alias JSON is malformed or a meta operation fails
 */
protected void setUp() throws OdpsException {
  // Prepare additional config parameters
  mergeStreamTempResourceAliases();
  // for user defined partitioner, estimate reduce number if not set
  boolean isEstimateReduceNum =
      (job.getPartitionerClass() != null) && (job.get("odps.stage.reducer.num") == null);
  String project = metaExplorer.getDefaultProject();
  long inputSize = expandInputTables(project, isEstimateReduceNum);
  if (isEstimateReduceNum) {
    job.setNumReduceTasks(estimateReduceNum(inputSize, job));
  }
  // add project information for volumes if necessary
  fillVolumeProjects(project);
  // Expand output columns.
  expandOutputTables(project);
  getProjectModeConf();
  processTempResources();
  uploadJobConfJar();
  if (job.getBoolean("odps.mapred.upload.framework.resources.enable", true)) {
    applyFrameworkResources();
  }
  registerResources();
}

/**
 * Merges streaming-job temp resource aliases (a JSON map in config key
 * {@code stream.temp.resource.alias}) into {@code aliasToTempResource}, if present.
 *
 * @throws OdpsException if the JSON cannot be parsed
 */
private void mergeStreamTempResourceAliases() throws OdpsException {
  String aliasJson = job.get("stream.temp.resource.alias");
  if (aliasJson == null) {
    return;
  }
  try {
    Gson gson = new GsonBuilder().disableHtmlEscaping().create();
    // TypeToken carries the generic type, so no explicit cast is needed.
    Map<String, String> aliases =
        gson.fromJson(aliasJson, new TypeToken<Map<String, String>>() {}.getType());
    aliasToTempResource.putAll(aliases);
  } catch (JsonParseException e) {
    throw new OdpsException("parse stream temp resource alias json failed!", e);
  }
}

/**
 * Expands input table columns into explicit schemas on the job, defaulting
 * missing project names, and optionally accumulates the total input size.
 *
 * @param project default project name for tables that specify none
 * @param estimateReduceNum whether to sum up input sizes for reducer estimation
 * @return total input size in bytes (0 when {@code estimateReduceNum} is false)
 * @throws OdpsException if table metadata cannot be fetched
 */
private long expandInputTables(String project, boolean estimateReduceNum) throws OdpsException {
  long inputSize = 0;
  TableInfo[] infos = InputUtils.getTables(job);
  // for multi inputs not allow inner output in mapper
  if (infos != null && infos.length > 1) {
    job.setMapperInnerOutputEnable(false);
  }
  boolean changed = false;
  if (infos != null) {
    for (TableInfo info : infos) {
      if (info.getProjectName() == null) {
        changed = true;
        info.setProjectName(project);
      }
      Table tbl = metaExplorer.getTable(info.getProjectName(), info.getTableName());
      List<Column> schema = tbl.getSchema().getColumns();
      String[] inputCols = getInputColumnsFromCommandSettings(job, info);
      if (inputCols.length == 0 && info.getCols() == null) {
        // No explicit column selection anywhere: use the full table schema.
        changed = true;
        Column[] columns = schema.toArray(new Column[schema.size()]);
        job.setInputSchema(info, columns);
        info.setCols(SchemaUtils.getNames(columns));
      } else {
        // Command-line settings take precedence over columns on the TableInfo.
        if (inputCols.length == 0) {
          inputCols = info.getCols();
        }
        Column[] columns = new Column[inputCols.length];
        for (int k = 0; k < inputCols.length; k++) {
          // Resolve each requested column against the schema, case-insensitively.
          // NOTE(review): an unmatched name leaves a null slot — presumably
          // validated upstream; confirm before relying on it.
          for (Column c : schema) {
            if (c.getName().equalsIgnoreCase(inputCols[k])) {
              columns[k] = c;
              break;
            }
          }
        }
        job.setInputSchema(info, columns);
      }
      if (estimateReduceNum) {
        PartitionSpec part = info.getPartitionSpec();
        // Partitioned input counts only the selected partition's size.
        inputSize += part.isEmpty() ? tbl.getSize() : tbl.getPartition(part).getSize();
      }
    }
  }
  if (changed) {
    InputUtils.setTables(infos, job);
  }
  return inputSize;
}

/**
 * Fills in the default project name on input and output volumes that omit one,
 * writing the volume lists back to the job only when something changed.
 *
 * @param project default project name
 * @throws OdpsException if reading or writing volume config fails
 */
private void fillVolumeProjects(String project) throws OdpsException {
  boolean changed = false;
  VolumeInfo[] inputVolumes = InputUtils.getVolumes(job);
  if (inputVolumes != null) {
    for (VolumeInfo volume : inputVolumes) {
      if (volume.getProjectName() == null) {
        changed = true;
        volume.setProjectName(project);
      }
    }
  }
  if (changed) {
    InputUtils.setVolumes(inputVolumes, job);
  }
  changed = false;
  VolumeInfo[] outputVolumes = OutputUtils.getVolumes(job);
  if (outputVolumes != null) {
    for (VolumeInfo volume : outputVolumes) {
      if (volume.getProjectName() == null) {
        changed = true;
        volume.setProjectName(project);
      }
    }
  }
  if (changed) {
    OutputUtils.setVolumes(outputVolumes, job);
  }
}

/**
 * Expands output table schemas onto the job. When no output tables are
 * declared, installs a dummy single-column schema under the default label so
 * downstream plumbing always sees an output schema.
 *
 * @param project default project name for tables that specify none
 * @throws OdpsException if table metadata cannot be fetched
 */
private void expandOutputTables(String project) throws OdpsException {
  TableInfo[] infos = OutputUtils.getTables(job);
  if (infos == null) {
    job.setOutputSchema(new Column[]{new Column("nil", OdpsType.STRING)},
                        TableInfo.DEFAULT_LABEL);
    return;
  }
  for (TableInfo info : infos) {
    if (info.getProjectName() == null) {
      info.setProjectName(project);
    }
    List<Column> schema = metaExplorer.getTable(info.getProjectName(), info.getTableName())
        .getSchema().getColumns();
    Column[] schemaArray = schema.toArray(new Column[schema.size()]);
    info.setCols(SchemaUtils.getNames(schemaArray));
    job.setOutputSchema(schemaArray, info.getLabel());
  }
  OutputUtils.setTables(infos, job);
}

/**
 * Builds the jobconf jar in memory, uploads it as a temp resource, and records
 * it under the {@code jobconf.jar} alias.
 *
 * @throws OdpsException if the resource upload fails
 */
private void uploadJobConfJar() throws OdpsException {
  ByteArrayOutputStream jarOut;
  try {
    jarOut = createJarArchive();
    // close() is a no-op for ByteArrayOutputStream, kept for interface hygiene.
    jarOut.close();
  } catch (IOException e) {
    // Jar creation is purely in-memory; failure here is a programming error.
    throw new RuntimeException(e);
  }
  String resName = metaExplorer.addTempResourceWithRetry(
      new ByteArrayInputStream(jarOut.toByteArray()), jobId + ".jar", Resource.Type.JAR);
  aliasToTempResource.put("jobconf.jar", resName);
}

/**
 * Registers the union of user-declared resources and temp-resource aliases on
 * the job, and the temp resource names as function resources.
 *
 * @throws OdpsException if updating job config fails
 */
private void registerResources() throws OdpsException {
  List<String> totalRes = new ArrayList<String>();
  String[] resources = job.getResources();
  if (resources != null) {
    Collections.addAll(totalRes, resources);
  }
  totalRes.addAll(aliasToTempResource.keySet());
  job.setResources(StringUtils.join(totalRes, ","));
  job.setFunctionResources(StringUtils.join(aliasToTempResource.values(), ","));
}