in sdks/go/pkg/beam/runners/dataflow/dataflow.go [259:416]
// getJobOptions validates the Dataflow-related flags and assembles them into a
// dataflowlib.JobOptions for job submission.
//
// streaming reports whether the pipeline is being constructed as a streaming
// pipeline; it gates both the transform_name_mapping validation and the
// streaming-specific default experiments.
//
// An error is returned when a required setting (project, region, staging
// location) is missing, when a flag carries an unsupported value or malformed
// JSON, when the public IP flags conflict, or when an experiment that disables
// runner v2 is requested.
func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions, error) {
	project := gcpopts.GetProjectFromFlagOrEnvironment(ctx)
	if project == "" {
		return nil, errors.New("no Google Cloud project specified. Use --project=<project>")
	}
	region := gcpopts.GetRegion(ctx)
	if region == "" {
		return nil, errors.New("no Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
	}
	if *stagingLocation == "" {
		return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
	}

	// Labels are supplied as a JSON object of string keys to string values.
	var jobLabels map[string]string
	if *labels != "" {
		if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil {
			return nil, errors.Wrapf(err, "error reading --label flag as JSON")
		}
	}

	if *cpuProfiling != "" {
		perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling)
	}

	if *autoscalingAlgorithm != "" {
		if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm != "THROUGHPUT_BASED" {
			return nil, errors.New("invalid autoscaling algorithm. Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
		}
	}

	if *flexRSGoal != "" {
		switch *flexRSGoal {
		case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED":
			// valid values
		default:
			return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
		}
	}

	// A transform name mapping is only accepted for a streaming pipeline that
	// is being updated.
	if !streaming && *transformMapping != "" {
		return nil, errors.New("provided transform_name_mapping for a batch pipeline, did you mean to construct a streaming pipeline?")
	}
	if !*update && *transformMapping != "" {
		return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
	}
	var updateTransformMapping map[string]string
	if *transformMapping != "" {
		if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil {
			return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON")
		}
	}

	// The two public IP flags are expected to hold opposite values. When they
	// hold the same value, work out which one the user actually passed.
	if *usePublicIPs == *noUsePublicIPs {
		useSet := isFlagPassed("use_public_ips")
		noUseSet := isFlagPassed("no_use_public_ips")
		// If use_public_ips was explicitly set but no_use_public_ips was not, use that value
		// We take the explicit value of no_use_public_ips if it was set but use_public_ips was not.
		if useSet && !noUseSet {
			*noUsePublicIPs = !*usePublicIPs
		} else if useSet && noUseSet {
			return nil, errors.New("exactly one of usePublicIPs and noUsePublicIPs must be true, please check that only one is true")
		}
	}

	hooks.SerializeHooksToOptions()

	experiments := jobopts.GetExperiments()
	// Ensure that we enable the same set of experiments across all SDKs
	// for runner v2.
	var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
	for _, e := range experiments {
		if strings.Contains(e, "beam_fn_api") {
			fnApiSet = true
		}
		if strings.Contains(e, "use_runner_v2") {
			v2set = true
		}
		if strings.Contains(e, "use_unified_worker") {
			uwSet = true
		}
		if strings.Contains(e, "use_portable_job_submission") {
			portaSubmission = true
		}
		// Track user-supplied streaming experiments so they are not appended
		// a second time below.
		if strings.Contains(e, "enable_streaming_engine") {
			seSet = true
		}
		if strings.Contains(e, "enable_windmill_service") {
			wsSet = true
		}
		if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
			return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
		}
	}
	// Enable default experiments.
	if !fnApiSet {
		experiments = append(experiments, "beam_fn_api")
	}
	if !v2set {
		experiments = append(experiments, "use_runner_v2")
	}
	if !uwSet {
		experiments = append(experiments, "use_unified_worker")
	}
	if !portaSubmission {
		experiments = append(experiments, "use_portable_job_submission")
	}
	// Ensure that streaming specific experiments are set for streaming pipelines
	// since runner v2 only supports using streaming engine.
	if streaming {
		if !seSet {
			experiments = append(experiments, "enable_streaming_engine")
		}
		if !wsSet {
			experiments = append(experiments, "enable_windmill_service")
		}
	}

	// The minimum CPU platform is passed to the service as an experiment.
	if *minCPUPlatform != "" {
		experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
	}

	// Service options arrive as a single comma-separated flag value.
	var dfServiceOptions []string
	if *dataflowServiceOptions != "" {
		dfServiceOptions = strings.Split(*dataflowServiceOptions, ",")
	}

	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
	opts := &dataflowlib.JobOptions{
		Name:                   jobopts.GetJobName(),
		Streaming:              streaming,
		Experiments:            experiments,
		DataflowServiceOptions: dfServiceOptions,
		Options:                beam.PipelineOptions.Export(),
		Project:                project,
		Region:                 region,
		Zone:                   *zone,
		KmsKey:                 *kmsKey,
		Network:                *network,
		Subnetwork:             *subnetwork,
		NoUsePublicIPs:         *noUsePublicIPs,
		NumWorkers:             *numWorkers,
		MaxNumWorkers:          *maxNumWorkers,
		WorkerHarnessThreads:   *workerHarnessThreads,
		DiskSizeGb:             *diskSizeGb,
		DiskType:               *diskType,
		Algorithm:              *autoscalingAlgorithm,
		FlexRSGoal:             *flexRSGoal,
		MachineType:            *firstNonEmpty(workerMachineType, machineType),
		Labels:                 jobLabels,
		ServiceAccountEmail:    *serviceAccountEmail,
		TempLocation:           *tempLocation,
		TemplateLocation:       *templateLocation,
		Worker:                 *jobopts.WorkerBinary,
		WorkerRegion:           *workerRegion,
		WorkerZone:             *workerZone,
		TeardownPolicy:         *teardownPolicy,
		ContainerImage:         getContainerImage(ctx),
		Update:                 *update,
		TransformNameMapping:   updateTransformMapping,
	}
	// Default the temp location to a "tmp" path under the staging location.
	if opts.TempLocation == "" {
		opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
	}
	return opts, nil
}