static

in runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslator.java [798:1214]


  // Registers a TransformTranslator for each Beam primitive that the Dataflow runner
  // translates directly into a Dataflow API "step". Each translator maps the transform's
  // inputs/outputs and configuration onto the step's well-known property names
  // (see PropertyNames). Registration order has no semantic significance; lookups are by
  // transform class.
  static {
    // View.CreatePCollectionView -> "CollectionToSingleton": materializes a PCollection
    // as a side-input view. The windowing strategy is serialized and embedded so the
    // service can window the singleton correctly.
    registerTransformTranslator(
        View.CreatePCollectionView.class,
        new TransformTranslator<View.CreatePCollectionView>() {
          @Override
          public void translate(View.CreatePCollectionView transform, TranslationContext context) {
            // Delegate to a generic helper to capture the element/view type parameters.
            translateTyped(transform, context);
          }

          private <ElemT, ViewT> void translateTyped(
              View.CreatePCollectionView<ElemT, ViewT> transform, TranslationContext context) {
            StepTranslationContext stepContext =
                context.addStep(transform, "CollectionToSingleton");
            PCollection<ElemT> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
            stepContext.addInput(
                PropertyNames.WINDOWING_STRATEGY,
                byteArrayToJsonString(
                    serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
            stepContext.addInput(
                PropertyNames.IS_MERGING_WINDOW_FN, windowingStrategy.needsMerge());
            stepContext.addCollectionToSingletonOutput(
                input, PropertyNames.OUTPUT, transform.getView());
          }
        });

    // CreateDataflowView -> "CollectionToSingleton": the runner-specific view primitive.
    // Unlike View.CreatePCollectionView above, no windowing strategy is attached here —
    // presumably the runner's pipeline rewrite has already handled windowing; confirm
    // against DataflowRunner's view overrides.
    registerTransformTranslator(
        CreateDataflowView.class,
        new TransformTranslator<CreateDataflowView>() {
          @Override
          public void translate(CreateDataflowView transform, TranslationContext context) {
            translateTyped(transform, context);
          }

          private <ElemT, ViewT> void translateTyped(
              CreateDataflowView<ElemT, ViewT> transform, TranslationContext context) {
            StepTranslationContext stepContext =
                context.addStep(transform, "CollectionToSingleton");
            PCollection<ElemT> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            stepContext.addCollectionToSingletonOutput(
                input, PropertyNames.OUTPUT, transform.getView());
          }
        });

    // CombineGroupedValues -> "CombineValues": a Combine applied to an already-grouped
    // input. The fully-resolved AppliedCombineFn (with coders/windowing baked in) is
    // serialized into the step, and the accumulator coder is recorded as the encoding.
    DataflowPipelineTranslator.registerTransformTranslator(
        DataflowRunner.CombineGroupedValues.class,
        new TransformTranslator<CombineGroupedValues>() {
          @Override
          public void translate(CombineGroupedValues transform, TranslationContext context) {
            translateHelper(transform, context);
          }

          private <K, InputT, OutputT> void translateHelper(
              final CombineGroupedValues<K, InputT, OutputT> primitiveTransform,
              TranslationContext context) {
            // The primitive wraps the user's original Combine.GroupedValues; side inputs
            // and the CombineFn come from the original transform.
            Combine.GroupedValues<K, InputT, OutputT> originalTransform =
                primitiveTransform.getOriginalCombine();
            StepTranslationContext stepContext =
                context.addStep(primitiveTransform, "CombineValues");
            translateInputs(
                stepContext,
                context.getInput(primitiveTransform),
                originalTransform.getSideInputs(),
                context);

            // Resolve the CombineFn against the input's coder and windowing so the
            // accumulator coder can be inferred.
            AppliedCombineFn<? super K, ? super InputT, ?, OutputT> fn =
                originalTransform.getAppliedFn(
                    context.getInput(primitiveTransform).getPipeline().getCoderRegistry(),
                    context.getInput(primitiveTransform).getCoder(),
                    context.getInput(primitiveTransform).getWindowingStrategy());

            stepContext.addEncodingInput(fn.getAccumulatorCoder());

            stepContext.addInput(
                PropertyNames.SERIALIZED_FN, byteArrayToJsonString(serializeToByteArray(fn)));

            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(primitiveTransform));
          }
        });

    // Flatten.PCollections -> "Flatten": emits a reference to every input collection.
    registerTransformTranslator(
        Flatten.PCollections.class,
        new TransformTranslator<Flatten.PCollections>() {
          @Override
          public void translate(Flatten.PCollections transform, TranslationContext context) {
            flattenHelper(transform, context);
          }

          private <T> void flattenHelper(
              Flatten.PCollections<T> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "Flatten");

            // Each input becomes an OutputReference pointing at its producing step.
            List<OutputReference> inputs = new ArrayList<>();
            for (PValue input : context.getInputs(transform).values()) {
              inputs.add(context.asOutputReference(input, context.getProducer(input)));
            }
            stepContext.addInput(PropertyNames.INPUTS, inputs);
            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
          }
        });

    // GroupByKeyAndSortValuesOnly -> "GroupByKey" with SORT_VALUES set: groups by the
    // primary key and sorts values by the secondary key.
    registerTransformTranslator(
        GroupByKeyAndSortValuesOnly.class,
        new TransformTranslator<GroupByKeyAndSortValuesOnly>() {
          @Override
          public void translate(GroupByKeyAndSortValuesOnly transform, TranslationContext context) {
            groupByKeyAndSortValuesHelper(transform, context);
          }

          private <K1, K2, V> void groupByKeyAndSortValuesHelper(
              GroupByKeyAndSortValuesOnly<K1, K2, V> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
            PCollection<KV<K1, KV<K2, V>>> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
            stepContext.addInput(PropertyNames.SORT_VALUES, true);

            // TODO: Add support for combiner lifting once the need arises.
            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
          }
        });

    // DataflowGroupByKey -> "GroupByKey": runner-internal GBK variant. Combiner lifting
    // is unconditionally disallowed, and the transform's allow-duplicates setting is
    // forwarded to the service.
    registerTransformTranslator(
        DataflowGroupByKey.class,
        new TransformTranslator<DataflowGroupByKey>() {
          @Override
          public void translate(DataflowGroupByKey transform, TranslationContext context) {
            dataflowGroupByKeyHelper(transform, context);
          }

          private <K, V> void dataflowGroupByKeyHelper(
              DataflowGroupByKey<K, V> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
            PCollection<KV<K, V>> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

            // The windowing strategy is serialized under SERIALIZED_FN for GBK steps.
            WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, true);
            stepContext.addInput(
                PropertyNames.SERIALIZED_FN,
                byteArrayToJsonString(
                    serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
            stepContext.addInput(
                PropertyNames.IS_MERGING_WINDOW_FN,
                !windowingStrategy.getWindowFn().isNonMerging());
            stepContext.addInput(PropertyNames.ALLOW_DUPLICATES, transform.allowDuplicates());
          }
        });

    // GroupByKey -> "GroupByKey": the standard Beam GBK. Combiner lifting is permitted
    // only when windows are non-merging and each element lands in exactly one window;
    // in streaming it additionally requires fewKeys() and the default trigger.
    registerTransformTranslator(
        GroupByKey.class,
        new TransformTranslator<GroupByKey>() {
          @Override
          public void translate(GroupByKey transform, TranslationContext context) {
            groupByKeyHelper(transform, context);
          }

          private <K, V> void groupByKeyHelper(
              GroupByKey<K, V> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "GroupByKey");
            PCollection<KV<K, V>> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

            WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
            boolean isStreaming =
                context.getPipelineOptions().as(StreamingOptions.class).isStreaming();
            boolean allowCombinerLifting =
                !windowingStrategy.needsMerge()
                    && windowingStrategy.getWindowFn().assignsToOneWindow();
            if (isStreaming) {
              allowCombinerLifting &= transform.fewKeys();
              // TODO: Allow combiner lifting on the non-default trigger, as appropriate.
              allowCombinerLifting &= (windowingStrategy.getTrigger() instanceof DefaultTrigger);
            }
            // The service property is the negation: DISALLOW_COMBINER_LIFTING.
            stepContext.addInput(PropertyNames.DISALLOW_COMBINER_LIFTING, !allowCombinerLifting);
            stepContext.addInput(
                PropertyNames.SERIALIZED_FN,
                byteArrayToJsonString(
                    serializeWindowingStrategy(windowingStrategy, context.getPipelineOptions())));
            stepContext.addInput(
                PropertyNames.IS_MERGING_WINDOW_FN,
                !windowingStrategy.getWindowFn().isNonMerging());
          }
        });

    // ParDo.MultiOutput -> "ParallelDo": multi-output ParDo. Collects per-output coders,
    // schema information, and the side-input mapping before serializing the DoFn.
    registerTransformTranslator(
        ParDo.MultiOutput.class,
        new TransformTranslator<ParDo.MultiOutput>() {
          @Override
          public void translate(ParDo.MultiOutput transform, TranslationContext context) {
            translateMultiHelper(transform, context);
          }

          private <InputT, OutputT> void translateMultiHelper(
              ParDo.MultiOutput<InputT, OutputT> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
            DoFnSchemaInformation doFnSchemaInformation;
            doFnSchemaInformation =
                ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
            Map<String, PCollectionView<?>> sideInputMapping =
                ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
            // Map each output tag to its PCollection's coder for the serialized DoFn spec.
            Map<TupleTag<?>, Coder<?>> outputCoders =
                context.getOutputs(transform).entrySet().stream()
                    .collect(
                        Collectors.toMap(
                            Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
            translateInputs(
                stepContext,
                context.getInput(transform),
                transform.getSideInputs().values(),
                context);
            translateOutputs(context.getOutputs(transform), stepContext);
            translateFn(
                stepContext,
                transform.getFn(),
                context.getInput(transform).getWindowingStrategy(),
                transform.getSideInputs().values(),
                context.getInput(transform).getCoder(),
                context,
                transform.getMainOutputTag(),
                outputCoders,
                doFnSchemaInformation,
                sideInputMapping);
          }
        });

    // ParDoSingle -> "ParallelDo": single-output ParDo. Same translation as MultiOutput
    // except only the main output tag is registered (via addOutput rather than
    // translateOutputs).
    registerTransformTranslator(
        ParDoSingle.class,
        new TransformTranslator<ParDoSingle>() {
          @Override
          public void translate(ParDoSingle transform, TranslationContext context) {
            translateSingleHelper(transform, context);
          }

          private <InputT, OutputT> void translateSingleHelper(
              ParDoSingle<InputT, OutputT> transform, TranslationContext context) {

            DoFnSchemaInformation doFnSchemaInformation;
            doFnSchemaInformation =
                ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
            Map<String, PCollectionView<?>> sideInputMapping =
                ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
            StepTranslationContext stepContext = context.addStep(transform, "ParallelDo");
            Map<TupleTag<?>, Coder<?>> outputCoders =
                context.getOutputs(transform).entrySet().stream()
                    .collect(
                        Collectors.toMap(
                            Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));

            translateInputs(
                stepContext,
                context.getInput(transform),
                transform.getSideInputs().values(),
                context);
            stepContext.addOutput(
                transform.getMainOutputTag().getId(), context.getOutput(transform));
            translateFn(
                stepContext,
                transform.getFn(),
                context.getInput(transform).getWindowingStrategy(),
                transform.getSideInputs().values(),
                context.getInput(transform).getCoder(),
                context,
                transform.getMainOutputTag(),
                outputCoders,
                doFnSchemaInformation,
                sideInputMapping);
          }
        });

    // Window.Assign -> "Bucket": re-windows elements. Note the serialized strategy is
    // taken from the OUTPUT collection (the post-assignment windowing).
    registerTransformTranslator(
        Window.Assign.class,
        new TransformTranslator<Window.Assign>() {
          @Override
          public void translate(Window.Assign transform, TranslationContext context) {
            translateHelper(transform, context);
          }

          private <T> void translateHelper(Window.Assign<T> transform, TranslationContext context) {
            StepTranslationContext stepContext = context.addStep(transform, "Bucket");
            PCollection<T> input = context.getInput(transform);
            stepContext.addInput(PropertyNames.PARALLEL_INPUT, input);
            stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));

            WindowingStrategy<?, ?> strategy = context.getOutput(transform).getWindowingStrategy();
            byte[] serializedBytes =
                serializeWindowingStrategy(strategy, context.getPipelineOptions());
            String serializedJson = byteArrayToJsonString(serializedBytes);
            stepContext.addInput(PropertyNames.SERIALIZED_FN, serializedJson);
          }
        });

    ///////////////////////////////////////////////////////////////////////////
    // IO Translation.

    registerTransformTranslator(SplittableParDo.PrimitiveBoundedRead.class, new ReadTranslator());

    // TestStream -> "ParallelRead" with format "test_stream": encodes the scripted
    // element/watermark/processing-time events into a TestStreamPayload proto embedded
    // in the step. SERIALIZED_FN here carries the portable PTransform id, not a
    // serialized function.
    registerTransformTranslator(
        TestStream.class,
        new TransformTranslator<TestStream>() {
          @Override
          public void translate(TestStream transform, TranslationContext context) {
            translateTyped(transform, context);
          }

          private <T> void translateTyped(TestStream<T> transform, TranslationContext context) {
            try {
              StepTranslationContext stepContext = context.addStep(transform, "ParallelRead");
              String ptransformId =
                  context.getSdkComponents().getPTransformIdOrThrow(context.getCurrentTransform());
              stepContext.addInput(PropertyNames.SERIALIZED_FN, ptransformId);
              stepContext.addInput(PropertyNames.FORMAT, "test_stream");
              RunnerApi.TestStreamPayload.Builder payloadBuilder =
                  RunnerApi.TestStreamPayload.newBuilder();
              for (TestStream.Event event : transform.getEvents()) {
                if (event instanceof TestStream.ElementEvent) {
                  RunnerApi.TestStreamPayload.Event.AddElements.Builder addElementsBuilder =
                      RunnerApi.TestStreamPayload.Event.AddElements.newBuilder();
                  Iterable<TimestampedValue<T>> elements =
                      ((TestStream.ElementEvent) event).getElements();
                  for (TimestampedValue<T> element : elements) {
                    addElementsBuilder.addElements(
                        RunnerApi.TestStreamPayload.TimestampedElement.newBuilder()
                            .setEncodedElement(
                                ByteString.copyFrom(
                                    CoderUtils.encodeToByteArray(
                                        transform.getValueCoder(), element.getValue())))
                            // Joda millis -> payload timestamp units (millis * 1000).
                            .setTimestamp(element.getTimestamp().getMillis() * 1000));
                  }
                  payloadBuilder.addEventsBuilder().setElementEvent(addElementsBuilder);
                } else if (event instanceof TestStream.WatermarkEvent) {
                  payloadBuilder
                      .addEventsBuilder()
                      .setWatermarkEvent(
                          RunnerApi.TestStreamPayload.Event.AdvanceWatermark.newBuilder()
                              .setNewWatermark(
                                  ((TestStream.WatermarkEvent) event).getWatermark().getMillis()
                                      * 1000));
                } else if (event instanceof TestStream.ProcessingTimeEvent) {
                  payloadBuilder
                      .addEventsBuilder()
                      .setProcessingTimeEvent(
                          RunnerApi.TestStreamPayload.Event.AdvanceProcessingTime.newBuilder()
                              .setAdvanceDuration(
                                  ((TestStream.ProcessingTimeEvent) event)
                                          .getProcessingTimeAdvance()
                                          .getMillis()
                                      * 1000));
                }
              }
              stepContext.addInput(
                  PropertyNames.SERIALIZED_TEST_STREAM,
                  byteArrayToJsonString(payloadBuilder.build().toByteArray()));
              stepContext.addOutput(PropertyNames.OUTPUT, context.getOutput(transform));
            } catch (Exception e) {
              // Coder encoding of test elements can throw; surface as unchecked since
              // translation has no recovery path.
              throw new RuntimeException(e);
            }
          }
        });

    ///////////////////////////////////////////////////////////////////////////
    // Legacy Splittable DoFn translation.

    // SplittableParDo.ProcessKeyedElements -> "SplittableProcessKeyed": the legacy SDF
    // expansion. Translated like a ParDo, plus a RESTRICTION_CODER input encoding the
    // (restriction, watermark-estimator-state) pair as a KvCoder.
    registerTransformTranslator(
        SplittableParDo.ProcessKeyedElements.class,
        new TransformTranslator<SplittableParDo.ProcessKeyedElements>() {
          @Override
          public void translate(
              SplittableParDo.ProcessKeyedElements transform, TranslationContext context) {
            translateTyped(transform, context);
          }

          private <InputT, OutputT, RestrictionT, WatermarkEstimatorStateT> void translateTyped(
              SplittableParDo.ProcessKeyedElements<
                      InputT, OutputT, RestrictionT, WatermarkEstimatorStateT>
                  transform,
              TranslationContext context) {
            DoFnSchemaInformation doFnSchemaInformation;
            doFnSchemaInformation =
                ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
            Map<String, PCollectionView<?>> sideInputMapping =
                ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
            StepTranslationContext stepContext =
                context.addStep(transform, "SplittableProcessKeyed");
            Map<TupleTag<?>, Coder<?>> outputCoders =
                context.getOutputs(transform).entrySet().stream()
                    .collect(
                        Collectors.toMap(
                            Map.Entry::getKey, e -> ((PCollection) e.getValue()).getCoder()));
            translateInputs(
                stepContext, context.getInput(transform), transform.getSideInputs(), context);
            translateOutputs(context.getOutputs(transform), stepContext);
            translateFn(
                stepContext,
                transform.getFn(),
                // Note: windowing strategy and element coder come from the transform
                // itself here, not from context.getInput(...) as in the ParDo cases.
                transform.getInputWindowingStrategy(),
                transform.getSideInputs(),
                transform.getElementCoder(),
                context,
                transform.getMainOutputTag(),
                outputCoders,
                doFnSchemaInformation,
                sideInputMapping);

            stepContext.addInput(
                PropertyNames.RESTRICTION_CODER,
                translateCoder(
                    KvCoder.of(
                        transform.getRestrictionCoder(),
                        transform.getWatermarkEstimatorStateCoder())));
          }
        });
  }