private String genMapper()

in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/LOTGenerator.java [589:738]


  /**
   * Builds the map-stage {@code Transform} operator, adds it to {@code tree}, and returns the id
   * of the operator that produces the mapper output.
   *
   * <p>The transform carries every entry of {@code resourceItems}, one parameter expression per
   * input column, an output schema derived from {@code outColumns}, and the configured input and
   * output volumes. When {@code isStreamingMap} is set, a {@code StreamingTransform} is used
   * instead of the Java {@code LotMapperUDTF}: non-string inputs are cast to string, and every
   * output column except {@code PARTITION_ID} is declared as a string.
   *
   * <p>For a streaming map followed by reducers, if any output column other than
   * {@code PARTITION_ID} is not a string, an additional {@code Select} operator is appended that
   * casts the string outputs back to their declared types; in that case the id of that
   * {@code Select} is returned instead of the transform id.
   *
   * @param tree       operator tree the generated operator(s) are appended to
   * @param sourceId   id used as the {@code from} of each input column reference
   * @param inColumns  columns passed to the mapper as parameters, in order
   * @param outColumns columns making up the mapper output schema, in order
   * @param parentId   parent id set on the transform operator
   * @return id of the transform, or of the trailing cast {@code Select} when one was added
   */
  private String genMapper(LogicalOperatorTree.Builder tree, String sourceId, Column[] inColumns,
                           List<Column> outColumns, String parentId) {
    boolean isStreaming = isStreamingMap;

    Transform.Builder ab = Transform.newBuilder();
    for (ResourceItem item : resourceItems) {
      Transform.Resources.Builder rb = Transform.Resources.newBuilder();
      rb.setProject(item.projectName);
      rb.setResourceName(item.resourceName);
      ab.addResources(rb.build());
    }

    if (isStreaming) {
      StreamingTransform.Builder sb = StreamingTransform.newBuilder();
      // NOTE(review): when "stream.map.streamprocessor" is unset this passes null to setCmd;
      // confirm the property is always present for streaming jobs.
      sb.setCmd(job.get("stream.map.streamprocessor", null));
      // TODO properties to pb fields
      fillStreamingMapProperties(sb);
      ab.setStreamingTransform(sb.build());
    } else {
      LanguageTransform.Builder tb = LanguageTransform.newBuilder();
      tb.setClassName(LotMapperUDTF.class.getName());
      tb.setLanguage(Language.Java);
      ab.setLanguageTransform(tb.build());
    }

    for (Column col : inColumns) {
      ab.addParameters(genMapperInputParameter(col, sourceId, isStreaming));
    }

    Schema.Builder schemaBuilder = Schema.newBuilder();
    for (Column col : outColumns) {
      Schema.Columns.Builder scb = Schema.Columns.newBuilder();
      scb.setName(col.getName());
      // Streaming output is declared as string, except the partition id column which keeps
      // the type derived from the column itself.
      scb.setType((isStreaming && !col.getName().equals(PARTITION_ID)) ? TypesProtos.Type.String
              : TypeUtils.getLotTypeFromColumn(col));
      schemaBuilder.addColumns(scb.build());
    }
    ab.setSchema(schemaBuilder.build());

    ab.setParentId(parentId);
    ab.setId("MapTransform_" + opId++);

    // volume related: inputs are registered before outputs
    addMapperVolumes(ab, inputVolumeInfos, true);
    addMapperVolumes(ab, outputVolumeInfos, false);

    Transform mapper = ab.build();
    tree.addOperators(LogicalOperator.newBuilder().setTransform(mapper).build());

    String mapperId = mapper.getId();

    // convert key type for shuffle keys
    if (job.getNumReduceTasks() > 0 && isStreaming && hasNonStringMapperOutput(outColumns)) {
      mapperId = genMapperOutputCastSelect(tree, outColumns, mapperId);
    }

    return mapperId;
  }

  /**
   * Builds the parameter expression that feeds input column {@code col} to the mapper: a plain
   * reference, or for streaming a cast of non-string columns to string.
   */
  private ScalarExpression genMapperInputParameter(Column col, String sourceId,
                                                   boolean isStreaming) {
    Reference.Builder refBuilder = Reference.newBuilder();
    refBuilder.setName(col.getName());
    refBuilder.setFrom(sourceId);
    Reference ref = refBuilder.build();

    ScalarExpression.Builder exprBuilder = ScalarExpression.newBuilder();
    if (!isStreaming || col.getType().equals(OdpsType.STRING)) {
      exprBuilder.setReference(ref);
    } else if (col.getType().equals(OdpsType.BOOLEAN)) {
      // Booleans go through a dedicated conversion rather than the generic TOSTRING cast.
      exprBuilder.setExpression(castBooleanAsStreamingString(ref));
    } else {
      // cast as string
      ScalarFunction.Builder castBuilder = ScalarFunction.newBuilder();
      castBuilder.setProject(project);
      castBuilder.setName("TOSTRING");
      castBuilder.addParameters(ScalarExpression.newBuilder().setReference(ref).build());
      exprBuilder.setExpression(castBuilder.build());
    }
    return exprBuilder.build();
  }

  /**
   * Adds one {@code Volume} entry to {@code ab} per element of {@code volumes}, flagged as input
   * or output; does nothing when {@code volumes} is null or empty.
   */
  private void addMapperVolumes(Transform.Builder ab, VolumeInfo[] volumes, boolean isInput) {
    if (volumes == null) {
      return;
    }
    for (VolumeInfo vol : volumes) {
      Volume.Builder volumeBuilder = Volume.newBuilder();
      volumeBuilder.setProject(vol.getProjectName());
      volumeBuilder.setVolumeName(vol.getVolumeName());
      volumeBuilder.setPartition(vol.getPartSpec());
      volumeBuilder.setLabel(vol.getLabel());
      volumeBuilder.setIsInput(isInput);
      ab.addVolumes(volumeBuilder.build());
    }
  }

  /**
   * Returns true when some output column other than {@code PARTITION_ID} has a non-string type.
   */
  private boolean hasNonStringMapperOutput(List<Column> outColumns) {
    for (Column col : outColumns) {
      if (!col.getName().equals(PARTITION_ID) && !col.getType().equals(OdpsType.STRING)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Appends a {@code Select} on top of the operator {@code mapperId} that casts each string
   * output back to the declared column type, and returns the id of that {@code Select}.
   */
  private String genMapperOutputCastSelect(LogicalOperatorTree.Builder tree,
                                           List<Column> outColumns, String mapperId) {
    // add select for converting from string
    Select.Builder sb = Select.newBuilder();

    for (Column col : outColumns) {
      Reference.Builder refBuilder = Reference.newBuilder();
      refBuilder.setName(col.getName());
      refBuilder.setFrom(mapperId);
      Reference ref = refBuilder.build();

      ScalarExpression.Builder eb = ScalarExpression.newBuilder();
      if (col.getName().equals(PARTITION_ID) || col.getType().equals(OdpsType.STRING)) {
        // Already has its final type: pass through unchanged.
        eb.setReference(ref);
      } else {
        // cast from string; the function name is derived from the column type
        ScalarFunction.Builder castBuilder = ScalarFunction.newBuilder();
        castBuilder.setProject(project);
        castBuilder.setName("TO" + col.getType().toString());
        castBuilder.addParameters(ScalarExpression.newBuilder().setReference(ref).build());
        eb.setExpression(castBuilder.build());
      }

      Select.Expressions.Builder seb = Select.Expressions.newBuilder();
      seb.setExpression(eb.build());
      seb.setAlias(col.getName());
      sb.addExpressions(seb.build());
    }

    sb.setParentId(mapperId);
    sb.setId("SEL_" + opId++);

    Select select = sb.build();
    tree.addOperators(LogicalOperator.newBuilder().setSelect(select).build());
    return select.getId();
  }