func()

in sdks/go/pkg/beam/runners/prism/internal/stage.go [111:344]


// Execute runs the bundle rb for this stage and persists its results through em.
//
// Stages with an empty envID hold exactly one runner transform, which is
// executed in-process via s.exe. Stages whose envID equals wk.Env are sent to
// the worker with b.ProcessOn. Any other environment panics.
//
// While waiting for the bundle response and its output data, a ticker
// periodically requests progress from the worker, contributes tentative
// metrics to j, and requests a split when neither the input index nor the
// total element count moved since the previous report. Split residuals are
// returned to em immediately for rescheduling.
//
// Once the bundle and data are finished (or progress/split reporting fails),
// final metrics are tallied, residual roots from the response are collected,
// and the tentative output is persisted. If s.finalize is set, bundle
// finalization is requested afterwards.
//
// Returns context.Cause(ctx) if ctx is done while waiting, or b.BundleErr if
// it is set when the bundle response arrives. Panics raised during execution
// are recovered and returned as err.
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
	if s.baseProgTick.Load() == nil {
		s.baseProgTick.Store(minimumProgTick)
	}
	defer func() {
		// Convert execution panics to errors to fail the bundle.
		if e := recover(); e != nil {
			err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack())
		}
	}()
	slog.Debug("Execute: starting bundle", "bundle", rb)

	var b *worker.B
	initialState := em.StateForBundle(rb)
	var dataReady <-chan struct{}
	switch s.envID {
	case "": // Runner Transforms
		if len(s.transforms) != 1 {
			panic(fmt.Sprintf("unexpected number of runner transforms, want 1: %+v", s))
		}
		tid := s.transforms[0]
		// Runner transforms are processed immediately.
		b = s.exe.ExecuteTransform(s.ID, tid, comps.GetTransforms()[tid], comps, rb.Watermark, em.InputForBundle(rb, s.inputInfo))
		b.InstID = rb.BundleID
		slog.Debug("Execute: runner transform", "bundle", rb, slog.String("tid", tid))

		// Do some accounting for the fake bundle.
		b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
		close(b.Resp) // To avoid blocking downstream, since we don't send on this.
		// A pre-closed channel makes the data-ready case fire right away.
		closed := make(chan struct{})
		close(closed)
		dataReady = closed
	case wk.Env:
		input, estimatedElements := em.DataAndTimerInputForBundle(rb, s.inputInfo)
		b = &worker.B{
			PBDID:  s.ID,
			InstID: rb.BundleID,

			InputTransformID: s.inputTransformID,

			Input:                  input,
			EstimatedInputElements: estimatedElements,

			OutputData: initialState,
			HasTimers:  s.hasTimers,

			SinkToPCollection: s.SinkToPCollection,
			OutputCount:       len(s.outputs),
		}
		b.Init()

		s.prepareSides(b, rb.Watermark)

		slog.Debug("Execute: processing", "bundle", rb)
		// Registered after the recover defer, so cleanup runs before the
		// panic-to-error conversion.
		defer b.Cleanup(wk)
		dataReady = b.ProcessOn(ctx, wk)
	default:
		envErr := fmt.Errorf("unknown environment[%v]", s.envID)
		slog.Error("Execute", "error", envErr)
		panic(envErr)
	}

	// Progress + split loop.
	// Start from values that cannot match a first report.
	previousIndex := int64(-2)
	previousTotalCount := int64(-2) // Total count of all pcollection elements.

	unsplit := true
	baseTick := s.baseProgTick.Load().(time.Duration)
	ticked := false
	progTick := time.NewTicker(baseTick)
	defer progTick.Stop()
	var dataFinished, bundleFinished bool
	// If we have no data outputs, we still need to have progress & splits
	// while waiting for bundle completion.
	if b.OutputCount == 0 {
		dataFinished = true
	}
	var resp *fnpb.ProcessBundleResponse
progress:
	for {
		select {
		case <-ctx.Done():
			return context.Cause(ctx)
		case resp = <-b.Resp:
			bundleFinished = true
			if b.BundleErr != nil {
				return b.BundleErr
			}
			if dataFinished && bundleFinished {
				break progress // exit progress loop on close.
			}
		case <-dataReady:
			dataFinished = true
			if dataFinished && bundleFinished {
				break progress // exit progress loop on close.
			}
		case <-progTick.C:
			ticked = true
			// Named distinctly from the outer resp/err, which hold the final
			// bundle response and the function's named return.
			progResp, progErr := b.Progress(ctx, wk)
			if progErr != nil {
				slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", progErr.Error())
				break progress
			}
			index, unknownIDs := j.ContributeTentativeMetrics(progResp)
			if len(unknownIDs) > 0 {
				md := wk.MonitoringMetadata(ctx, unknownIDs)
				j.AddMetricShortIDs(md)
			}
			slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)

			// Check if there has been any measurable progress by the input, or all output pcollections since last report.
			slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
			if slow && unsplit {
				slog.Debug("splitting report", "bundle", rb, "index", index)
				sr, splitErr := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
				if splitErr != nil {
					slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", splitErr.Error())
					break progress
				}
				splits := sr.GetChannelSplits()
				// Check the length rather than nil-ness: the first split is
				// indexed below, so an empty non-nil slice must also be
				// treated as "no splits".
				if len(splits) == 0 {
					slog.Debug("SDK returned no splits", "bundle", rb)
					unsplit = false
					continue progress
				}

				// TODO sort out rescheduling primary Roots on bundle failure.
				var residuals []engine.Residual
				for _, rr := range sr.GetResidualRoots() {
					ba := rr.GetApplication()
					if len(ba.GetElement()) == 0 {
						slog.LogAttrs(ctx, slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
						panic("sdk returned empty residual application")
					}
					residuals = append(residuals, engine.Residual{Element: ba.GetElement()})
					// TODO what happens to output watermarks on splits?
				}
				if len(splits) != 1 {
					slog.Warn("received non-single channel split", "bundle", rb)
				}
				cs := splits[0]
				fr := cs.GetFirstResidualElement()
				// The first residual can be after the end of data, so filter out those cases.
				if b.EstimatedInputElements >= int(fr) {
					b.EstimatedInputElements = int(fr) // Update the estimate for the next split.
					// Split Residuals are returned right away for rescheduling.
					em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{
						Data: residuals,
					})
				}

				// Any split means we're processing slower than desired, but splitting should increase
				// throughput. Back off for this and other bundles for this stage
				baseTime := s.baseProgTick.Load().(time.Duration)
				newTime := clampTick(baseTime * 4)
				if s.baseProgTick.CompareAndSwap(baseTime, newTime) {
					progTick.Reset(newTime)
				} else {
					// The stored tick changed since it was loaded above;
					// adopt whatever is stored now.
					progTick.Reset(s.baseProgTick.Load().(time.Duration))
				}
			} else {
				previousIndex = index["index"]
				previousTotalCount = index["totalCount"]
			}
		}
	}
	// If we never received any progress ticks, we may have too long a time, shrink it for new runs instead.
	if !ticked {
		newTick := clampTick(baseTick - minimumProgTick)
		// If it's otherwise unchanged, apply the new duration.
		s.baseProgTick.CompareAndSwap(baseTick, newTick)
	}
	// Tentative Data is ready, commit it to the main datastore.
	slog.Debug("Execute: committing data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))

	// Tally metrics immediately so they're available before
	// pipeline termination.
	unknownIDs := j.ContributeFinalMetrics(resp)
	if len(unknownIDs) > 0 {
		md := wk.MonitoringMetadata(ctx, unknownIDs)
		j.AddMetricShortIDs(md)
	}

	// ProcessContinuation residuals are rescheduled after the specified delay.
	residuals := engine.Residuals{
		MinOutputWatermarks: map[string]mtime.Time{},
	}
	for _, rr := range resp.GetResidualRoots() {
		ba := rr.GetApplication()
		if len(ba.GetElement()) == 0 {
			slog.LogAttrs(ctx, slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
			panic("sdk returned empty residual application")
		}
		// The first residual fixes the transform and input IDs; every later
		// residual must agree with them.
		if residuals.TransformID == "" {
			residuals.TransformID = ba.GetTransformId()
		}
		if residuals.InputID == "" {
			residuals.InputID = ba.GetInputId()
		}
		if residuals.TransformID != ba.GetTransformId() {
			panic("sdk returned inconsistent residual application transform : got = " + ba.GetTransformId() + " want = " + residuals.TransformID)
		}
		if residuals.InputID != ba.GetInputId() {
			panic("sdk returned inconsistent residual application input : got = " + ba.GetInputId() + " want = " + residuals.InputID)
		}

		// Keep the minimum output watermark per output across all residuals.
		for col, wm := range ba.GetOutputWatermarks() {
			cur, ok := residuals.MinOutputWatermarks[col]
			if !ok {
				cur = mtime.MaxTimestamp
			}
			residuals.MinOutputWatermarks[col] = mtime.Min(mtime.FromTime(wm.AsTime()), cur)
		}

		residuals.Data = append(residuals.Data, engine.Residual{
			Element: ba.GetElement(),
			Delay:   rr.GetRequestedTimeDelay().AsDuration(),
			Bounded: ba.GetIsBounded() == pipepb.IsBounded_BOUNDED,
		})
	}
	if l := len(residuals.Data); l == 0 {
		slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
	}
	em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
	if s.finalize {
		if _, finErr := b.Finalize(ctx, wk); finErr != nil {
			slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", finErr.Error())
			panic(finErr)
		}
		slog.Info("finalized bundle", "bundle", rb)
	}
	b.OutputData = engine.TentativeData{} // Clear the data.
	return nil
}