in it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DefaultPipelineLauncher.java [285:404]
public LaunchInfo launch(String project, String region, LaunchConfig options) throws IOException {
checkState(
options.sdk() != null,
"Cannot launch a dataflow job "
+ "without sdk specified. Please specify sdk and try again!");
LOG.info("Getting ready to launch {} in {} under {}", options.jobName(), region, project);
LOG.info("Using parameters:\n{}", formatForLogging(options.parameters()));
// Build an SDK-specific command and execute it to launch the Dataflow job
List<String> cmd = new ArrayList<>();
String jobId;
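// Java pipelines are launched in-process via Pipeline.run(); Python and Go pipelines
// shell out to their SDK entry points and the job ID is parsed from the output.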
switch (options.sdk()) {
case JAVA:
checkState(
options.pipeline() != null,
"Cannot launch a dataflow job "
+ "without pipeline specified. Please specify pipeline and try again!");
PipelineOptions pipelineOptions = options.pipeline().getOptions();
if ("DataflowRunner".equalsIgnoreCase(options.getParameter("runner"))) {
List<String> optionFromConfig = extractOptions(project, region, options);
// A few options must be set at pipeline expansion time, so they need to be preserved
// here. Known options include --streaming (expansion depends on it) and
// --tempLocation (validation depends on it).
if (pipelineOptions.as(StreamingOptions.class).isStreaming()) {
optionFromConfig.add("--streaming");
}
if (!Strings.isNullOrEmpty(pipelineOptions.getTempLocation())) {
optionFromConfig.add(
String.format("--tempLocation=%s", pipelineOptions.getTempLocation()));
}
// Build the Dataflow-runner-specific options from the extracted arguments
PipelineOptions updatedOptions =
PipelineOptionsFactory.fromArgs(optionFromConfig.toArray(new String[] {})).create();
updatedOptions.setJobName(options.jobName());
PipelineResult pipelineResult = options.pipeline().run(updatedOptions);
// The Dataflow runner generates a job ID in a service-defined format for each job
DataflowPipelineJob job = (DataflowPipelineJob) pipelineResult;
jobId = job.getJobId();
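// Keep the result and job ID around so the launcher can monitor and clean up the job later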
UNMANAGED_JOBS.put(jobId, pipelineResult);
launchedJobs.add(jobId);
} else {
pipelineOptions.setRunner(PipelineUtils.getRunnerClass(options.getParameter("runner")));
pipelineOptions.setJobName(options.jobName());
// for unsupported runners (e.g. the direct runner), record job properties manually
Map<String, String> jobProperties = new HashMap<>();
jobProperties.put(
"createTime", Timestamps.toString(Timestamps.fromMillis(System.currentTimeMillis())));
if (pipelineOptions.as(StreamingOptions.class).isStreaming()) {
jobProperties.put("jobType", "JOB_TYPE_STREAMING");
} else {
jobProperties.put("jobType", "JOB_TYPE_BATCH");
}
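// run() uses the pipeline's own options, which were mutated above to set the runner
// and job name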
PipelineResult pipelineResult = options.pipeline().run();
// for unsupported runners (e.g. the direct runner), use the job name as the job ID
jobId = options.jobName();
MANAGED_JOBS.put(jobId, pipelineResult);
// for unsupported runners (e.g. the direct runner), return a synthesized LaunchInfo
return LaunchInfo.builder()
.setJobId(jobId)
.setProjectId(project)
.setRegion(region)
.setCreateTime(jobProperties.get("createTime"))
.setSdk("DirectBeam")
.setVersion("0.0.1")
.setJobType(jobProperties.get("jobType"))
.setRunner(options.getParameter("runner"))
.setParameters(options.parameters())
.setState(JobState.RUNNING)
.build();
}
break;
case PYTHON:
checkState(
options.executable() != null,
"Cannot launch a dataflow job "
+ "without executable specified. Please specify executable and try again!");
if (options.requirementsFile() != null) {
// create a virtualenv, activate it, and install the pipeline's requirements
cmd.add(
"virtualenv . && source ./bin/activate && pip3 install -r "
+ options.requirementsFile());
cmd.add("&&");
}
LOG.info("Using the executable at {}", options.executable());
cmd.add("python3");
cmd.add(options.executable());
cmd.addAll(extractOptions(project, region, options));
if (options.requirementsFile() != null) {
cmd.add("&&");
cmd.add("deactivate");
}
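// Run the joined command; the helper parses the Dataflow job ID from the process output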
jobId = executeCommandAndParseResponse(String.join(" ", cmd));
break;
case GO:
checkState(
options.executable() != null,
"Cannot launch a dataflow job "
+ "without executable specified. Please specify executable and try again!");
LOG.info("Using the executable at {}", options.executable());
cmd.add("go");
cmd.add("run");
cmd.add(options.executable());
cmd.addAll(extractOptions(project, region, options));
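// As with Python, the job ID is parsed from the launch command's output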
jobId = executeCommandAndParseResponse(String.join(" ", cmd));
break;
default:
throw new RuntimeException(
String.format(
"Invalid sdk %s specified. " + "sdk can be one of java, python, or go.",
options.sdk()));
}
// Wait until the job is active to get more information
JobState state = waitUntilActive(project, region, jobId);
Job job = getJob(project, region, jobId, "JOB_VIEW_DESCRIPTION");
LOG.info("Received Dataflow job {}: {}", job.getId(), formatForLogging(job));
return getJobInfo(options, state, job);
}
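// Example usage (a minimal sketch; the LaunchConfig builder calls below are illustrative
// assumptions, not the verified API; consult LaunchConfig for the actual methods):
//
//   LaunchConfig config =
//       LaunchConfig.builder("my-job")        // hypothetical builder entry point
//           .setSdk(Sdk.JAVA)                 // SDK is required, per the checkState above
//           .setPipeline(pipeline)            // required for JAVA launches
//           .addParameter("runner", "DataflowRunner")
//           .build();
//   LaunchInfo info = launcher.launch("my-project", "us-central1", config);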