in runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java [798:1214]
// Static registration of the Dataflow-specific translators that convert Beam
// primitive transforms into Dataflow job "steps". Each registration pairs a
// transform class with a TransformTranslator whose translate() delegates to a
// private generic helper (to recover the erased type parameters). The step
// kinds ("CollectionToSingleton", "CombineValues", "Flatten", "GroupByKey",
// "ParallelDo", "Bucket", "ParallelRead", "SplittableProcessKeyed") and the
// PropertyNames keys form the wire contract with the Dataflow service.
static {
// View.CreatePCollectionView: materializes a PCollection as a side-input view.
// Unlike the CreateDataflowView registration below, this one also attaches the
// serialized windowing strategy and a merging flag to the step.
registerTransformTranslator(
View.CreatePCollectionView.class,
new TransformTranslator<View.CreatePCollectionView>() {
@Override
public void translate(View.CreatePCollectionView transform, TranslationContext context) {
translateTyped(transform, context);
}
// Typed helper so ElemT/ViewT can be named; translate() cannot due to erasure.
private <ElemT, ViewT> void translateTyped(
View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
// The windowing strategy is shipped to the service as JSON-wrapped bytes.
stepContext.addInput(
PropertyNames.WINDOWING_STRATEGY,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
// NOTE(review): this registration uses windowingStrategy.needsMerge() for
// IS_MERGING_WINDOW_FN, while the GroupByKey translators below use
// !windowingStrategy.getWindowFn().isNonMerging() — presumably equivalent;
// confirm against WindowingStrategy.needsMerge().
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN, windowingStrategy.needsMerge());
stepContext.addCollectionToSingletonOutput(
input, PropertyNames.OUTPUT, transform.getView());
}
});
// CreateDataflowView: runner-specific view creation; same step kind as above
// but without the windowing-strategy/merging inputs.
registerTransformTranslator(
CreateDataflowView.class,
new TransformTranslator<CreateDataflowView>() {
@Override
public void translate(CreateDataflowView transform, TranslationContext context) {
translateTyped(transform, context);
}
private <ElemT, ViewT> void translateTyped(
CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) {
StepTranslationContext stepContext =
context.addStep(transform, "CollectionToSingleton");
PCollection<ElemT> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addCollectionToSingletonOutput(
input, PropertyNames.OUTPUT, transform.getView());
}
});
// CombineGroupedValues: translates the runner's wrapper around
// Combine.GroupedValues into a "CombineValues" step, serializing the applied
// CombineFn (with accumulator coder) for the service.
DataflowPipelineTranslator.registerTransformTranslator(
DataflowRunner.CombineGroupedValues.class,
new TransformTranslator<CombineGroupedValues>() {
@Override
public void translate(CombineGroupedValues transform, TranslationContext context) {
translateHelper(transform, context);
}
private <K, InputT, OutputT> void translateHelper(
final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
TranslationContext context) {
Combine.GroupedValues<K, InputT, OutputT> originalTransform =
primitiveTransform.getOriginalCombine();
StepTranslationContext stepContext =
context.addStep(primitiveTransform, "CombineValues");
translateInputs(
stepContext,
context.getInput(primitiveTransform),
originalTransform.getSideInputs(),
context);
// Resolve the fully-applied CombineFn (binds key/input coders and
// windowing strategy) so its accumulator coder can be attached.
AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
originalTransform.getAppliedFn(
context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
context.getInput(primitiveTransform).getCoder(),
context.getInput(primitiveTransform).getWindowingStrategy());
stepContext.addEncodingInput(fn.getAccumulatorCoder());
// The applied fn itself travels as Java-serialized bytes in JSON form.
stepContext.addInput(
PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(primitiveTransform));
}
});
// Flatten.PCollections: emits a "Flatten" step whose INPUTS property lists an
// output reference for every input PCollection.
registerTransformTranslator(
Flatten.PCollections.class,
new TransformTranslator<Flatten.PCollections>() {
@Override
public void translate(Flatten.PCollections transform, TranslationContext context) {
flattenHelper(transform, context);
}
private <T> void flattenHelper(
Flatten.PCollections<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Flatten");
List<OutputReference> inputs = new ArrayList<>();
for (PValue input : context.getInputs(transform).values()) {
// Each input is referenced by its producing step's output.
inputs.add(context.asOutputReference(input, context.getProducer(input)));
}
stepContext.addInput(PropertyNames.INPUTS, inputs);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
}
});
// GroupByKeyAndSortValuesOnly: a GroupByKey variant that additionally sorts
// values per key (SORT_VALUES=true) and always disables combiner lifting.
registerTransformTranslator(
GroupByKeyAndSortValuesOnly.class,
new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
@Override
public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
groupByKeyAndSortValuesHelper(transform, context);
}
private <K1, K2, V> void groupByKeyAndSortValuesHelper(
GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
PCollection<KV<K1, KV<K2, V>>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
stepContext.addInput(PropertyNames.SORT_VALUES, true);
// TODO: Add support for combiner lifting once the need arises.
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
}
});
// DataflowGroupByKey: runner-specific GBK. Combiner lifting is always
// disallowed here; the windowing strategy is serialized under SERIALIZED_FN
// and an ALLOW_DUPLICATES flag is forwarded from the transform.
registerTransformTranslator(
DataflowGroupByKey.class,
new TransformTranslator<DataflowGroupByKey>() {
@Override
public void translate(DataflowGroupByKey transform, TranslationContext context) {
dataflowGroupByKeyHelper(transform, context);
}
private <K, V> void dataflowGroupByKeyHelper(
DataflowGroupByKey<K, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
PCollection<KV<K, V>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
stepContext.addInput(PropertyNames.ALLOW_DUPLICATES, transform.allowDuplicates());
}
});
// GroupByKey (the standard Beam primitive): decides whether the service may
// lift combiners into the shuffle based on the windowing strategy and, in
// streaming mode, on key cardinality and the trigger.
registerTransformTranslator(
GroupByKey.class,
new TransformTranslator<GroupByKey>() {
@Override
public void translate(GroupByKey transform, TranslationContext context) {
groupByKeyHelper(transform, context);
}
private <K, V> void groupByKeyHelper(
GroupByKey<K, V> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
PCollection<KV<K, V>> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
boolean isStreaming =
context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
// Lifting is only safe when windows never merge and each element lands
// in exactly one window.
boolean allowCombinerLifting =
!windowingStrategy.needsMerge()
&& windowingStrategy.getWindowFn().assignsToOneWindow();
if (isStreaming) {
// In streaming, additionally require a fewKeys hint and the default
// trigger before allowing lifting.
allowCombinerLifting &= transform.fewKeys();
// TODO: Allow combiner lifting on the non-default trigger, as appropriate.
allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
}
// Note the property is the negation: DISALLOW_COMBINER_LIFTING.
stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
stepContext.addInput(
PropertyNames.SERIALIZED_FN,
byteArrayToJsonString(
serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
stepContext.addInput(
PropertyNames.IS_MERGING_WINDOW_FN,
!windowingStrategy.getWindowFn().isNonMerging());
}
});
// ParDo.MultiOutput: a "ParallelDo" step with a main output plus additional
// tagged outputs; ships the DoFn, schema info, side inputs and output coders.
registerTransformTranslator(
ParDo.MultiOutput.class,
new TransformTranslator<ParDo.MultiOutput>() {
@Override
public void translate(ParDo.MultiOutput transform, TranslationContext context) {
translateMultiHelper(transform, context);
}
private <InputT, OutputT> void translateMultiHelper(
ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
// Collect the coder of every output PCollection, keyed by its tag.
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext,
context.getInput(transform),
transform.getSideInputs().values(),
context);
translateOutputs(context.getOutputs(transform), stepContext);
translateFn(
stepContext,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs().values(),
context.getInput(transform).getCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
}
});
// ParDoSingle: single-output ParDo; same "ParallelDo" translation but only
// the main output tag is registered as an output.
registerTransformTranslator(
ParDoSingle.class,
new TransformTranslator<ParDoSingle>() {
@Override
public void translate(ParDoSingle transform, TranslationContext context) {
translateSingleHelper(transform, context);
}
private <InputT, OutputT> void translateSingleHelper(
ParDoSingle<InputT, OutputT> transform, TranslationContext context) {
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext,
context.getInput(transform),
transform.getSideInputs().values(),
context);
stepContext.addOutput(
transform.getMainOutputTag().getId(), context.getOutput(transform));
translateFn(
stepContext,
transform.getFn(),
context.getInput(transform).getWindowingStrategy(),
transform.getSideInputs().values(),
context.getInput(transform).getCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
}
});
// Window.Assign: re-windowing becomes a "Bucket" step; the OUTPUT collection's
// windowing strategy (not the input's) is what gets serialized.
registerTransformTranslator(
Window.Assign.class,
new TransformTranslator<Window.Assign>() {
@Override
public void translate(Window.Assign transform, TranslationContext context) {
translateHelper(transform, context);
}
private <T> void translateHelper(Window.Assign<T> transform, TranslationContext context) {
StepTranslationContext stepContext = context.addStep(transform, "Bucket");
PCollection<T> input = context.getInput(transform);
stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
byte[] serializedBytes =
serializeWindowingStrategy(strategy, context.getPipelineOptions());
String serializedJson = byteArrayToJsonString(serializedBytes);
stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
}
});
///////////////////////////////////////////////////////////////////////////
// IO Translation.
registerTransformTranslator(SplittableParDo.PrimitiveBoundedRead.class, new ReadTranslator());
// TestStream: translated as a "ParallelRead" with format "test_stream"; the
// event sequence is encoded into a RunnerApi.TestStreamPayload proto and
// attached as a JSON-wrapped byte string.
registerTransformTranslator(
TestStream.class,
new TransformTranslator<TestStream>() {
@Override
public void translate(TestStream transform, TranslationContext context) {
translateTyped(transform, context);
}
private <T> void translateTyped(TestStream<T> transform, TranslationContext context) {
try {
StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
// SERIALIZED_FN here carries the transform's portable id, not bytes.
String ptransformId =
context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentTransform());
stepContext.addInput(PropertyNames.SERIALIZED_FN, ptransformId);
stepContext.addInput(PropertyNames.FORMAT, "test_stream");
RunnerApi.TestStreamPayload.Builder payloadBuilder =
RunnerApi.TestStreamPayload.newBuilder();
for (TestStream.Event event : transform.getEvents()) {
if (event instanceof TestStream.ElementEvent) {
RunnerApi.TestStreamPayload.Event.AddElements.Builder addElementsBuilder =
RunnerApi.TestStreamPayload.Event.AddElements.newBuilder();
Iterable<TimestampedValue<T>> elements =
((TestStream.ElementEvent) event).getElements();
for (TimestampedValue<T> element : elements) {
addElementsBuilder.addElements(
RunnerApi.TestStreamPayload.TimestampedElement.newBuilder()
.setEncodedElement(
ByteString.copyFrom(
CoderUtils.encodeToByteArray(
transform.getValueCoder(), element.getValue())))
// millis * 1000 — presumably the payload expects
// microseconds; TODO confirm against TestStreamPayload.
.setTimestamp(element.getTimestamp().getMillis() * 1000));
}
payloadBuilder.addEventsBuilder().setElementEvent(addElementsBuilder);
} else if (event instanceof TestStream.WatermarkEvent) {
payloadBuilder
.addEventsBuilder()
.setWatermarkEvent(
RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder()
.setNewWatermark(
((TestStream.WatermarkEvent) event).getWatermark().getMillis()
* 1000));
} else if (event instanceof TestStream.ProcessingTimeEvent) {
payloadBuilder
.addEventsBuilder()
.setProcessingTimeEvent(
RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder()
.setAdvanceDuration(
((TestStream.ProcessingTimeEvent) event)
.getProcessingTimeAdvance()
.getMillis()
* 1000));
}
}
stepContext.addInput(
PropertyNames.SERIALIZED_TEST_STREAM,
byteArrayToJsonString(payloadBuilder.build().toByteArray()));
stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
} catch (Exception e) {
// Wrap checked exceptions (e.g. from element encoding) since the
// TransformTranslator interface does not declare them.
throw new RuntimeException(e);
}
}
});
///////////////////////////////////////////////////////////////////////////
// Legacy Splittable DoFn translation.
// ProcessKeyedElements: a "SplittableProcessKeyed" step; in addition to the
// usual DoFn payload it attaches a KV coder of (restriction, watermark
// estimator state) under RESTRICTION_CODER.
registerTransformTranslator(
SplittableParDo.ProcessKeyedElements.class,
new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
@Override
public void translate(
SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
translateTyped(transform, context);
}
private <InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> void translateTyped(
SplittableParDo.ProcessKeyedElements<
InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>
transform,
TranslationContext context) {
DoFnSchemaInformation doFnSchemaInformation;
doFnSchemaInformation =
ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
Map<String, PCollectionView<?>> sideInputMapping =
ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
StepTranslationContext stepContext =
context.addStep(transform, "SplittableProcessKeyed");
Map<TupleTag<?>, Coder<?>> outputCoders =
context.getOutputs(transform).entrySet().stream()
.collect(
Collectors.toMap(
Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
translateInputs(
stepContext, context.getInput(transform), transform.getSideInputs(), context);
translateOutputs(context.getOutputs(transform), stepContext);
translateFn(
stepContext,
transform.getFn(),
transform.getInputWindowingStrategy(),
transform.getSideInputs(),
transform.getElementCoder(),
context,
transform.getMainOutputTag(),
outputCoders,
doFnSchemaInformation,
sideInputMapping);
stepContext.addInput(
PropertyNames.RESTRICTION_CODER,
translateCoder(
KvCoder.of(
transform.getRestrictionCoder(),
transform.getWatermarkEstimatorStateCoder())));
}
});
}