public LaunchInfo launch(String project, String region, LaunchConfig options)

in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java [285:404]
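In brief: the method dispatches on the configured SDK. Java pipelines are run in-process via Pipeline.run() (either against the Dataflow runner or a locally managed runner), while Python and Go pipelines are assembled into shell commands whose console output is parsed for the Dataflow job ID.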


  public LaunchInfo launch(String project, String region, LaunchConfig options) throws IOException {
    checkState(
        options.sdk() != null,
        "Cannot launch a dataflow job without an SDK specified. "
            + "Please specify sdk and try again!");
    LOG.info("Getting ready to launch {} in {} under {}", options.jobName(), region, project);
    LOG.info("Using parameters:\n{}", formatForLogging(options.parameters()));
    // Build an SDK-specific launch command and execute it; the Java path runs the pipeline
    // in-process instead of shelling out.
    List<String> cmd = new ArrayList<>();
    String jobId;
    switch (options.sdk()) {
      case JAVA:
        checkState(
            options.pipeline() != null,
            "Cannot launch a dataflow job without a pipeline specified. "
                + "Please specify pipeline and try again!");
        PipelineOptions pipelineOptions = options.pipeline().getOptions();
        if ("DataflowRunner".equalsIgnoreCase(options.getParameter("runner"))) {
          List<String> optionFromConfig = extractOptions(project, region, options);
          // A few options must be set at pipeline-expansion time, so they are preserved here.
          // Known cases: --streaming (expansion depends on it) and --tempLocation (validation
          // depends on it).
          if (pipelineOptions.as(StreamingOptions.class).isStreaming()) {
            optionFromConfig.add("--streaming");
          }
          if (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation())) {
            optionFromConfig.add(
                String.format("--tempLocation=%s", pipelineOptions.getTempLocation()));
          }

          // dataflow runner specific options
          PipelineOptions updatedOptions =
              PipelineOptionsFactory.fromArgs(optionFromConfig.toArray(new String[] {})).create();
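          // Note: fromArgs(...).create() parses the collected flags without eager validation;
          // PipelineOptionsFactory would only validate here if withValidation() were chained in.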
          updatedOptions.setJobName(options.jobName());
          PipelineResult pipelineResult = options.pipeline().run(updatedOptions);
          // The Dataflow runner generates a job ID in a well-defined format for each job.
          DataflowPipelineJob job = (DataflowPipelineJob) pipelineResult;
          jobId = job.getJobId();
          UNMANAGED_JOBS.put(jobId, pipelineResult);
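          // Contrast with MANAGED_JOBS below: this result is backed by a real Dataflow job, so
          // it is tracked (and later looked up) by Dataflow job ID; runners with no Dataflow job
          // are tracked through their PipelineResult instead.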
          launchedJobs.add(jobId);
        } else {
          pipelineOptions.setRunner(PipelineUtils.getRunnerClass(options.getParameter("runner")));
          pipelineOptions.setJobName(options.jobName());
          // For unsupported runners (e.g. the direct runner), manually record job properties.
          Map<String, String> jobProperties = new HashMap<>();
          jobProperties.put(
              "createTime", Timestamps.toString(Timestamps.fromMillis(System.currentTimeMillis())));
          if (pipelineOptions.as(StreamingOptions.class).isStreaming()) {
            jobProperties.put("jobType", "JOB_TYPE_STREAMING");
          } else {
            jobProperties.put("jobType", "JOB_TYPE_BATCH");
          }
          PipelineResult pipelineResult = options.pipeline().run();
          // For unsupported runners (e.g. the direct runner), use the job name as the job ID.
          jobId = options.jobName();
          MANAGED_JOBS.put(jobId, pipelineResult);
          // For unsupported runners (e.g. the direct runner), return a synthetic LaunchInfo
          // immediately; the post-launch Dataflow status checks below do not apply.
          return LaunchInfo.builder()
              .setJobId(jobId)
              .setProjectId(project)
              .setRegion(region)
              .setCreateTime(jobProperties.get("createTime"))
              .setSdk("DirectBeam")
              .setVersion("0.0.1")
              .setJobType(jobProperties.get("jobType"))
              .setRunner(options.getParameter("runner"))
              .setParameters(options.parameters())
              .setState(JobState.RUNNING)
              .build();
        }
        break;
      case PYTHON:
        checkState(
            options.executable() != null,
            "Cannot launch a dataflow job without an executable specified. "
                + "Please specify executable and try again!");
        if (options.requirementsFile() != null) {
          // Install requirements in a fresh virtualenv; the "&&" tokens chain the install, the
          // python invocation below, and the final deactivate into one shell command.
          cmd.add(
              "virtualenv . && source ./bin/activate && pip3 install -r "
                  + options.requirementsFile());
          cmd.add("&&");
        }
        LOG.info("Using the executable at {}", options.executable());
        cmd.add("python3");
        cmd.add(options.executable());
        cmd.addAll(extractOptions(project, region, options));
        if (options.requirementsFile() != null) {
          cmd.add("&&");
          cmd.add("deactivate");
        }
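        // The pieces are joined into a single shell line; executeCommandAndParseResponse is
        // expected to scan the launcher's console output for the Dataflow job ID.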
        jobId = executeCommandAndParseResponse(String.join(" ", cmd));
        break;
      case GO:
        checkState(
            options.executable() != null,
            "Cannot launch a dataflow job without an executable specified. "
                + "Please specify executable and try again!");
        LOG.info("Using the executable at {}", options.executable());
        cmd.add("go");
        cmd.add("run");
        cmd.add(options.executable());
        cmd.addAll(extractOptions(project, region, options));
        jobId = executeCommandAndParseResponse(String.join(" ", cmd));
        break;
      default:
        throw new RuntimeException(
            String.format(
                "Invalid sdk %s specified. sdk must be one of JAVA, PYTHON, or GO.",
                options.sdk()));
    }
    // Wait until the job is active to get more information
    JobState state = waitUntilActive(project, region, jobId);
    Job job = getJob(project, region, jobId, "JOB_VIEW_DESCRIPTION");
    LOG.info("Received Dataflow job {}: {}", job.getId(), formatForLogging(job));

    return getJobInfo(options, state, job);
  }
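
For reference, a minimal sketch of driving this method from a test. The LaunchConfig builder calls are assumptions inferred from the accessors used above (jobName(), sdk(), pipeline(), getParameter()); the project, region, and bucket values are placeholders, and imports are elided as in the listing:

  // Sketch only: LaunchConfig.builder(...) and its setters are assumed, not confirmed
  // against the framework.
  Pipeline pipeline = Pipeline.create();
  pipeline.apply(Create.of(1, 2, 3));

  LaunchConfig options =
      LaunchConfig.builder("test-job")                          // hypothetical entry point
          .setSdk(Sdk.JAVA)
          .setPipeline(pipeline)
          .addParameter("runner", "DataflowRunner")
          .addParameter("tempLocation", "gs://my-bucket/temp")  // placeholder bucket
          .build();

  LaunchInfo info = launcher.launch("my-project", "us-central1", options);
  LOG.info("Launched {} in state {}", info.jobId(), info.state());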