private static AbstractDoFnTransform createDoFnTransform()

in compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java [484:529]


  /**
   * Creates the transform that executes the {@link DoFn} of the given Beam node.
   *
   * <p>The DoFn, its main/additional output tags and its schema information are all
   * extracted from the node's {@link AppliedPTransform} via {@link ParDoTranslation}.
   * If {@code sideInputMap} is empty a {@link DoFnTransform} is returned; otherwise a
   * {@link PushBackDoFnTransform} is returned and additionally receives the map.
   *
   * @param ctx          translation context supplying the pipeline and the pipeline options.
   * @param beamNode     the Beam transform hierarchy node to translate.
   * @param sideInputMap side input views keyed by an integer
   *                     (NOTE(review): presumably the side input index - confirm against callers).
   * @return the created transform.
   * @throws RuntimeException wrapping the {@link IOException} raised while extracting
   *                          the DoFn or its output tags from the applied transform.
   */
  private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
                                                           final TransformHierarchy.Node beamNode,
                                                           final Map<Integer, PCollectionView<?>> sideInputMap) {
    try {
      // Materialize the applied transform once; every ParDoTranslation lookup below reuses it.
      final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
      final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
      final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
      final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
      final DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(pTransform);

      // getOnlyElement fails unless there is exactly one non-additional input.
      final PCollection<?> mainInput = (PCollection<?>)
        Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));

      // Display data carries only the node's full name under the key "name".
      final HasDisplayData displayData = (builder) -> builder.add(DisplayData.item("name", beamNode.getFullName()));

      if (sideInputMap.isEmpty()) {
        return new DoFnTransform(
          doFn,
          mainInput.getCoder(),
          getOutputCoders(pTransform),
          mainOutputTag,
          additionalOutputTags.getAll(),
          mainInput.getWindowingStrategy(),
          ctx.getPipelineOptions(),
          DisplayData.from(displayData),
          doFnSchemaInformation,
          // NOTE(review): presumably the DoFn's side input mapping, always empty here - confirm.
          Collections.emptyMap());
      } else {
        return new PushBackDoFnTransform(
          doFn,
          mainInput.getCoder(),
          getOutputCoders(pTransform),
          mainOutputTag,
          additionalOutputTags.getAll(),
          mainInput.getWindowingStrategy(),
          sideInputMap,
          ctx.getPipelineOptions(),
          DisplayData.from(displayData),
          doFnSchemaInformation,
          // NOTE(review): presumably the DoFn's side input mapping, always empty here - confirm.
          Collections.emptyMap());
      }
    } catch (final IOException e) {
      // Keep the unchecked exception type callers already see, but say which node failed.
      throw new RuntimeException("Failed to create a DoFn transform for " + beamNode.getFullName(), e);
    }
  }