protected LaunchInfo launchTemplate()

in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java [505:590]


  protected LaunchInfo launchTemplate(
      LaunchConfig.Builder options, boolean setupShutdownHook, Template templateMetadata)
      throws IOException {

    boolean flex =
        templateMetadata.flexContainerName() != null
            && !templateMetadata.flexContainerName().isEmpty();

    // Property allows testing with Runner v2 / Unified Worker
    String unifiedWorkerHarnessContainerImage =
        System.getProperty("unifiedWorkerHarnessContainerImage");
    if (!skipRunnerV2
        && (System.getProperty("unifiedWorker") != null
            || unifiedWorkerHarnessContainerImage != null)) {
      appendExperiment(options, "use_runner_v2");
      if (System.getProperty("sdkContainerImage") != null) {
        options.addParameter("sdkContainerImage", System.getProperty("sdkContainerImage"));
      }
      if (unifiedWorkerHarnessContainerImage != null) {
        appendExperiment(
            options,
            "runner_harness_container_image="
                + System.getProperty("unifiedWorkerHarnessContainerImage"));
        appendExperiment(options, "use_beam_bq_sink");
        appendExperiment(options, "beam_fn_api");
        appendExperiment(options, "use_unified_worker");
        appendExperiment(options, "use_portable_job_submission");
        appendExperiment(options, "worker_region=" + REGION);
      }
    }

    if (System.getProperty("enableCleanupState") != null) {
      appendExperiment(options, "enable_cleanup_state");
    }

    // Property allows testing with Streaming Engine Enabled
    if (System.getProperty("enableStreamingEngine") != null) {
      options.addEnvironment("enableStreamingEngine", true);
    }

    if (System.getProperty("workerMachineType") != null) {
      options.addEnvironment("workerMachineType", System.getProperty("workerMachineType"));
    }

    if (System.getProperty("streamingAtLeastOnce") != null) {
      appendExperiment(options, "streaming_mode_at_least_once");
    }

    if (usingDirectRunner) {
      // For direct runner tests we need to explicitly add a tempLocation if missing
      if (options.getParameter("tempLocation") == null) {
        options.addParameter("tempLocation", "gs://" + artifactBucketName + "/temp/");
      }

      if (System.getProperty("runner") != null) {
        options.addParameter("runner", System.getProperty("runner"));
      }

      // If template has creation parameters, they need to be specified as a --parameter=value
      for (Method method : templateMetadata.optionsClass().getMethods()) {
        TemplateCreationParameters creationParameters =
            method.getAnnotation(TemplateCreationParameters.class);
        if (creationParameters != null) {
          for (TemplateCreationParameter param : creationParameters.value()) {
            if (param.template() == null
                || param.template().isEmpty()
                || param.template().equals(template.name())) {
              options.addParameter(
                  MetadataUtils.getParameterNameFromMethod(method.getName()), param.value());
            }
          }
        }
      }
    }

    LaunchInfo launchInfo = pipelineLauncher.launch(PROJECT, REGION, options.build());

    // if the launch succeeded and setupShutdownHook is enabled, setup a thread to cancel job
    if (setupShutdownHook && launchInfo.jobId() != null && !launchInfo.jobId().isEmpty()) {
      Runtime.getRuntime()
          .addShutdownHook(new Thread(new CancelJobShutdownHook(pipelineLauncher, launchInfo)));
    }
    printJobLink(testName, launchInfo);

    return launchInfo;
  }