in sdks/go/pkg/beam/runners/prism/internal/stage.go [111:344]
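// Execute runs a single bundle for this stage: it dispatches the bundle to the
// runner's in-process transform handler or to an SDK worker, polls for progress
// (splitting the bundle if it appears stalled), and finally commits output data,
// metrics, and residuals back to the ElementManager.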
func (s *stage) Execute(ctx context.Context, j *jobservices.Job, wk *worker.W, comps *pipepb.Components, em *engine.ElementManager, rb engine.RunBundle) (err error) {
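// Seed the stage-wide progress tick interval with its minimum default on
// first execution; it adapts over time based on observed progress below.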
if s.baseProgTick.Load() == nil {
s.baseProgTick.Store(minimumProgTick)
}
defer func() {
// Convert execution panics to errors to fail the bundle.
if e := recover(); e != nil {
err = fmt.Errorf("panic in stage.Execute bundle processing goroutine: %v, stage: %+v,stackTrace:\n%s", e, s, debug.Stack())
}
}()
slog.Debug("Execute: starting bundle", "bundle", rb)
var b *worker.B
initialState := em.StateForBundle(rb)
var dataReady <-chan struct{}
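// Dispatch on the stage's environment: an empty envID marks runner-executed
// transforms, wk.Env routes the bundle to the SDK worker, and anything else
// is a bug.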
switch s.envID {
case "": // Runner Transforms
if len(s.transforms) != 1 {
panic(fmt.Sprintf("unexpected number of runner transforms, want 1: %+v", s))
}
tid := s.transforms[0]
// Runner transforms are processed immediately.
b = s.exe.ExecuteTransform(s.ID, tid, comps.GetTransforms()[tid], comps, rb.Watermark, em.InputForBundle(rb, s.inputInfo))
b.InstID = rb.BundleID
slog.Debug("Execute: runner transform", "bundle", rb, slog.String("tid", tid))
// Do some accounting for the fake bundle.
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
close(b.Resp) // To avoid blocking downstream, since we don't send on this.
closed := make(chan struct{})
close(closed)
dataReady = closed
case wk.Env:
input, estimatedElements := em.DataAndTimerInputForBundle(rb, s.inputInfo)
b = &worker.B{
PBDID: s.ID,
InstID: rb.BundleID,
InputTransformID: s.inputTransformID,
Input: input,
EstimatedInputElements: estimatedElements,
OutputData: initialState,
HasTimers: s.hasTimers,
SinkToPCollection: s.SinkToPCollection,
OutputCount: len(s.outputs),
}
b.Init()
s.prepareSides(b, rb.Watermark)
slog.Debug("Execute: processing", "bundle", rb)
defer b.Cleanup(wk)
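// Start processing on the SDK worker; the returned channel presumably closes
// once all of the bundle's output data has been received (see dataFinished).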
dataReady = b.ProcessOn(ctx, wk)
default:
err := fmt.Errorf("unknown environment[%v]", s.envID)
slog.Error("Execute", "error", err)
panic(err)
}
// Progress + split loop.
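// Seeded with sentinel values that no real progress report should match, so
// the first tick never registers as "slow".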
previousIndex := int64(-2)
previousTotalCount := int64(-2) // Total count of all pcollection elements.
unsplit := true
baseTick := s.baseProgTick.Load().(time.Duration)
ticked := false
progTick := time.NewTicker(baseTick)
defer progTick.Stop()
var dataFinished, bundleFinished bool
// If the bundle has no data outputs, there's no output data to wait for;
// mark data as finished up front so only bundle completion gates the loop,
// while still allowing progress & splits in the meantime.
if b.OutputCount == 0 {
dataFinished = true
}
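// resp holds the final ProcessBundleResponse; it is consumed after the loop
// for final metrics and residuals, and may be nil if the loop aborts early.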
var resp *fnpb.ProcessBundleResponse
progress:
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
case resp = <-b.Resp:
bundleFinished = true
if b.BundleErr != nil {
return b.BundleErr
}
if dataFinished && bundleFinished {
break progress // exit progress loop on close.
}
case <-dataReady:
dataFinished = true
if dataFinished && bundleFinished {
break progress // exit progress loop on close.
}
case <-progTick.C:
ticked = true
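// Poll the SDK for a progress snapshot and fold any tentative metrics into
// the job, resolving unknown monitoring short IDs as needed.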
resp, err := b.Progress(ctx, wk)
if err != nil {
slog.Debug("SDK Error from progress, aborting progress", "bundle", rb, "error", err.Error())
break progress
}
index, unknownIDs := j.ContributeTentativeMetrics(resp)
if len(unknownIDs) > 0 {
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
slog.Debug("progress report", "bundle", rb, "index", index, "prevIndex", previousIndex)
// Check whether there has been any measurable progress by the input or any
// of the output pcollections since the last report.
slow := previousIndex == index["index"] && previousTotalCount == index["totalCount"]
if slow && unsplit {
slog.Debug("splitting report", "bundle", rb, "index", index)
sr, err := b.Split(ctx, wk, 0.5 /* fraction of remainder */, nil /* allowed splits */)
if err != nil {
slog.Warn("SDK Error from split, aborting splits", "bundle", rb, "error", err.Error())
break progress
}
if len(sr.GetChannelSplits()) == 0 {
slog.Debug("SDK returned no splits", "bundle", rb)
unsplit = false
continue progress
}
// TODO sort out rescheduling primary Roots on bundle failure.
var residuals []engine.Residual
for _, rr := range sr.GetResidualRoots() {
ba := rr.GetApplication()
residuals = append(residuals, engine.Residual{Element: ba.GetElement()})
if len(ba.GetElement()) == 0 {
slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
panic("sdk returned empty residual application")
}
// TODO what happens to output watermarks on splits?
}
if len(sr.GetChannelSplits()) != 1 {
slog.Warn("received non-single channel split", "bundle", rb)
}
cs := sr.GetChannelSplits()[0]
fr := cs.GetFirstResidualElement()
// The first residual can be after the end of data, so filter out those cases.
if b.EstimatedInputElements >= int(fr) {
b.EstimatedInputElements = int(fr) // Update the estimate for the next split.
// Split Residuals are returned right away for rescheduling.
em.ReturnResiduals(rb, int(fr), s.inputInfo, engine.Residuals{
Data: residuals,
})
}
// Any split means we're processing slower than desired, but splitting should
// increase throughput. Back off the progress interval for this and other
// bundles of this stage.
baseTime := s.baseProgTick.Load().(time.Duration)
newTime := clampTick(baseTime * 4)
if s.baseProgTick.CompareAndSwap(baseTime, newTime) {
progTick.Reset(newTime)
} else {
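// Another bundle raced ahead and already adjusted the interval; adopt its value.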
progTick.Reset(s.baseProgTick.Load().(time.Duration))
}
} else {
previousIndex = index["index"]
previousTotalCount = index["totalCount"]
}
}
}
// If we never received a progress tick, the interval may be too long;
// shrink it for future runs instead.
if !ticked {
newTick := clampTick(baseTick - minimumProgTick)
// If it's otherwise unchanged, apply the new duration.
s.baseProgTick.CompareAndSwap(baseTick, newTick)
}
// Tentative Data is ready, commit it to the main datastore.
slog.Debug("Execute: committing data", "bundle", rb, slog.Any("outputsWithData", maps.Keys(b.OutputData.Raw)), slog.Any("outputs", maps.Keys(s.OutputsToCoders)))
// Tally metrics immediately so they're available before
// pipeline termination.
unknownIDs := j.ContributeFinalMetrics(resp)
if len(unknownIDs) > 0 {
md := wk.MonitoringMetadata(ctx, unknownIDs)
j.AddMetricShortIDs(md)
}
// ProcessContinuation residuals are rescheduled after the specified delay.
residuals := engine.Residuals{
MinOutputWatermarks: map[string]mtime.Time{},
}
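// All residual roots from a single response are expected to target the same
// transform and input; collect their elements and track the minimum output
// watermark per downstream collection.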
for _, rr := range resp.GetResidualRoots() {
ba := rr.GetApplication()
if len(ba.GetElement()) == 0 {
slog.LogAttrs(context.TODO(), slog.LevelError, "returned empty residual application", slog.Any("bundle", rb))
panic("sdk returned empty residual application")
}
if residuals.TransformID == "" {
residuals.TransformID = ba.GetTransformId()
}
if residuals.InputID == "" {
residuals.InputID = ba.GetInputId()
}
if residuals.TransformID != ba.GetTransformId() {
panic("sdk returned inconsistent residual application transform : got = " + ba.GetTransformId() + " want = " + residuals.TransformID)
}
if residuals.InputID != ba.GetInputId() {
panic("sdk returned inconsistent residual application input : got = " + ba.GetInputId() + " want = " + residuals.InputID)
}
for col, wm := range ba.GetOutputWatermarks() {
cur, ok := residuals.MinOutputWatermarks[col]
if !ok {
cur = mtime.MaxTimestamp
}
residuals.MinOutputWatermarks[col] = mtime.Min(mtime.FromTime(wm.AsTime()), cur)
}
residuals.Data = append(residuals.Data, engine.Residual{
Element: ba.GetElement(),
Delay: rr.GetRequestedTimeDelay().AsDuration(),
Bounded: ba.GetIsBounded() == pipepb.IsBounded_BOUNDED,
})
}
if l := len(residuals.Data); l == 0 {
slog.Debug("returned empty residual application", "bundle", rb, slog.Int("numResiduals", l), slog.String("pcollection", s.primaryInput))
}
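// Commit the bundle's tentative outputs and residuals to the element manager,
// which updates watermarks and schedules residuals for reprocessing.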
em.PersistBundle(rb, s.OutputsToCoders, b.OutputData, s.inputInfo, residuals)
if s.finalize {
_, err := b.Finalize(ctx, wk)
if err != nil {
slog.Error("SDK Error from bundle finalization", "bundle", rb, "error", err.Error())
panic(err)
}
slog.Info("finalized bundle", "bundle", rb)
}
b.OutputData = engine.TentativeData{} // Clear the data.
return nil
}