def run_pipeline()

in sdks/python/apache_beam/runners/dataflow/dataflow_runner.py


  def run_pipeline(self, pipeline, options, pipeline_proto=None):
    """Remotely executes entire pipeline or parts reachable from node."""
    if _is_runner_v2_disabled(options):
      raise ValueError(
          'Disabling Runner V2 is no longer supported '
          'with Beam Python %s.' % beam.version.__version__)

    # Add the goog-dataflow-notebook label if the job was started from a
    # notebook.
    if is_in_notebook():
      notebook_version = (
          'goog-dataflow-notebook=' +
          beam.version.__version__.replace('.', '_'))
      if options.view_as(GoogleCloudOptions).labels:
        options.view_as(GoogleCloudOptions).labels.append(notebook_version)
      else:
        options.view_as(GoogleCloudOptions).labels = [notebook_version]
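      # For example, with a hypothetical Beam version 2.55.0 the label becomes
      # goog-dataflow-notebook=2_55_0 (dots are not valid in label values).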

    # Import here to avoid adding the dependency for local running scenarios.
    try:
      # pylint: disable=wrong-import-order, wrong-import-position
      from apache_beam.runners.dataflow.internal import apiclient
    except ImportError:
      raise ImportError(
          'Google Cloud Dataflow runner not available, '
          'please install apache_beam[gcp]')

    _check_and_add_missing_options(options)

    # Convert all side inputs into a form acceptable to Dataflow.
    if pipeline:
      pipeline.visit(self.combinefn_visitor())

      pipeline.visit(
          self.side_input_visitor(
              deterministic_key_coders=not options.view_as(
                  TypeOptions).allow_non_deterministic_key_coders))

      # Perform configured PTransform overrides. Note that this is currently
      # done before Runner API serialization, since the new proto needs to
      # contain any added PTransforms.
      pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)

      if options.view_as(DebugOptions).lookup_experiment('use_legacy_bq_sink'):
        warnings.warn(
            "Native sinks are no longer implemented; "
            "ignoring use_legacy_bq_sink.")

    if pipeline_proto:
      self.proto_pipeline = pipeline_proto

    else:
      if options.view_as(SetupOptions).prebuild_sdk_container_engine:
        # If prebuild_sdk_container_engine is specified, build a new SDK
        # container image with dependencies pre-installed and use that image
        # instead of the inferred default container image.
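        # The engine is chosen via --prebuild_sdk_container_engine, e.g.
        # 'cloud_build' or 'local_docker'.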
        self._default_environment = (
            environments.DockerEnvironment.from_options(options))
        options.view_as(WorkerOptions).sdk_container_image = (
            self._default_environment.container_image)
      else:
        artifacts = environments.python_sdk_dependencies(options)
        if artifacts:
          _LOGGER.info(
              "Pipeline has additional dependencies to be installed "
              "in SDK worker container, consider using the SDK "
              "container image pre-building workflow to avoid "
              "repetitive installations. Learn more on "
              "https://cloud.google.com/dataflow/docs/guides/"
              "using-custom-containers#prebuild")
        self._default_environment = (
            environments.DockerEnvironment.from_container_image(
                apiclient.get_container_image_from_options(options),
                artifacts=artifacts,
                resource_hints=environments.resource_hints_from_options(
                    options)))

      # This has to be performed before the pipeline proto is constructed to
      # make sure that the changes are reflected in the portable job submission
      # path.
      self._adjust_pipeline_for_dataflow_v2(pipeline)

      # Snapshot the pipeline in a portable proto.
      self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
          return_context=True, default_environment=self._default_environment)

    if any(pcoll.is_bounded == beam_runner_api_pb2.IsBounded.UNBOUNDED
           for pcoll in self.proto_pipeline.components.pcollections.values()):
      if (not options.view_as(StandardOptions).streaming and
          not options.view_as(DebugOptions).lookup_experiment(
              'unsafely_attempt_to_process_unbounded_data_in_batch_mode')):
        _LOGGER.info(
            'Automatically inferring streaming mode '
            'due to unbounded PCollections.')
        options.view_as(StandardOptions).streaming = True
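        # For example, a pipeline reading from Pub/Sub yields an UNBOUNDED
        # PCollection, so the job switches to streaming mode here unless the
        # experiment above explicitly opts into batch processing.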

    if options.view_as(StandardOptions).streaming:
      _check_and_add_missing_streaming_options(options)

    # Dataflow can only handle Docker environments.
    for env_id, env in self.proto_pipeline.components.environments.items():
      self.proto_pipeline.components.environments[env_id].CopyFrom(
          environments.resolve_anyof_environment(
              env, common_urns.environments.DOCKER.urn))
    self.proto_pipeline = merge_common_environments(
        merge_superset_dep_environments(self.proto_pipeline))

    # Optimize the pipeline if it is not streaming and the pre_optimize
    # experiment is set.
    if not options.view_as(StandardOptions).streaming:
      pre_optimize = options.view_as(DebugOptions).lookup_experiment(
          'pre_optimize', 'default').lower()
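      # The experiment is typically supplied on the command line, e.g.
      # --experiments=pre_optimize=pack_combiners (an illustrative value).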
      from apache_beam.runners.portability.fn_api_runner import translations
      if pre_optimize == 'none':
        phases = []
      elif pre_optimize == 'default' or pre_optimize == 'all':
        phases = [translations.pack_combiners, translations.sort_stages]
      else:
        phases = []
        for phase_name in pre_optimize.split(','):
          # For now, these are all we allow.
          if phase_name in ('pack_combiners', ):
            phases.append(getattr(translations, phase_name))
          else:
            raise ValueError(
                'Unknown or inapplicable phase for pre_optimize: %s' %
                phase_name)
        phases.append(translations.sort_stages)

      if phases:
        self.proto_pipeline = translations.optimize_pipeline(
            self.proto_pipeline,
            phases=phases,
            known_runner_urns=frozenset(),
            partial=True)

    # Add all BeamPlugin paths to setup_options so the plugins are imported on
    # the workers.
    setup_options = options.view_as(SetupOptions)
    plugins = BeamPlugin.get_all_plugin_paths()
    if setup_options.beam_plugins is not None:
      plugins = list(set(plugins + setup_options.beam_plugins))
    setup_options.beam_plugins = plugins
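    # Plugin paths are fully qualified class paths of BeamPlugin subclasses,
    # e.g. 'my_package.my_module.MyCustomCoder' (a hypothetical plugin), so
    # they can be re-imported on the workers.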

    # Elevate "min_cpu_platform" to pipeline option, but using the existing
    # experiment.
    debug_options = options.view_as(DebugOptions)
    worker_options = options.view_as(WorkerOptions)
    if worker_options.min_cpu_platform:
      debug_options.add_experiment(
          'min_cpu_platform=' + worker_options.min_cpu_platform)
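      # For example, --min_cpu_platform="Intel Skylake" is forwarded to the
      # service as the experiment min_cpu_platform=Intel Skylake.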

    self.job = apiclient.Job(options, self.proto_pipeline)

    test_options = options.view_as(TestOptions)
    # If it is a dry run, return without submitting the job.
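    # (dry_run is a TestOptions flag; it is typically used when only the job
    # description is needed, e.g. in tests.)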
    if test_options.dry_run:
      result = PipelineResult(PipelineState.DONE)
      result.wait_until_finish = lambda duration=None: None
      result.job = self.job
      return result

    # Get a Dataflow API client and set its options
    self.dataflow_client = apiclient.DataflowApplicationClient(
        options, self.job.root_staging_location)

    # Create the job description and send a request to the service. The result
    # can be None if there is no need to send a request to the service (e.g.
    # template creation). If a request was sent and failed then the call will
    # raise an exception.
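    # For instance, when --template_location is set, the job is staged as a
    # template and nothing is launched.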
    result = DataflowPipelineResult(
        self.dataflow_client.create_job(self.job), self, options)

    # TODO(BEAM-4274): Circular import runners-metrics. Requires refactoring.
    from apache_beam.runners.dataflow.dataflow_metrics import DataflowMetrics
    self._metrics = DataflowMetrics(self.dataflow_client, result, self.job)
    result.metric_results = self._metrics
    return result
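
A minimal sketch of how this code path is typically reached; the project,
region, and bucket below are placeholders, not values taken from this file:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(
        runner='DataflowRunner',
        project='my-project',                # hypothetical project id
        region='us-central1',
        temp_location='gs://my-bucket/tmp')  # hypothetical staging bucket
    with beam.Pipeline(options=options) as p:
        _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)

Pipeline.run() resolves the runner from the options and ultimately calls
run_pipeline(pipeline, options) as listed above.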