in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java [505:590]
protected LaunchInfo launchTemplate(
LaunchConfig.Builder options, boolean setupShutdownHook, Template templateMetadata)
throws IOException {
boolean flex =
templateMetadata.flexContainerName() != null
&& !templateMetadata.flexContainerName().isEmpty();
// Property allows testing with Runner v2 / Unified Worker
String unifiedWorkerHarnessContainerImage =
System.getProperty("unifiedWorkerHarnessContainerImage");
if (!skipRunnerV2
&& (System.getProperty("unifiedWorker") != null
|| unifiedWorkerHarnessContainerImage != null)) {
appendExperiment(options, "use_runner_v2");
if (System.getProperty("sdkContainerImage") != null) {
options.addParameter("sdkContainerImage", System.getProperty("sdkContainerImage"));
}
if (unifiedWorkerHarnessContainerImage != null) {
appendExperiment(
options,
"runner_harness_container_image="
+ System.getProperty("unifiedWorkerHarnessContainerImage"));
appendExperiment(options, "use_beam_bq_sink");
appendExperiment(options, "beam_fn_api");
appendExperiment(options, "use_unified_worker");
appendExperiment(options, "use_portable_job_submission");
appendExperiment(options, "worker_region=" + REGION);
}
}
if (System.getProperty("enableCleanupState") != null) {
appendExperiment(options, "enable_cleanup_state");
}
// Property allows testing with Streaming Engine Enabled
if (System.getProperty("enableStreamingEngine") != null) {
options.addEnvironment("enableStreamingEngine", true);
}
if (System.getProperty("workerMachineType") != null) {
options.addEnvironment("workerMachineType", System.getProperty("workerMachineType"));
}
if (System.getProperty("streamingAtLeastOnce") != null) {
appendExperiment(options, "streaming_mode_at_least_once");
}
if (usingDirectRunner) {
// For direct runner tests we need to explicitly add a tempLocation if missing
if (options.getParameter("tempLocation") == null) {
options.addParameter("tempLocation", "gs://" + artifactBucketName + "/temp/");
}
if (System.getProperty("runner") != null) {
options.addParameter("runner", System.getProperty("runner"));
}
// If template has creation parameters, they need to be specified as a --parameter=value
for (Method method : templateMetadata.optionsClass().getMethods()) {
TemplateCreationParameters creationParameters =
method.getAnnotation(TemplateCreationParameters.class);
if (creationParameters != null) {
for (TemplateCreationParameter param : creationParameters.value()) {
if (param.template() == null
|| param.template().isEmpty()
|| param.template().equals(template.name())) {
options.addParameter(
MetadataUtils.getParameterNameFromMethod(method.getName()), param.value());
}
}
}
}
}
LaunchInfo launchInfo = pipelineLauncher.launch(PROJECT, REGION, options.build());
// if the launch succeeded and setupShutdownHook is enabled, setup a thread to cancel job
if (setupShutdownHook && launchInfo.jobId() != null && !launchInfo.jobId().isEmpty()) {
Runtime.getRuntime()
.addShutdownHook(new Thread(new CancelJobShutdownHook(pipelineLauncher, launchInfo)));
}
printJobLink(testName, launchInfo);
return launchInfo;
}