in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/LOTGenerator.java [589:738]
/**
 * Builds the map-stage {@code Transform} operator and appends it to {@code tree}.
 *
 * <p>The transform lists every entry of {@code resourceItems} as a resource, passes each column
 * of {@code inColumns} (referenced from operator {@code sourceId}) as a parameter, declares
 * {@code outColumns} as its output schema, and attaches the configured input/output volumes.
 * In non-streaming mode the transform runs {@link LotMapperUDTF} as a Java transform. In
 * streaming mode it runs the command taken from the {@code stream.map.streamprocessor} job
 * property; inputs that are not STRING are converted to strings, and every output column
 * except {@code PARTITION_ID} is declared as STRING.
 *
 * <p>When streaming and the job has reducers, a {@code Select} may be appended after the
 * transform. It converts the string outputs back to their declared types for the shuffle keys.
 * The {@code Select} is added only if at least one output column other than
 * {@code PARTITION_ID} is not STRING.
 *
 * @param tree operator tree the new operator(s) are appended to
 * @param sourceId id of the operator the input columns are referenced from
 * @param inColumns columns passed to the mapper as parameters
 * @param outColumns columns produced by the mapper
 * @param parentId parent id recorded on the transform
 * @return id of the last operator appended: the cast {@code Select} if one was added,
 *     otherwise the map transform ({@code "MapTransform_<n>"})
 */
private String genMapper(LogicalOperatorTree.Builder tree, String sourceId, Column[] inColumns,
    List<Column> outColumns, String parentId) {
  boolean isStreaming = isStreamingMap;
  Transform.Builder ab = Transform.newBuilder();
  for (ResourceItem item : resourceItems) {
    Transform.Resources.Builder rb = Transform.Resources.newBuilder();
    rb.setProject(item.projectName);
    rb.setResourceName(item.resourceName);
    ab.addResources(rb.build());
  }
  if (!isStreaming) {
    LanguageTransform.Builder tb = LanguageTransform.newBuilder();
    tb.setClassName(LotMapperUDTF.class.getName());
    tb.setLanguage(Language.Java);
    ab.setLanguageTransform(tb.build());
  } else {
    StreamingTransform.Builder sb = StreamingTransform.newBuilder();
    // NOTE(review): job.get(..., null) returns null when the property is unset; if this is a
    // generated protobuf builder, setCmd(null) throws NullPointerException. Confirm that
    // callers always set stream.map.streamprocessor for streaming jobs.
    sb.setCmd(job.get("stream.map.streamprocessor", null));
    // TODO properties to pb fields
    fillStreamingMapProperties(sb);
    ab.setStreamingTransform(sb.build());
  }
  for (Column col : inColumns) {
    ab.addParameters(genMapperInputExpression(col, sourceId, isStreaming));
  }
  Schema.Builder schemaBuilder = Schema.newBuilder();
  for (Column col : outColumns) {
    Schema.Columns.Builder scb = Schema.Columns.newBuilder();
    scb.setName(col.getName());
    // In streaming mode every output except PARTITION_ID is declared STRING; when there are
    // reducers, the cast Select built below restores the real types.
    scb.setType((isStreaming && !col.getName().equals(PARTITION_ID)) ? TypesProtos.Type.String
        : TypeUtils.getLotTypeFromColumn(col));
    schemaBuilder.addColumns(scb.build());
  }
  ab.setSchema(schemaBuilder.build());
  ab.setParentId(parentId);
  ab.setId("MapTransform_" + opId++);
  // Input volumes are added before output volumes, matching the original ordering.
  addMapperVolumes(ab, inputVolumeInfos, true);
  addMapperVolumes(ab, outputVolumeInfos, false);
  Transform mapper = ab.build();
  tree.addOperators(LogicalOperator.newBuilder().setTransform(mapper).build());
  if (job.getNumReduceTasks() > 0 && isStreaming && hasNonStringMapOutput(outColumns)) {
    // Convert key types for shuffle keys.
    return genStreamingShuffleCastSelect(tree, mapper.getId(), outColumns);
  }
  return mapper.getId();
}

/**
 * Builds the parameter expression feeding {@code col} from operator {@code sourceId} into the
 * map transform.
 *
 * <p>In non-streaming mode, and for STRING columns, the result is a plain column reference. In
 * streaming mode, BOOLEAN columns go through {@code castBooleanAsStreamingString}, and every
 * other non-STRING column is wrapped in a {@code TOSTRING} call.
 */
private ScalarExpression genMapperInputExpression(Column col, String sourceId,
    boolean isStreaming) {
  Reference.Builder refBuilder = Reference.newBuilder();
  refBuilder.setName(col.getName());
  refBuilder.setFrom(sourceId);
  Reference ref = refBuilder.build();

  ScalarExpression.Builder exprBuilder = ScalarExpression.newBuilder();
  // Short-circuit keeps getType() uncalled in non-streaming mode, as before.
  if (!isStreaming || col.getType().equals(OdpsType.STRING)) {
    exprBuilder.setReference(ref);
  } else if (col.getType().equals(OdpsType.BOOLEAN)) {
    exprBuilder.setExpression(castBooleanAsStreamingString(ref));
  } else {
    // cast as string
    ScalarFunction.Builder castBuilder = ScalarFunction.newBuilder();
    castBuilder.setProject(project);
    castBuilder.setName("TOSTRING");
    castBuilder.addParameters(ScalarExpression.newBuilder().setReference(ref).build());
    exprBuilder.setExpression(castBuilder.build());
  }
  return exprBuilder.build();
}

/**
 * Appends one {@code Volume} entry per element of {@code volumes} to the transform, each
 * marked with the given {@code isInput} flag. A {@code null} array adds nothing.
 */
private void addMapperVolumes(Transform.Builder ab, VolumeInfo[] volumes, boolean isInput) {
  if (volumes == null) {
    return;
  }
  for (VolumeInfo vol : volumes) {
    Volume.Builder volumeBuilder = Volume.newBuilder();
    volumeBuilder.setProject(vol.getProjectName());
    volumeBuilder.setVolumeName(vol.getVolumeName());
    volumeBuilder.setPartition(vol.getPartSpec());
    volumeBuilder.setLabel(vol.getLabel());
    volumeBuilder.setIsInput(isInput);
    ab.addVolumes(volumeBuilder.build());
  }
}

/**
 * Returns true if any column other than {@code PARTITION_ID} has a type other than STRING.
 */
private boolean hasNonStringMapOutput(List<Column> outColumns) {
  for (Column col : outColumns) {
    if (!col.getName().equals(PARTITION_ID) && !col.getType().equals(OdpsType.STRING)) {
      return true;
    }
  }
  return false;
}

/**
 * Appends a {@code Select} over the streaming map transform {@code mapperId} and returns its id.
 *
 * <p>{@code PARTITION_ID} and STRING columns are passed through as references. Every other
 * column is cast from string with a {@code "TO" + type} function (e.g. {@code TOBIGINT}).
 * Each expression is aliased to the original column name.
 */
private String genStreamingShuffleCastSelect(LogicalOperatorTree.Builder tree, String mapperId,
    List<Column> outColumns) {
  Select.Builder sb = Select.newBuilder();
  for (Column col : outColumns) {
    Reference.Builder refBuilder = Reference.newBuilder();
    refBuilder.setName(col.getName());
    refBuilder.setFrom(mapperId);
    Reference ref = refBuilder.build();

    ScalarExpression.Builder eb = ScalarExpression.newBuilder();
    if (col.getName().equals(PARTITION_ID) || col.getType().equals(OdpsType.STRING)) {
      eb.setReference(ref);
    } else {
      // cast from string
      ScalarFunction.Builder castBuilder = ScalarFunction.newBuilder();
      castBuilder.setProject(project);
      castBuilder.setName("TO" + col.getType().toString());
      castBuilder.addParameters(ScalarExpression.newBuilder().setReference(ref).build());
      eb.setExpression(castBuilder.build());
    }
    Select.Expressions.Builder seb = Select.Expressions.newBuilder();
    seb.setExpression(eb.build());
    seb.setAlias(col.getName());
    sb.addExpressions(seb.build());
  }
  sb.setParentId(mapperId);
  sb.setId("SEL_" + opId++);
  Select select = sb.build();
  tree.addOperators(LogicalOperator.newBuilder().setSelect(select).build());
  return select.getId();
}