func()

in sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go [82:314]


// Prepare registers a new job from the given request and validates its
// pipeline against the features prism currently implements. The job is
// created and stored before validation so that failures are recorded on the
// job itself. All unsupported features found are aggregated into a single
// error, so users see every problem at once rather than one per submission.
// On success it returns the preparation ID (the job key, which doubles as
// the staging session token) and the artifact staging endpoint.
func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *jobpb.PrepareJobResponse, err error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Since jobs execute in the background, they should not be tied to a request's context.
	rootCtx, cancelFn := context.WithCancelCause(context.Background())
	// Wrap in a Once so it will only be invoked a single time for the job.
	terminalOnceWrap := sync.OnceFunc(s.jobTerminated)
	job := &Job{
		key:        s.nextId(),
		Pipeline:   req.GetPipeline(),
		jobName:    req.GetJobName(),
		options:    req.GetPipelineOptions(),
		streamCond: sync.NewCond(&sync.Mutex{}),
		RootCtx:    rootCtx,
		// CancelFn both cancels the job's root context (with the cause) and
		// notifies the server that the job reached a terminal state, exactly once.
		CancelFn: func(err error) {
			cancelFn(err)
			terminalOnceWrap()
		},
		Logger:           s.logger, // TODO substitute with a configured logger.
		artifactEndpoint: s.Endpoint(),
		mw:               s.mw,
	}
	// Stop the idle timer when a new job appears.
	if idleTimer := s.idleTimer.Load(); idleTimer != nil {
		idleTimer.Stop()
	}

	// Queue initial state of the job.
	job.state.Store(jobpb.JobState_STOPPED)
	s.jobs[job.key] = job

	if err := isSupported(job.Pipeline.GetRequirements()); err != nil {
		job.Failed(err)
		slog.Error("unable to run job", slog.String("error", err.Error()), slog.String("jobname", req.GetJobName()))
		return nil, err
	}
	var errs []error
	// check records an unimplementedError unless got matches one of wants.
	// Note: calling check with no wants unconditionally records the error;
	// several callers below rely on that to flag a feature outright.
	check := func(feature string, got any, wants ...any) {
		for _, want := range wants {
			if got == want {
				return
			}
		}

		err := unimplementedError{
			feature: feature,
			value:   got,
		}
		errs = append(errs, err)
	}

	// Inspect Transforms for unsupported features.
	ts := job.Pipeline.GetComponents().GetTransforms()
	var testStreamIds []string
	for tid, t := range ts {
		urn := t.GetSpec().GetUrn()
		switch urn {
		case urns.TransformImpulse,
			urns.TransformGBK,
			urns.TransformFlatten,
			urns.TransformCombinePerKey,
			urns.TransformCombineGlobally,      // Used by Java SDK
			urns.TransformCombineGroupedValues, // Used by Java SDK
			urns.TransformMerge,                // Used directly by Python SDK if "pre-optimized"
			urns.TransformPreCombine,           // Used directly by Python SDK if "pre-optimized"
			urns.TransformExtract,              // Used directly by Python SDK if "pre-optimized"
			urns.TransformAssignWindows:
		// Very few expected transforms types for submitted pipelines.
		// Most URNs are for the runner to communicate back to the SDK for execution.
		case urns.TransformReshuffle, urns.TransformRedistributeArbitrarily, urns.TransformRedistributeByKey:
			// Reshuffles and Redistributes are permitted and have special handling during optimization.

		case urns.TransformParDo:
			var pardo pipepb.ParDoPayload
			if err := proto.Unmarshal(t.GetSpec().GetPayload(), &pardo); err != nil {
				wrapped := fmt.Errorf("unable to unmarshal ParDoPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
				job.Failed(wrapped)
				return nil, wrapped
			}

			isStateful := false

			// Validate all the state features
			for _, spec := range pardo.GetStateSpecs() {
				isStateful = true
				check("StateSpec.Protocol.Urn", spec.GetProtocol().GetUrn(),
					urns.UserStateBag, urns.UserStateMultiMap, urns.UserStateOrderedList)
			}
			// Validate all the timer features
			for _, spec := range pardo.GetTimerFamilySpecs() {
				isStateful = true
				check("TimerFamilySpecs.TimeDomain.Urn", spec.GetTimeDomain(), pipepb.TimeDomain_EVENT_TIME, pipepb.TimeDomain_PROCESSING_TIME)
			}

			// Check for a stateful SDF and direct user to https://github.com/apache/beam/issues/32139
			// (check with no wants: this combination is always rejected).
			if pardo.GetRestrictionCoderId() != "" && isStateful {
				check("Splittable+Stateful DoFn", "See https://github.com/apache/beam/issues/32139 for information.")
			}

			// Validate whether the triggers on side inputs are required for
			// expedient data processing.
			//
			// Currently triggered side inputs are not supported by prism, and will
			// not have early or late firings.
			//
			// This feature is required when the Side Input PCollection is unbounded
			// and is in the Global Window. This can cause the pipeline to fully
			// stall while the input is being computed, and may never terminate.
			//
			// Other situations may not have desired results, but are valid behaviors
			// within the model.
			//
			// See https://github.com/apache/beam/issues/31438 for implementation tracking.
			for sideID := range pardo.GetSideInputs() {
				pcolID := t.GetInputs()[sideID]
				pcol := job.Pipeline.GetComponents().GetPcollections()[pcolID]
				wsID := pcol.GetWindowingStrategyId()
				ws := job.Pipeline.GetComponents().GetWindowingStrategies()[wsID]

				// Bounded inputs, or non-global windows, always terminate: nothing to flag.
				if pcol.GetIsBounded() == pipepb.IsBounded_BOUNDED ||
					ws.GetWindowFn().GetUrn() != urns.WindowFnGlobal {
					continue
				}

				// Within the Unbounded GlobalWindow space is a niche of expressed
				// user intent that they *do* want to wait for the end of the global
				// window for output. We should permit these pipelines, as there
				// is utility for this in testing situations anyway.
				switch trig := ws.GetTrigger().GetTrigger().(type) {
				case *pipepb.Trigger_Never_, *pipepb.Trigger_Default_:
					// Only one firing, at the end of the global window, and is
					// compatible with Prism's current execution.
					continue
				case *pipepb.Trigger_AfterEndOfWindow_:
					if early := trig.AfterEndOfWindow.GetEarlyFirings(); early == nil || early.GetNever() != nil {
						if ws.GetAllowedLateness() == 0 {
							// Late configuration doesn't matter, and there are no early firings.
							continue
						}
						if late := trig.AfterEndOfWindow.GetLateFirings(); late == nil || late.GetNever() != nil {
							// Lateness allowed, but no firings anyway.
							continue
						}
					}
				}

				check("Unbounded GlobalWindow Triggered SideInput, are not currently supported by Prism. Sideinputs are only ready at end of window+allowed lateness. See https://github.com/apache/beam/issues/31438 for information.", prototext.Format(ws))
			}

		case urns.TransformTestStream:
			var testStream pipepb.TestStreamPayload
			if err := proto.Unmarshal(t.GetSpec().GetPayload(), &testStream); err != nil {
				wrapped := fmt.Errorf("unable to unmarshal TestStreamPayload for %v - %q: %w", tid, t.GetUniqueName(), err)
				job.Failed(wrapped)
				return nil, wrapped
			}

			t.EnvironmentId = "" // Unset the environment, to ensure it's handled prism side.
			testStreamIds = append(testStreamIds, tid)

		default:
			// Composites can often have some unknown urn, permit those.
			// Eg. The Python SDK has urns "beam:transform:generic_composite:v1", "beam:transform:pickled_python:v1",
			// as well as the deprecated "beam:transform:read:v1", but they are composites.
			// We don't do anything special with these high level composites, but
			// we may be dealing with their internal subgraph already, so we ignore this transform.
			if len(t.GetSubtransforms()) > 0 {
				continue
			}
			// This may be an "empty" composite without subtransforms or a payload.
			// These just do PCollection manipulation which is already represented in the Pipeline graph.
			// Simply ignore the composite at this stage, since the runner does nothing with them.
			if len(t.GetSpec().GetPayload()) == 0 {
				continue
			}
			// Another type of "empty" composite transforms without subtransforms but with
			// a non-empty payload and identical input/output pcollections
			if len(t.GetInputs()) == 1 && len(t.GetOutputs()) == 1 {
				inputID := getOnlyValue(t.GetInputs())
				outputID := getOnlyValue(t.GetOutputs())
				if inputID == outputID {
					slog.Warn("empty transform, with payload and identical input and output pcollection", "urn", urn, "name", t.GetUniqueName(), "pcoll", inputID)
					continue
				}
			}
			// Otherwise fail.
			slog.Warn("unknown transform, with payload", "urn", urn, "name", t.GetUniqueName(), "payload", t.GetSpec().GetPayload())
			check("PTransform.Spec.Urn", urn+" "+t.GetUniqueName(), "<doesn't exist>")
		}
	}
	// At most one test stream per pipeline.
	if len(testStreamIds) > 1 {
		check("Multiple TestStream Transforms in Pipeline", testStreamIds)
	}

	// Inspect Windowing strategies for unsupported features.
	for _, ws := range job.Pipeline.GetComponents().GetWindowingStrategies() {
		// Both Closing behaviors are identical without additional trigger firings.
		check("WindowingStrategy.ClosingBehaviour", ws.GetClosingBehavior(), pipepb.ClosingBehavior_EMIT_IF_NONEMPTY, pipepb.ClosingBehavior_EMIT_ALWAYS)
		check("WindowingStrategy.AccumulationMode", ws.GetAccumulationMode(), pipepb.AccumulationMode_DISCARDING, pipepb.AccumulationMode_ACCUMULATING)
		if ws.GetWindowFn().GetUrn() != urns.WindowFnSession {
			check("WindowingStrategy.MergeStatus", ws.GetMergeStatus(), pipepb.MergeStatus_NON_MERGING)
		} else if hasStatefulTriggers(ws.GetTrigger()) {
			// Technically for any merging windows, but per the above, we only support session windows presently.
			check("WindowingStrategy: Using stateful triggers with merging windows isn't currently supported in prism. See https://github.com/apache/beam/issues/31438 for information.", prototext.Format(ws))
		}
		check("WindowingStrategy.OnTimeBehavior", ws.GetOnTimeBehavior(), pipepb.OnTimeBehavior_FIRE_IF_NONEMPTY, pipepb.OnTimeBehavior_FIRE_ALWAYS)

		// Allow earliest and latest in pane to unblock running python tasks.
		// Tests actually using the set behavior will fail.
		check("WindowingStrategy.OutputTime", ws.GetOutputTime(), pipepb.OutputTime_END_OF_WINDOW,
			pipepb.OutputTime_EARLIEST_IN_PANE, pipepb.OutputTime_LATEST_IN_PANE)

		if hasUnsupportedTriggers(ws.GetTrigger()) {
			check("WindowingStrategy.Trigger", ws.GetTrigger().String())
		}
	}
	// Report every unsupported feature at once as a single aggregate failure.
	if len(errs) > 0 {
		jErr := &joinError{errs: errs}
		slog.Error("unable to run job", slog.String("cause", "unimplemented features"), slog.String("jobname", req.GetJobName()), slog.String("errors", jErr.Error()))
		err := fmt.Errorf("found %v uses of features unimplemented in prism in job %v:\n%v", len(errs), req.GetJobName(), jErr)
		job.Failed(err)
		return nil, err
	}
	// The job key serves as both the preparation ID and the staging session token.
	return &jobpb.PrepareJobResponse{
		PreparationId:       job.key,
		StagingSessionToken: job.key,
		ArtifactStagingEndpoint: &pipepb.ApiServiceDescriptor{
			Url: s.Endpoint(),
		},
	}, nil
}