func getJobOptions()

in sdks/go/pkg/beam/runners/dataflow/dataflow.go [259:416]


// getJobOptions validates the Dataflow runner flags and assembles them into a
// dataflowlib.JobOptions for a batch or streaming job.
//
// It returns an error when:
//   - no project, region or staging location is configured;
//   - the labels or transform name mapping flag is not valid JSON;
//   - the autoscaling algorithm or FlexRS goal is not one of the accepted values;
//   - a transform name mapping is supplied for a batch pipeline or without --update;
//   - both public IP flags were passed explicitly with the same value;
//   - an experiment that disables runner v2 is present.
//
// Besides building the options it has side effects: it enables the profile
// capture hook when CPU profiling is requested, serializes hooks into the
// pipeline options, loads pipeline options from flags, and may overwrite
// *noUsePublicIPs so that it mirrors an explicitly passed use_public_ips flag.
func getJobOptions(ctx context.Context, streaming bool) (*dataflowlib.JobOptions, error) {
	project := gcpopts.GetProjectFromFlagOrEnvironment(ctx)
	if project == "" {
		return nil, errors.New("no Google Cloud project specified. Use --project=<project>")
	}
	region := gcpopts.GetRegion(ctx)
	if region == "" {
		return nil, errors.New("no Google Cloud region specified. Use --region=<region>. See https://cloud.google.com/dataflow/docs/concepts/regional-endpoints")
	}
	if *stagingLocation == "" {
		return nil, errors.New("no GCS staging location specified. Use --staging_location=gs://<bucket>/<path>")
	}
	var jobLabels map[string]string
	if *labels != "" {
		if err := json.Unmarshal([]byte(*labels), &jobLabels); err != nil {
			return nil, errors.Wrapf(err, "error reading --label flag as JSON")
		}
	}

	if *cpuProfiling != "" {
		perf.EnableProfCaptureHook("gcs_profile_writer", *cpuProfiling)
	}

	if *autoscalingAlgorithm != "" {
		if *autoscalingAlgorithm != "NONE" && *autoscalingAlgorithm != "THROUGHPUT_BASED" {
			return nil, errors.New("invalid autoscaling algorithm. Use --autoscaling_algorithm=(NONE|THROUGHPUT_BASED)")
		}
	}

	if *flexRSGoal != "" {
		switch *flexRSGoal {
		case "FLEXRS_UNSPECIFIED", "FLEXRS_SPEED_OPTIMIZED", "FLEXRS_COST_OPTIMIZED":
			// valid values
		default:
			return nil, errors.Errorf("invalid flex resource scheduling goal. Got %q; Use --flexrs_goal=(FLEXRS_UNSPECIFIED|FLEXRS_SPEED_OPTIMIZED|FLEXRS_COST_OPTIMIZED)", *flexRSGoal)
		}
	}
	// A transform name mapping is only accepted for a streaming pipeline that
	// is being updated.
	if !streaming && *transformMapping != "" {
		return nil, errors.New("provided transform_name_mapping for a batch pipeline, did you mean to construct a streaming pipeline?")
	}
	if !*update && *transformMapping != "" {
		return nil, errors.New("provided transform_name_mapping without setting the --update flag, so the pipeline would not be updated")
	}
	var updateTransformMapping map[string]string
	if *transformMapping != "" {
		if err := json.Unmarshal([]byte(*transformMapping), &updateTransformMapping); err != nil {
			return nil, errors.Wrapf(err, "error reading --transform_name_mapping flag as JSON")
		}
	}
	// The two public IP flags are expected to hold opposite values. When they
	// agree, reconcile them based on which flags were passed explicitly.
	if *usePublicIPs == *noUsePublicIPs {
		useSet := isFlagPassed("use_public_ips")
		noUseSet := isFlagPassed("no_use_public_ips")
		// If only use_public_ips was passed explicitly, derive no_use_public_ips
		// from it. If both were passed explicitly, they contradict each other.
		// Otherwise the current value of no_use_public_ips is kept as is.
		if useSet && !noUseSet {
			*noUsePublicIPs = !*usePublicIPs
		} else if useSet && noUseSet {
			return nil, errors.New("exactly one of usePublicIPs and noUsePublicIPs must be true, please check that only one is true")
		}
	}

	hooks.SerializeHooksToOptions()

	experiments := jobopts.GetExperiments()
	// Ensure that we enable the same set of experiments across all SDKs
	// for runner v2. Record which default experiments are already present so
	// that they are not appended a second time below.
	var fnApiSet, v2set, uwSet, portaSubmission, seSet, wsSet bool
	for _, e := range experiments {
		if strings.Contains(e, "beam_fn_api") {
			fnApiSet = true
		}
		if strings.Contains(e, "use_runner_v2") {
			v2set = true
		}
		if strings.Contains(e, "use_unified_worker") {
			uwSet = true
		}
		if strings.Contains(e, "use_portable_job_submission") {
			portaSubmission = true
		}
		if strings.Contains(e, "enable_streaming_engine") {
			seSet = true
		}
		if strings.Contains(e, "enable_windmill_service") {
			wsSet = true
		}
		if strings.Contains(e, "disable_runner_v2") || strings.Contains(e, "disable_runner_v2_until_2023") || strings.Contains(e, "disable_prime_runner_v2") {
			return nil, errors.New("detected one of the following experiments: disable_runner_v2 | disable_runner_v2_until_2023 | disable_prime_runner_v2. Disabling runner v2 is no longer supported as of Beam version 2.45.0+")
		}
	}
	// Enable default experiments.
	if !fnApiSet {
		experiments = append(experiments, "beam_fn_api")
	}
	if !v2set {
		experiments = append(experiments, "use_runner_v2")
	}
	if !uwSet {
		experiments = append(experiments, "use_unified_worker")
	}
	if !portaSubmission {
		experiments = append(experiments, "use_portable_job_submission")
	}

	// Ensure that streaming specific experiments are set for streaming pipelines
	// since runner v2 only supports using streaming engine.
	if streaming {
		if !seSet {
			experiments = append(experiments, "enable_streaming_engine")
		}
		if !wsSet {
			experiments = append(experiments, "enable_windmill_service")
		}
	}

	if *minCPUPlatform != "" {
		experiments = append(experiments, fmt.Sprintf("min_cpu_platform=%v", *minCPUPlatform))
	}

	// Service options are supplied as a single comma-separated flag value.
	var dfServiceOptions []string
	if *dataflowServiceOptions != "" {
		dfServiceOptions = strings.Split(*dataflowServiceOptions, ",")
	}

	beam.PipelineOptions.LoadOptionsFromFlags(flagFilter)
	opts := &dataflowlib.JobOptions{
		Name:                   jobopts.GetJobName(),
		Streaming:              streaming,
		Experiments:            experiments,
		DataflowServiceOptions: dfServiceOptions,
		Options:                beam.PipelineOptions.Export(),
		Project:                project,
		Region:                 region,
		Zone:                   *zone,
		KmsKey:                 *kmsKey,
		Network:                *network,
		Subnetwork:             *subnetwork,
		NoUsePublicIPs:         *noUsePublicIPs,
		NumWorkers:             *numWorkers,
		MaxNumWorkers:          *maxNumWorkers,
		WorkerHarnessThreads:   *workerHarnessThreads,
		DiskSizeGb:             *diskSizeGb,
		DiskType:               *diskType,
		Algorithm:              *autoscalingAlgorithm,
		FlexRSGoal:             *flexRSGoal,
		MachineType:            *firstNonEmpty(workerMachineType, machineType),
		Labels:                 jobLabels,
		ServiceAccountEmail:    *serviceAccountEmail,
		TempLocation:           *tempLocation,
		TemplateLocation:       *templateLocation,
		Worker:                 *jobopts.WorkerBinary,
		WorkerRegion:           *workerRegion,
		WorkerZone:             *workerZone,
		TeardownPolicy:         *teardownPolicy,
		ContainerImage:         getContainerImage(ctx),
		Update:                 *update,
		TransformNameMapping:   updateTransformMapping,
	}
	// Default the temp location to a "tmp" entry under the staging location.
	if opts.TempLocation == "" {
		opts.TempLocation = gcsx.Join(*stagingLocation, "tmp")
	}

	return opts, nil
}