public DataflowPipelineJob run()

in runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java [1236:1640]


  public DataflowPipelineJob run(Pipeline pipeline) {
    // Submits the given pipeline to the Dataflow service and returns a handle to the created job.
    //
    // In order, this method:
    //   1. Normalizes the experiments on `options` (Runner v2 selection, streaming settings).
    //   2. Translates the pipeline to a portable proto, stages its artifacts and the proto itself.
    //   3. Applies the v1 transform replacements (skipped for Runner v2), builds the v1 proto and
    //      translates it into a JobSpecification.
    //   4. Populates the Job environment (pipeline options, user agent, temp storage, region/zone,
    //      FlexRS goal, experiments, container images, environment version).
    //   5. Optionally stages the job graph ("upload_graph") and/or prints the job to a file. When
    //      a template location is set, returns a DataflowTemplateJob without creating a job.
    //   6. Calls createJob and verifies the returned client request id.
    //
    // Parameter: pipeline - the pipeline to run; it is mutated by the optimizations and transform
    // replacements applied below.
    //
    // Throws IllegalArgumentException if Runner v2 is both enabled and disabled through
    // experiments, or if the pipeline options cannot be serialized to JSON; RuntimeException if
    // the job cannot be created or a template file cannot be written;
    // DataflowJobAlreadyUpdatedException / DataflowJobAlreadyExistsException if the service
    // returns a job carrying a different client request id.

    // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
    // to Runner v2.
    if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
      List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
      if (!experiments.contains("use_runner_v2")) {
        LOG.info(
            "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language"
                + " transforms or pipeline needed a transform upgrade.");
        options.setExperiments(
            ImmutableList.<String>builder().addAll(experiments).add("use_runner_v2").build());
      }
    }
    if (useUnifiedWorker(options)) {
      // Reject contradictory configuration before touching the experiments list.
      if (hasExperiment(options, "disable_runner_v2")
          || hasExperiment(options, "disable_runner_v2_until_2023")
          || hasExperiment(options, "disable_prime_runner_v2")) {
        throw new IllegalArgumentException(
            "Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
      }
      // Make sure all four Runner v2 experiments are present together.
      List<String> experiments =
          new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
      if (!experiments.contains("use_runner_v2")) {
        experiments.add("use_runner_v2");
      }
      if (!experiments.contains("use_unified_worker")) {
        experiments.add("use_unified_worker");
      }
      if (!experiments.contains("beam_fn_api")) {
        experiments.add("beam_fn_api");
      }
      if (!experiments.contains("use_portable_job_submission")) {
        experiments.add("use_portable_job_submission");
      }
      options.setExperiments(ImmutableList.copyOf(experiments));
      // Ensure that logging via the FnApi is enabled
      options.as(SdkHarnessOptions.class).setEnableLogViaFnApi(true);
    }

    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
    logWarningIfBigqueryDLQUnused(pipeline);
    if (shouldActAsStreaming(pipeline)) {
      options.setStreaming(true);

      if (useUnifiedWorker(options)) {
        options.setEnableStreamingEngine(true);
        List<String> experiments =
            new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
        if (!experiments.contains("enable_streaming_engine")) {
          experiments.add("enable_streaming_engine");
        }
        if (!experiments.contains("enable_windmill_service")) {
          experiments.add("enable_windmill_service");
        }
        // Write the augmented list back to the options; the local copy alone has no effect.
        options.setExperiments(ImmutableList.copyOf(experiments));
      }
    }

    if (!ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
      ProjectionPushdownOptimizer.optimize(pipeline);
    }

    LOG.info(
        "Executing pipeline on the Dataflow Service, which will have billing implications "
            + "related to Google Compute Engine usage and other Google Cloud Services.");

    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    String workerHarnessContainerImageURL = DataflowRunner.getContainerImageForJob(dataflowOptions);

    // This incorrectly puns the worker harness container image (which implements v1beta3 API)
    // with the SDK harness image (which implements Fn API).
    //
    // The same Environment is used in different and contradictory ways, depending on whether
    // it is a v1 or v2 job submission.
    RunnerApi.Environment defaultEnvironmentForDataflow =
        Environments.createDockerEnvironment(workerHarnessContainerImageURL);

    // The SdkComponents for portable and non-portable job submission must be kept distinct. Both
    // need the default environment.
    SdkComponents portableComponents = SdkComponents.create();
    portableComponents.registerEnvironment(
        defaultEnvironmentForDataflow
            .toBuilder()
            .addAllDependencies(getDefaultArtifacts())
            .addAllCapabilities(Environments.getJavaCapabilities())
            .build());

    RunnerApi.Pipeline portablePipelineProto =
        PipelineTranslation.toProto(pipeline, portableComponents, false);
    // Note that `stageArtifacts` has to be called before `resolveArtifacts` because
    // `resolveArtifacts` updates local paths to staged paths in pipeline proto.
    portablePipelineProto = resolveAnyOfEnvironments(portablePipelineProto);
    List<DataflowPackage> packages = stageArtifacts(portablePipelineProto);
    portablePipelineProto = resolveArtifacts(portablePipelineProto);
    portablePipelineProto = applySdkEnvironmentOverrides(portablePipelineProto, options);
    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Portable pipeline proto:\n{}",
          TextFormat.printer().printToString(portablePipelineProto));
    }
    // Stage the portable pipeline proto, retrieving the staged pipeline path, then update
    // the options on the new job
    // TODO: add an explicit `pipeline` parameter to the submission instead of pipeline options
    LOG.info("Staging portable pipeline proto to {}", options.getStagingLocation());
    byte[] serializedProtoPipeline = portablePipelineProto.toByteArray();

    DataflowPackage stagedPipeline =
        options.getStager().stageToFile(serializedProtoPipeline, PIPELINE_FILE_NAME);
    dataflowOptions.setPipelineUrl(stagedPipeline.getLocation());

    if (useUnifiedWorker(options)) {
      LOG.info("Skipping v1 transform replacements since job will run on v2.");
    } else {
      // Now rewrite things to be as needed for v1 (mutates the pipeline)
      // This way the job submitted is valid for v1 and v2, simultaneously
      replaceV1Transforms(pipeline);
    }
    // Capture the SdkComponents for look up during step translations
    SdkComponents dataflowV1Components = SdkComponents.create();
    dataflowV1Components.registerEnvironment(
        defaultEnvironmentForDataflow
            .toBuilder()
            .addAllDependencies(getDefaultArtifacts())
            .addAllCapabilities(Environments.getJavaCapabilities())
            .build());
    // No need to perform transform upgrading for the Runner v1 proto.
    RunnerApi.Pipeline dataflowV1PipelineProto =
        PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);

    if (LOG.isDebugEnabled()) {
      LOG.debug(
          "Dataflow v1 pipeline proto:\n{}",
          TextFormat.printer().printToString(dataflowV1PipelineProto));
    }

    // Set a unique client_request_id in the CreateJob request.
    // This is used to ensure idempotence of job creation across retried
    // attempts to create a job. Specifically, if the service returns a job with
    // a different client_request_id, it means the returned one is a different
    // job previously created with the same job name, and that the job creation
    // has been effectively rejected. The SDK should return
    // Error::Already_Exists to user in that case.
    int randomNum = new Random().nextInt(9000) + 1000;
    // The id is a UTC timestamp followed by a random 4-digit suffix. In Joda-Time patterns "SSS"
    // is the millisecond fraction, whereas "mmm" would only repeat the minute of the hour.
    String requestId =
        DateTimeFormat.forPattern("YYYYMMddHHmmssSSS")
                .withZone(DateTimeZone.UTC)
                .print(DateTimeUtils.currentTimeMillis())
            + "_"
            + randomNum;

    JobSpecification jobSpecification =
        translator.translate(
            pipeline, dataflowV1PipelineProto, dataflowV1Components, this, packages);

    // A user-supplied worker jar only applies to v1 jobs; flag it through an experiment.
    if (!isNullOrEmpty(dataflowOptions.getDataflowWorkerJar()) && !useUnifiedWorker(options)) {
      List<String> experiments =
          firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList());
      if (!experiments.contains("use_staged_dataflow_worker_jar")) {
        dataflowOptions.setExperiments(
            ImmutableList.<String>builder()
                .addAll(experiments)
                .add("use_staged_dataflow_worker_jar")
                .build());
      }
    }

    Job newJob = jobSpecification.getJob();
    try {
      // Round-trip the options through JSON to obtain a plain Map for the job environment.
      newJob
          .getEnvironment()
          .setSdkPipelineOptions(
              MAPPER.readValue(MAPPER_WITH_MODULES.writeValueAsBytes(options), Map.class));
    } catch (IOException e) {
      throw new IllegalArgumentException(
          "PipelineOptions specified failed to serialize to JSON.", e);
    }
    newJob.setClientRequestId(requestId);

    DataflowRunnerInfo dataflowRunnerInfo = DataflowRunnerInfo.getDataflowRunnerInfo();
    String version = dataflowRunnerInfo.getVersion();
    // An unsubstituted build placeholder means the SDK version is unknown; refuse to submit.
    checkState(
        !"${pom.version}".equals(version),
        "Unable to submit a job to the Dataflow service with unset version ${pom.version}");
    LOG.info("Dataflow SDK version: {}", version);

    newJob.getEnvironment().setUserAgent((Map) dataflowRunnerInfo.getProperties());
    // The Dataflow Service may write to the temporary directory directly, so
    // must be verified.
    if (!isNullOrEmpty(options.getGcpTempLocation())) {
      newJob
          .getEnvironment()
          .setTempStoragePrefix(
              dataflowOptions.getPathValidator().verifyPath(options.getGcpTempLocation()));
    }
    newJob.getEnvironment().setDataset(options.getTempDatasetId());

    if (options.getWorkerRegion() != null) {
      newJob.getEnvironment().setWorkerRegion(options.getWorkerRegion());
    }
    if (options.getWorkerZone() != null) {
      newJob.getEnvironment().setWorkerZone(options.getWorkerZone());
    }

    // Only the two explicit FlexRS goals are forwarded; any other value leaves the field unset.
    if (options.getFlexRSGoal()
        == DataflowPipelineOptions.FlexResourceSchedulingGoal.COST_OPTIMIZED) {
      newJob.getEnvironment().setFlexResourceSchedulingGoal("FLEXRS_COST_OPTIMIZED");
    } else if (options.getFlexRSGoal()
        == DataflowPipelineOptions.FlexResourceSchedulingGoal.SPEED_OPTIMIZED) {
      newJob.getEnvironment().setFlexResourceSchedulingGoal("FLEXRS_SPEED_OPTIMIZED");
    }

    // Represent the minCpuPlatform pipeline option as an experiment, if not already present.
    if (!isNullOrEmpty(dataflowOptions.getMinCpuPlatform())) {
      List<String> experiments =
          firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList());

      List<String> minCpuFlags =
          experiments.stream()
              .filter(p -> p.startsWith("min_cpu_platform"))
              .collect(Collectors.toList());

      if (minCpuFlags.isEmpty()) {
        dataflowOptions.setExperiments(
            ImmutableList.<String>builder()
                .addAll(experiments)
                .add("min_cpu_platform=" + dataflowOptions.getMinCpuPlatform())
                .build());
      } else {
        // The experiment takes precedence over the top-level option.
        LOG.warn(
            "Flag min_cpu_platform is defined in both top level PipelineOption, "
                + "as well as under experiments. Proceed using {}.",
            minCpuFlags.get(0));
      }
    }

    // Snapshot the experiments into the job environment. Changes made to the options' experiments
    // after this point are not copied into the environment by this method.
    newJob
        .getEnvironment()
        .setExperiments(
            ImmutableList.copyOf(
                firstNonNull(dataflowOptions.getExperiments(), Collections.emptyList())));

    // Set the Docker container image that executes Dataflow worker harness, residing in Google
    // Container Registry. Translator is guaranteed to create a worker pool prior to this point.
    // For runner_v1, only worker_harness_container is set.
    // For runner_v2, both worker_harness_container and sdk_harness_container are set to the same
    // value.
    String containerImage = getContainerImageForJob(options);
    for (WorkerPool workerPool : newJob.getEnvironment().getWorkerPools()) {
      workerPool.setWorkerHarnessContainerImage(containerImage);
    }

    configureSdkHarnessContainerImages(options, portablePipelineProto, newJob);

    newJob.getEnvironment().setVersion(getEnvironmentVersion(options));

    if (hooks != null) {
      hooks.modifyEnvironmentBeforeSubmission(newJob.getEnvironment());
    }

    // enable upload_graph when the graph is too large
    byte[] jobGraphBytes = DataflowPipelineTranslator.jobToString(newJob).getBytes(UTF_8);
    int jobGraphByteSize = jobGraphBytes.length;
    if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES
        && !hasExperiment(options, "upload_graph")
        && !useUnifiedWorker(options)) {
      List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
      options.setExperiments(
          ImmutableList.<String>builder().addAll(experiments).add("upload_graph").build());
      LOG.info(
          "The job graph size ({} in bytes) is larger than {}. Automatically add "
              + "the upload_graph option to experiments.",
          jobGraphByteSize,
          CREATE_JOB_REQUEST_LIMIT_BYTES);
    }

    if (hasExperiment(options, "upload_graph") && useUnifiedWorker(options)) {
      // Experiments are non-null here because hasExperiment returned true.
      List<String> experiments = new ArrayList<>(options.getExperiments());
      // Drop every occurrence, not just the first one.
      experiments.removeIf("upload_graph"::equals);
      options.setExperiments(experiments);
      LOG.warn(
          "The upload_graph experiment was specified, but it does not apply "
              + "to runner v2 jobs. Option has been automatically removed.");
    }

    // Upload the job to GCS and remove the graph object from the API call.  The graph
    // will be downloaded from GCS by the service.
    if (hasExperiment(options, "upload_graph")) {
      DataflowPackage stagedGraph =
          options.getStager().stageToFile(jobGraphBytes, DATAFLOW_GRAPH_FILE_NAME);
      newJob.getSteps().clear();
      newJob.setStepsLocation(stagedGraph.getLocation());
    }

    if (!isNullOrEmpty(options.getDataflowJobFile())
        || !isNullOrEmpty(options.getTemplateLocation())) {
      boolean isTemplate = !isNullOrEmpty(options.getTemplateLocation());
      if (isTemplate) {
        checkArgument(
            isNullOrEmpty(options.getDataflowJobFile()),
            "--dataflowJobFile and --templateLocation are mutually exclusive.");
      }
      // Select by isTemplate so that an empty (but non-null) template location does not shadow a
      // valid job file. When isTemplate is false the enclosing condition guarantees that the job
      // file is non-empty.
      String fileLocation =
          isTemplate ? options.getTemplateLocation() : options.getDataflowJobFile();
      checkArgument(
          fileLocation.startsWith("/") || fileLocation.startsWith("gs://"),
          "Location must be local or on Cloud Storage, got %s.",
          fileLocation);

      try {
        printWorkSpecJsonToFile(fileLocation, newJob);
        LOG.info("Printed job specification to {}", fileLocation);
      } catch (IOException ex) {
        String error = String.format("Cannot create output file at %s", fileLocation);
        // Failing to write a template is fatal; failing to write the optional job file is not.
        if (isTemplate) {
          throw new RuntimeException(error, ex);
        } else {
          LOG.warn(error, ex);
        }
      }

      if (isTemplate) {
        // Template creation stops here: no job is submitted to the service.
        LOG.info("Template successfully created.");
        return new DataflowTemplateJob();
      }
    }

    String jobIdToUpdate = null;
    if (options.isUpdate()) {
      jobIdToUpdate = getJobIdFromName(options.getJobName());
      newJob.setTransformNameMapping(options.getTransformNameMapping());
      newJob.setReplaceJobId(jobIdToUpdate);
    }
    if (options.getCreateFromSnapshot() != null && !options.getCreateFromSnapshot().isEmpty()) {
      newJob.setTransformNameMapping(options.getTransformNameMapping());
      newJob.setCreatedFromSnapshotId(options.getCreateFromSnapshot());
    }

    Job jobResult;
    try {
      jobResult = dataflowClient.createJob(newJob);
    } catch (GoogleJsonResponseException e) {
      String errorMessages = "Unexpected errors";
      if (e.getDetails() != null) {
        // For oversized graphs, report the size limit instead of the service's own message.
        if (jobGraphByteSize >= CREATE_JOB_REQUEST_LIMIT_BYTES) {
          errorMessages =
              "The size of the serialized JSON representation of the pipeline "
                  + "exceeds the allowable limit. "
                  + "For more information, please see the documentation on job submission:\n"
                  + "https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#jobs";
        } else {
          errorMessages = e.getDetails().getMessage();
        }
      }
      throw new RuntimeException("Failed to create a workflow job: " + errorMessages, e);
    } catch (IOException e) {
      throw new RuntimeException("Failed to create a workflow job", e);
    }

    // Use a raw client for post-launch monitoring, as status calls may fail
    // regularly and need not be retried automatically.
    // jobSpecification was already dereferenced above, so it is known to be non-null here.
    DataflowPipelineJob dataflowPipelineJob =
        new DataflowPipelineJob(
            DataflowClient.create(options),
            jobResult.getId(),
            options,
            jobSpecification.getStepNames(),
            portablePipelineProto);

    // If the service returned client request id, the SDK needs to compare it
    // with the original id generated in the request, if they are not the same
    // (i.e., the returned job is not created by this request), throw
    // DataflowJobAlreadyExistsException or DataflowJobAlreadyUpdatedException
    // depending on whether this is a reload or not.
    if (jobResult.getClientRequestId() != null
        && !jobResult.getClientRequestId().isEmpty()
        && !jobResult.getClientRequestId().equals(requestId)) {
      // If updating a job.
      if (options.isUpdate()) {
        throw new DataflowJobAlreadyUpdatedException(
            dataflowPipelineJob,
            String.format(
                "The job named %s with id: %s has already been updated into job id: %s "
                    + "and cannot be updated again.",
                newJob.getName(), jobIdToUpdate, jobResult.getId()));
      } else {
        throw new DataflowJobAlreadyExistsException(
            dataflowPipelineJob,
            String.format(
                "There is already an active job named %s with id: %s. If you want to submit a"
                    + " second job, try again by setting a different name using --jobName.",
                newJob.getName(), jobResult.getId()));
      }
    }

    LOG.info(
        "To access the Dataflow monitoring console, please navigate to {}",
        MonitoringUtil.getJobMonitoringPageURL(
            options.getProject(), options.getRegion(), jobResult.getId()));
    LOG.info("Submitted job: {}", jobResult.getId());

    LOG.info(
        "To cancel the job using the 'gcloud' tool, run:\n> {}",
        MonitoringUtil.getGcloudCancelCommand(options, jobResult.getId()));

    return dataflowPipelineJob;
  }