in compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java [484:529]
/**
 * Builds the DoFn transform for the given Beam ParDo node.
 *
 * <p>A {@link DoFnTransform} is returned when {@code sideInputMap} is empty;
 * otherwise a {@link PushBackDoFnTransform} that additionally receives the side input map.
 *
 * @param ctx          translation context supplying the pipeline and its options.
 * @param beamNode     the Beam node to translate.
 * @param sideInputMap side inputs keyed by integer index; may be empty.
 * @return the transform wrapping the node's {@link DoFn}.
 * @throws RuntimeException wrapping any {@link IOException} raised while extracting
 *                          the ParDo components from the applied transform.
 */
private static AbstractDoFnTransform createDoFnTransform(final PipelineTranslationContext ctx,
                                                         final TransformHierarchy.Node beamNode,
                                                         final Map<Integer, PCollectionView<?>> sideInputMap) {
  try {
    // Materialize the applied transform once and reuse it for every ParDoTranslation lookup,
    // so that all extracted components come from the same AppliedPTransform instance.
    final AppliedPTransform pTransform = beamNode.toAppliedPTransform(ctx.getPipeline());
    final DoFn doFn = ParDoTranslation.getDoFn(pTransform);
    final TupleTag mainOutputTag = ParDoTranslation.getMainOutputTag(pTransform);
    final TupleTagList additionalOutputTags = ParDoTranslation.getAdditionalOutputTags(pTransform);
    final DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(pTransform);

    // The main input is the single input that is not an additional (e.g. side) input.
    final PCollection<?> mainInput = (PCollection<?>)
      Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform));

    // Display data only carries the full name of the Beam node.
    final HasDisplayData displayData = (builder) -> builder.add(DisplayData.item("name", beamNode.getFullName()));

    if (sideInputMap.isEmpty()) {
      return new DoFnTransform(
        doFn,
        mainInput.getCoder(),
        getOutputCoders(pTransform),
        mainOutputTag,
        additionalOutputTags.getAll(),
        mainInput.getWindowingStrategy(),
        ctx.getPipelineOptions(),
        DisplayData.from(displayData),
        doFnSchemaInformation,
        Collections.emptyMap());
    }

    // Side inputs are present: use the push-back variant, which also takes the side input map.
    return new PushBackDoFnTransform(
      doFn,
      mainInput.getCoder(),
      getOutputCoders(pTransform),
      mainOutputTag,
      additionalOutputTags.getAll(),
      mainInput.getWindowingStrategy(),
      sideInputMap,
      ctx.getPipelineOptions(),
      DisplayData.from(displayData),
      doFnSchemaInformation,
      Collections.emptyMap());
  } catch (final IOException e) {
    // Keep the original cause and name the failing node to make translation errors diagnosable.
    throw new RuntimeException("Failed to create DoFn transform for " + beamNode.getFullName(), e);
  }
}