sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package engine handles the operational components of a runner, to
// track elements, watermarks, timers, triggers, etc.
package engine
import (
"bytes"
"container/heap"
"context"
"fmt"
"io"
"log/slog"
"runtime/debug"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"golang.org/x/exp/maps"
)
type element struct {
window typex.Window
timestamp mtime.Time
holdTimestamp mtime.Time // only used for Timers
pane typex.PaneInfo
transform, family, tag string // only used for Timers.
// Used to ensure ordering within a key when sorting the heap,
// which isn't using a stable sort.
// Since ordering is weak across multiple bundles, it need only
// be consistent between exiting a stage and entering a stateful stage.
// No synchronization is required when setting this,
// since keyed elements are only processed by a single bundle at a time,
// where stateful stages are concerned.
sequence int
elmBytes []byte // When nil, indicates this is a timer.
keyBytes []byte
}
func (e *element) IsTimer() bool {
return e.elmBytes == nil
}
func (e *element) IsData() bool {
return !e.IsTimer()
}
func (e element) String() string {
if e.IsTimer() {
return fmt.Sprintf("{Timer - Window %v, EventTime %v, Hold %v, %q %q %q %q}", e.window, e.timestamp, e.holdTimestamp, e.transform, e.family, e.tag, e.keyBytes)
}
return fmt.Sprintf("{Data - Window %v, EventTime %v, Pane: %v Element %v - %q}", e.window, e.timestamp, e.pane, e.elmBytes, string(e.elmBytes))
}
type elements struct {
es []element
minTimestamp mtime.Time
}
type PColInfo struct {
GlobalID string
WindowCoder WinCoderType
WDec exec.WindowDecoder
WEnc exec.WindowEncoder
EDec func(io.Reader) []byte
KeyDec func(io.Reader) []byte
}
// WinCoderType indicates what kind of coder
// the window is using. There are only 3
// valid single window encodings.
//
// - Global (for Global windows)
// - Interval (for fixed, sliding, and session windows)
// - Custom (for custom user windows)
//
// TODO: Handle custom variants with built-in "known" coders, and length-prefixed ones as separate cases.
// As a rule we don't care about the bytes, but we do need to be able to get to the next element.
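// As an illustrative sketch only (the read helpers named here are hypothetical,
// not part of this package), a consumer of WinCoderType might dispatch like so:
//
//	switch winCoderType {
//	case WinGlobal:
//		// Zero bytes: the window is implicitly the global window.
//	case WinInterval:
//		// Read the end event time timestamp, then the duration in milliseconds.
//		readIntervalWindow(r)
//	case WinCustom:
//		// Read the end event time timestamp, then the custom coded window bytes.
//		readCustomWindow(r)
//	}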
type WinCoderType int
const (
// WinGlobal indicates the window is empty coded, with 0 bytes.
WinGlobal WinCoderType = iota
// WinInterval indicates the window is interval coded with the end event time timestamp followed by the duration in milliseconds
WinInterval
// WinCustom indicates the window is custom coded, with the end event time timestamp followed by a custom coder.
WinCustom
)
// ToData recodes the elements with their appropriate windowed value header.
func (es elements) ToData(info PColInfo) [][]byte {
var ret [][]byte
for _, e := range es.es {
var buf bytes.Buffer
exec.EncodeWindowedValueHeader(info.WEnc, []typex.Window{e.window}, e.timestamp, e.pane, &buf)
buf.Write(e.elmBytes)
ret = append(ret, buf.Bytes())
}
return ret
}
// elementHeap orders elements based on their timestamps
// so we can always find the minimum timestamp of pending elements.
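// A minimal usage sketch via the standard container/heap package (the
// timestamps are illustrative values only):
//
//	var h elementHeap
//	heap.Push(&h, element{timestamp: 20})
//	heap.Push(&h, element{timestamp: 10})
//	next := heap.Pop(&h).(element) // next.timestamp == 10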
type elementHeap []element
func (h elementHeap) Len() int { return len(h) }
func (h elementHeap) Less(i, j int) bool {
// If the timestamps are the same, data comes before timers.
if h[i].timestamp == h[j].timestamp {
if h[i].IsTimer() && h[j].IsData() {
return false // j before i
} else if h[i].IsData() && h[j].IsTimer() {
return true // i before j.
}
// They're the same kind, so compare by the sequence value.
return h[i].sequence < h[j].sequence
}
// Otherwise compare by timestamp.
return h[i].timestamp < h[j].timestamp
}
func (h elementHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *elementHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(element))
}
func (h *elementHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
type Config struct {
// MaxBundleSize caps the number of elements permitted in a bundle.
// 0 or less means this is ignored.
MaxBundleSize int
}
// ElementManager handles elements, watermarks, and the related bookkeeping needed to determine
// whether a stage is able to be executed. It is the core execution engine of Prism.
//
// Essentially, it needs to track the current watermarks for each PCollection
// and transform/stage. But it's tricky, since the watermarks for the
// PCollections are always relative to transforms/stages.
//
// Key parts:
//
// - The parallel input PCollection's watermark is relative to committed consumed
// elements. That is, the input elements consumed by the transform in a successful
// bundle can advance the watermark, based on the minimum of their timestamps.
// - An output PCollection's watermark is relative to its producing transform,
// which relates to *all of its outputs*.
//
// This means that a PCollection's watermark is the minimum of all of its consuming transforms.
//
// So, the watermark manager needs to track:
// Pending elements for each stage, along with their windows and timestamps.
// Each transform's view of the watermarks for the PCollections.
//
// Watermarks are advanced based on consumed input, except if the stage produces residuals.
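//
// For example (illustrative): in a linear pipeline Impulse -> ParDo -> GBK,
// the PCollection produced by ParDo can be no further ahead than ParDo's
// output watermark, and GBK's input watermark is the maximum of its current
// input watermark and the minimum of its pending element timestamps and that
// upstream PCollection watermark, so GBK only fires aggregations once both
// its pending elements and ParDo's progress permit it.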
type ElementManager struct {
config Config
nextBundID func() string // Generates unique bundleIDs. Set in the Bundles method.
stages map[string]*stageState // The state for each stage.
consumers map[string][]string // Map from pcollectionID to the stageIDs that consume it as their primary input.
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input links that consume it as a side input.
pcolParents map[string]string // Map from pcollectionID to the stageID that produces the pcollection.
refreshCond sync.Cond // refreshCond protects the following fields with its lock, and unblocks bundle scheduling.
inprogressBundles set[string] // Active bundleIDs
changedStages set[string] // Stages that have changed and need their watermark refreshed.
injectedBundles []RunBundle // Represents ready to execute bundles prepared outside of the main loop, such as for onWindowExpiration, or for Triggers.
livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
processTimeEvents *stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time.
testStreamHandler *testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
}
func (em *ElementManager) addPending(v int) {
em.livePending.Add(int64(v))
em.pendingElements.Add(v)
}
// LinkID represents a fully qualified input or output.
type LinkID struct {
Transform, Local, Global string
}
func NewElementManager(config Config) *ElementManager {
return &ElementManager{
config: config,
stages: map[string]*stageState{},
consumers: map[string][]string{},
sideConsumers: map[string][]LinkID{},
pcolParents: map[string]string{},
changedStages: set[string]{},
inprogressBundles: set[string]{},
refreshCond: sync.Cond{L: &sync.Mutex{}},
processTimeEvents: newStageRefreshQueue(),
}
}
// AddStage adds a stage to this element manager, connecting its PCollections and
// nodes to the watermark propagation graph.
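//
// Illustrative wiring sketch (the stage and PCollection IDs here are
// hypothetical, not taken from a real pipeline):
//
//	em := NewElementManager(Config{})
//	em.AddStage("impulse", nil, []string{"pcol0"}, nil)
//	em.AddStage("dofn", []string{"pcol0"}, []string{"pcol1"}, nil)
//	em.Impulse("impulse")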
func (em *ElementManager) AddStage(ID string, inputIDs, outputIDs []string, sides []LinkID) {
slog.Debug("AddStage", slog.String("ID", ID), slog.Any("inputs", inputIDs), slog.Any("sides", sides), slog.Any("outputs", outputIDs))
ss := makeStageState(ID, inputIDs, outputIDs, sides)
em.stages[ss.ID] = ss
for _, outputID := range ss.outputIDs {
em.pcolParents[outputID] = ss.ID
}
for _, input := range inputIDs {
em.consumers[input] = append(em.consumers[input], ss.ID)
}
// In very rare cases, we can have a stage without any inputs, such as a flatten.
// In that case, there's nothing that will start the watermark refresh cycle,
// so we must do it here.
if len(inputIDs) == 0 {
refreshes := singleSet(ss.ID)
em.markStagesAsChanged(refreshes)
}
for _, side := range ss.sides {
// Note that we use the StageID as the global ID in the value since we need
// to be able to look up the consuming stage, from the global PCollectionID.
em.sideConsumers[side.Global] = append(em.sideConsumers[side.Global], LinkID{Global: ss.ID, Local: side.Local, Transform: side.Transform})
}
}
// StageAggregates marks the given stage as an aggregation, which
// means elements will only be processed based on windowing strategies.
func (em *ElementManager) StageAggregates(ID string, strat WinStrat) {
ss := em.stages[ID]
ss.kind = &aggregateStageKind{}
ss.strat = strat
ss.inprogressKeys = set[string]{}
}
// StageStateful marks the given stage as stateful, which means elements are
// processed by key.
func (em *ElementManager) StageStateful(ID string, stateTypeLen map[LinkID]func([]byte) int) {
ss := em.stages[ID]
ss.kind = &statefulStageKind{}
ss.stateTypeLen = stateTypeLen
ss.inprogressKeys = set[string]{}
}
// StageOnWindowExpiration registers the given stage's OnWindowExpiration timer,
// and initializes the tracking needed to expire each window's keys via that callback.
func (em *ElementManager) StageOnWindowExpiration(stageID string, timer StaticTimerID) {
ss := em.stages[stageID]
ss.onWindowExpiration = timer
ss.keysToExpireByWindow = map[typex.Window]set[string]{}
ss.inProgressExpiredWindows = map[typex.Window]int{}
ss.expiryWindowsByBundles = map[string]typex.Window{}
}
// StageProcessingTimeTimers indicates which timers are processingTime domain timers.
func (em *ElementManager) StageProcessingTimeTimers(ID string, ptTimers map[string]bool) {
em.stages[ID].processingTimeTimersFamilies = ptTimers
}
// AddTestStream provides a builder interface for the execution layer to build the test stream from
// the protos.
func (em *ElementManager) AddTestStream(id string, tagToPCol map[string]string) TestStreamBuilder {
impl := &testStreamImpl{em: em}
impl.initHandler(id)
impl.TagsToPCollections(tagToPCol)
return impl
}
// Impulse marks and initializes the given stage as an impulse, which
// is a root transform that starts processing.
func (em *ElementManager) Impulse(stageID string) {
stage := em.stages[stageID]
newPending := []element{{
window: window.GlobalWindow{},
timestamp: mtime.MinTimestamp,
pane: typex.NoFiringPane(),
elmBytes: []byte{0}, // Represents an encoded 0 length byte slice.
}}
consumers := em.consumers[stage.outputIDs[0]]
slog.Debug("Impulse", slog.String("stageID", stageID), slog.Any("outputs", stage.outputIDs), slog.Any("consumers", consumers))
for _, sID := range consumers {
consumer := em.stages[sID]
count := consumer.AddPending(em, newPending)
em.addPending(count)
}
refreshes := stage.updateWatermarks(em)
em.markStagesAsChanged(refreshes)
}
type RunBundle struct {
StageID string
BundleID string
Watermark mtime.Time
}
func (rb RunBundle) LogValue() slog.Value {
return slog.GroupValue(
slog.String("ID", rb.BundleID),
slog.String("stage", rb.StageID),
slog.Time("watermark", rb.Watermark.ToTime()))
}
// Bundles is the core execution loop. It produces a sequence of bundles able to be executed.
// The returned channel is closed when the context is canceled, or there are no pending elements
// remaining.
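//
// Illustrative consumption sketch (executeBundle and newBundleID are
// hypothetical stand-ins for the caller's own logic):
//
//	for rb := range em.Bundles(ctx, cancelCauseFn, newBundleID) {
//		executeBundle(rb) // then call em.PersistBundle or em.FailBundle for rb.
//	}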
func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.CancelCauseFunc, nextBundID func() string) <-chan RunBundle {
// Make it easier for injected bundles to get unique IDs.
em.nextBundID = nextBundID
runStageCh := make(chan RunBundle)
ctx, cancelFn := context.WithCancelCause(ctx)
go func() {
em.pendingElements.Wait()
slog.Debug("no more pending elements: terminating pipeline")
cancelFn(fmt.Errorf("elementManager out of elements, cleaning up"))
// Ensure the watermark evaluation goroutine exits.
em.refreshCond.Broadcast()
}()
// Watermark evaluation goroutine.
go func() {
defer func() {
// In case of panics in bundle generation, fail and cancel the job.
if e := recover(); e != nil {
upstreamCancelFn(fmt.Errorf("panic in ElementManager.Bundles watermark evaluation goroutine: %v\n%v", e, string(debug.Stack())))
}
}()
defer close(runStageCh)
for {
em.refreshCond.L.Lock()
// Check if processing time has advanced before the wait loop.
emNow := em.ProcessingTimeNow()
changedByProcessingTime := em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
// If there are no changed stages, ready processing time events,
// or injected bundles available, we wait until there are.
for len(em.changedStages)+len(changedByProcessingTime)+len(em.injectedBundles) == 0 {
// Check to see if we must exit
select {
case <-ctx.Done():
em.refreshCond.L.Unlock()
return
default:
}
em.refreshCond.Wait() // until watermarks may have changed.
// Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode)
emNow = em.ProcessingTimeNow()
changedByProcessingTime = em.processTimeEvents.AdvanceTo(emNow)
em.changedStages.merge(changedByProcessingTime)
}
// Run any injected bundles first.
// TODO: Migrate these to the per-stage mechanism for consistency with triggers.
for len(em.injectedBundles) > 0 {
rb := em.injectedBundles[0]
em.injectedBundles = em.injectedBundles[1:]
em.refreshCond.L.Unlock()
select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
}
// We know there is some work we can do that may advance the watermarks,
// refresh them, and see which stages have advanced.
advanced := em.refreshWatermarks()
advanced.merge(changedByProcessingTime)
// Check each advanced stage, to see if it's able to execute based on the watermark.
for stageID := range advanced {
ss := em.stages[stageID]
watermark, ready, ptimeEventsReady, injectedReady := ss.bundleReady(em, emNow)
if injectedReady {
ss.mu.Lock()
injected := ss.bundlesToInject
ss.bundlesToInject = nil
ss.mu.Unlock()
for _, rb := range injected {
em.refreshCond.L.Unlock()
select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
}
}
if ready {
bundleID, ok, reschedule, pendingAdjustment := ss.startEventTimeBundle(watermark, nextBundID)
// Handle the reschedule even when there's no bundle.
if reschedule {
em.changedStages.insert(stageID)
}
if ok {
if pendingAdjustment > 0 {
em.addPending(pendingAdjustment)
}
rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark}
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
}
}
if ptimeEventsReady {
bundleID, ok, reschedule := ss.startProcessingTimeBundle(em, emNow, nextBundID)
// Handle the reschedule even when there's no bundle.
if reschedule {
em.changedStages.insert(stageID)
}
if ok {
rb := RunBundle{StageID: stageID, BundleID: bundleID, Watermark: watermark}
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
select {
case <-ctx.Done():
return
case runStageCh <- rb:
}
em.refreshCond.L.Lock()
}
}
}
if err := em.checkForQuiescence(advanced); err != nil {
upstreamCancelFn(err)
}
}
}()
return runStageCh
}
// checkForQuiescence sees if this element manager is no longer able to do any pending work or make progress.
//
// Quiescence can happen if there are no inprogress bundles, and there are no further watermark refreshes, which
// are the only way to access new pending elements. If there are no pending elements, then the pipeline will
// terminate successfully.
//
// Otherwise, produce information for debugging why the pipeline is stuck and take appropriate action, such as
// executing the next TestStream event.
//
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) checkForQuiescence(advanced set[string]) error {
defer em.refreshCond.L.Unlock()
if len(em.inprogressBundles) > 0 {
// If there are bundles in progress, then there may be watermark refreshes when they terminate.
return nil
}
if len(em.changedStages) > 0 {
// If there are changed stages that need a watermarks refresh,
// we aren't yet stuck.
v := em.livePending.Load()
slog.Debug("Bundles: nothing in progress after advance",
slog.Any("advanced", advanced),
slog.Int("changeCount", len(em.changedStages)),
slog.Int64("pendingElementCount", v),
)
return nil
}
if em.testStreamHandler == nil && len(em.processTimeEvents.events) > 0 {
// If there's no test stream involved, and processing time events exist, then
// it's only a matter of time.
return nil
}
// The job has quiesced!
// There are no further incoming watermark changes, see if there are test stream events for this job.
nextEvent := em.testStreamHandler.NextEvent()
if nextEvent != nil {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
// If there are changedStages scheduled for a watermark refresh,
// then test stream permits execution to continue.
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.changedStages) > 0 {
return nil
} else if _, ok := nextEvent.(tsProcessingTimeEvent); ok {
// It's impossible to fully control SDK-side handling of processing time from the
// runner side, so we specialize refresh handling here to avoid spuriously getting stuck.
em.changedStages.insert(em.testStreamHandler.ID)
return nil
}
// If there are no changed stages due to a test stream event
// then there's no mechanism to make progress, so it's time to fast fail.
}
v := em.livePending.Load()
if v == 0 {
// Since there are no further pending elements, the job will be terminating successfully.
return nil
}
// The job is officially stuck. Fail fast and produce debugging information.
// Jobs must never get stuck so this indicates a bug in prism to be investigated.
slog.Debug("Bundles: nothing in progress and no refreshes", slog.Int64("pendingElementCount", v))
var stageState []string
ids := maps.Keys(em.stages)
if em.testStreamHandler != nil {
stageState = append(stageState, fmt.Sprintf("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n",
em.testStreamHandler.completed, em.testStreamHandler.nextEventIndex, len(em.testStreamHandler.events), em.testStreamHandler.events, em.testStreamHandler.processingTime, mtime.FromTime(em.testStreamHandler.processingTime), em.processTimeEvents))
} else {
stageState = append(stageState, fmt.Sprintf("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n", em.ProcessingTimeNow(), em.processTimeEvents.events, em.injectedBundles))
}
sort.Strings(ids)
for _, id := range ids {
ss := em.stages[id]
inW := ss.InputWatermark()
outW := ss.OutputWatermark()
upPCol, upW := ss.UpstreamWatermark()
upS := em.pcolParents[upPCol]
if upS == "" {
upS = "IMPULSE " // (extra spaces to allow print to align better.)
}
stageState = append(stageState, fmt.Sprintln(id, "watermark in", inW, "out", outW, "upstream", upW, "from", upS, "pending", ss.pending, "byKey", ss.pendingByKeys, "inprogressKeys", ss.inprogressKeys, "byBundle", ss.inprogressKeysByBundle, "holds", ss.watermarkHolds.heap, "holdCounts", ss.watermarkHolds.counts, "holdsInBundle", ss.inprogressHoldsByBundle, "pttEvents", ss.processingTimeTimers.toFire, "bundlesToInject", ss.bundlesToInject))
var outputConsumers, sideConsumers []string
for _, col := range ss.outputIDs {
outputConsumers = append(outputConsumers, em.consumers[col]...)
for _, l := range em.sideConsumers[col] {
sideConsumers = append(sideConsumers, l.Global)
}
}
stageState = append(stageState, fmt.Sprintf("\tsideInputs: %v outputCols: %v outputConsumers: %v sideConsumers: %v\n", ss.sides, ss.outputIDs, outputConsumers, sideConsumers))
}
return errors.Errorf("nothing in progress and no refreshes with non zero pending elements: %v\n%v", v, strings.Join(stageState, ""))
}
// InputForBundle returns pre-allocated data for the given bundle, encoding the elements using
// the PCollection's coders.
func (em *ElementManager) InputForBundle(rb RunBundle, info PColInfo) [][]byte {
ss := em.stages[rb.StageID]
ss.mu.Lock()
defer ss.mu.Unlock()
es := ss.inprogress[rb.BundleID]
return es.ToData(info)
}
// DataAndTimerInputForBundle returns pre-allocated data for the given bundle and the estimated number of elements.
// Elements are encoded with the PCollection's coders.
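//
// For example (illustrative): a bundle containing two data elements, then two
// timers for the same transform and family, then one more data element, yields
// three blocks: a BlockData block with 2 encoded elements, a BlockTimer block
// with 2 encoded timers (tagged with that Transform and Family), and a final
// BlockData block with 1 encoded element; the returned count is 5.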
func (em *ElementManager) DataAndTimerInputForBundle(rb RunBundle, info PColInfo) ([]*Block, int) {
ss := em.stages[rb.StageID]
ss.mu.Lock()
defer ss.mu.Unlock()
es := ss.inprogress[rb.BundleID]
var total int
var ret []*Block
cur := &Block{}
for _, e := range es.es {
switch {
case e.IsTimer() && (cur.Kind != BlockTimer || e.family != cur.Family || cur.Transform != e.transform):
total += len(cur.Bytes)
cur = &Block{
Kind: BlockTimer,
Transform: e.transform,
Family: e.family,
}
ret = append(ret, cur)
fallthrough
case e.IsTimer() && cur.Kind == BlockTimer:
var buf bytes.Buffer
// Key
buf.Write(e.keyBytes) // Includes the length prefix if any.
// Tag
coder.EncodeVarInt(int64(len(e.tag)), &buf)
buf.WriteString(e.tag)
// Windows
info.WEnc.Encode([]typex.Window{e.window}, &buf)
// Clear
buf.Write([]byte{0})
// Firing timestamp
coder.EncodeEventTime(e.timestamp, &buf)
// Hold timestamp
coder.EncodeEventTime(e.holdTimestamp, &buf)
// Pane
coder.EncodePane(e.pane, &buf)
cur.Bytes = append(cur.Bytes, buf.Bytes())
case cur.Kind != BlockData:
total += len(cur.Bytes)
cur = &Block{
Kind: BlockData,
}
ret = append(ret, cur)
fallthrough
default:
var buf bytes.Buffer
exec.EncodeWindowedValueHeader(info.WEnc, []typex.Window{e.window}, e.timestamp, e.pane, &buf)
buf.Write(e.elmBytes)
cur.Bytes = append(cur.Bytes, buf.Bytes())
}
}
total += len(cur.Bytes)
return ret, total
}
// BlockKind indicates how the block is to be handled.
type BlockKind int32
const (
blockUnset BlockKind = iota // blockUnset
BlockData // BlockData represents data for the bundle.
BlockTimer // BlockTimer represents timers for the bundle.
)
// Block represents a contiguous set of data or timers for the same destination.
type Block struct {
Kind BlockKind
Bytes [][]byte
Transform, Family string
}
// StaticTimerID represents the static user identifiers for a timer,
// in particular, the ID of the Transform, and the family for the timer.
type StaticTimerID struct {
TransformID, TimerFamily string
}
// StateForBundle retrieves relevant state for the given bundle, with respect to the data in the bundle.
//
// TODO(lostluck): Consider unifying with InputForBundle, to reduce lock contention.
func (em *ElementManager) StateForBundle(rb RunBundle) TentativeData {
ss := em.stages[rb.StageID]
ss.mu.Lock()
defer ss.mu.Unlock()
ret := TentativeData{
stateTypeLen: ss.stateTypeLen,
}
keys := ss.inprogressKeysByBundle[rb.BundleID]
// TODO(lostluck): Also track windows per bundle, to reduce copying.
if len(ss.state) > 0 {
ret.state = map[LinkID]map[typex.Window]map[string]StateData{}
}
for link, winMap := range ss.state {
for w, keyMap := range winMap {
for key := range keys {
data, ok := keyMap[key]
if !ok {
continue
}
linkMap, ok := ret.state[link]
if !ok {
linkMap = map[typex.Window]map[string]StateData{}
ret.state[link] = linkMap
}
wlinkMap, ok := linkMap[w]
if !ok {
wlinkMap = map[string]StateData{}
linkMap[w] = wlinkMap
}
var mm map[string][][]byte
if len(data.Multimap) > 0 {
mm = map[string][][]byte{}
for uk, v := range data.Multimap {
// Clone the "holding" slice, but refer to the existing data bytes.
mm[uk] = append([][]byte(nil), v...)
}
}
// Clone the "holding" slice, but refer to the existing data bytes.
wlinkMap[key] = StateData{
Bag: append([][]byte(nil), data.Bag...),
Multimap: mm,
}
}
}
}
return ret
}
// Residual represents the unprocessed portion of a single element to be rescheduled for processing later.
type Residual struct {
Element []byte
Delay time.Duration // The relative time delay.
Bounded bool // Whether this element is finite or not.
}
// Residuals is used to specify process continuations within a bundle.
type Residuals struct {
Data []Residual
TransformID, InputID string // Prism only allows one SDF at the root of a bundle descriptor, so there should only be one of each.
MinOutputWatermarks map[string]mtime.Time // Output watermarks (technically per Residual, but aggregated here until it makes a difference.)
}
// reElementResiduals extracts the windowed value header from residual bytes, and explodes them
// back out to their windows.
func reElementResiduals(residuals []Residual, inputInfo PColInfo, rb RunBundle) []element {
var unprocessedElements []element
for _, residual := range residuals {
buf := bytes.NewBuffer(residual.Element)
ws, et, pn, err := exec.DecodeWindowedValueHeader(inputInfo.WDec, buf)
if err != nil {
if err == io.EOF {
break
}
slog.Error("reElementResiduals: error decoding residual header", "error", err, "bundle", rb)
panic("error decoding residual header:" + err.Error())
}
if len(ws) == 0 {
slog.Error("reElementResiduals: sdk provided a windowed value header 0 windows", "bundle", rb)
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
}
// POSSIBLY BAD PATTERN: The buffer is invalidated on the next call, which doesn't always happen.
// But the decoder won't be mutating the buffer bytes, just reading the data. So the elmBytes
// should remain pointing to the whole element, and we should have a copy of the key bytes.
// Ideally, we're simply referring to the key part of the existing buffer.
elmBytes := buf.Bytes()
var keyBytes []byte
if inputInfo.KeyDec != nil {
keyBytes = inputInfo.KeyDec(buf)
}
for _, w := range ws {
unprocessedElements = append(unprocessedElements,
element{
window: w,
timestamp: et,
pane: pn,
elmBytes: elmBytes,
keyBytes: keyBytes,
sequence: len(unprocessedElements),
})
}
}
return unprocessedElements
}
// PersistBundle uses the tentative bundle output to update the watermarks for the stage.
// Each stage has two monotonically increasing watermarks, the input watermark, and the output
// watermark.
//
// InputWatermark = MAX(CurrentInputWatermark, MIN(PendingElements, InputPCollectionWatermarks))
// OutputWatermark = MAX(CurrentOutputWatermark, MIN(InputWatermark, WatermarkHolds))
//
// PersistBundle takes in the stage ID, ID of the bundle associated with the pending
// input elements, and the committed output elements.
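//
// For example (illustrative numbers): with a current input watermark of 10, an
// earliest pending element timestamp of 12, and an input PCollection watermark
// of 15, the new input watermark is MAX(10, MIN(12, 15)) = 12; the output
// watermark then advances to MAX(CurrentOutputWatermark, MIN(12, WatermarkHolds)).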
func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PColInfo, d TentativeData, inputInfo PColInfo, residuals Residuals) {
stage := em.stages[rb.StageID]
var seq int
for output, data := range d.Raw {
info := col2Coders[output]
var newPending []element
slog.Debug("PersistBundle: processing output", "bundle", rb, slog.String("output", output))
for _, datum := range data {
buf := bytes.NewBuffer(datum)
if len(datum) == 0 {
panic(fmt.Sprintf("zero length data for %v: ", output))
}
for {
var rawBytes bytes.Buffer
tee := io.TeeReader(buf, &rawBytes)
ws, et, pn, err := exec.DecodeWindowedValueHeader(info.WDec, tee)
if err != nil {
if err == io.EOF {
break
}
slog.Error("PersistBundle: error decoding watermarks", "error", err, "bundle", rb, slog.String("output", output))
panic("error decoding watermarks")
}
if len(ws) == 0 {
slog.Error("PersistBundle: sdk provided a windowed value header 0 windows", "bundle", rb)
panic("error decoding residual header: sdk provided a windowed value header 0 windows")
}
// TODO: Optimize unnecessary copies. This is doubleteeing.
elmBytes := info.EDec(tee)
var keyBytes []byte
if info.KeyDec != nil {
kbuf := bytes.NewBuffer(elmBytes)
keyBytes = info.KeyDec(kbuf) // TODO: Optimize unnecessary copies. This is tripleteeing?
}
for _, w := range ws {
newPending = append(newPending,
element{
window: w,
timestamp: et,
pane: stage.kind.updatePane(stage, pn, w, keyBytes),
elmBytes: elmBytes,
keyBytes: keyBytes,
sequence: seq,
})
seq++
}
}
}
consumers := em.consumers[output]
sideConsumers := em.sideConsumers[output]
slog.Debug("PersistBundle: bundle has downstream consumers.", "bundle", rb, slog.Int("newPending", len(newPending)), "consumers", consumers, "sideConsumers", sideConsumers)
for _, sID := range consumers {
consumer := em.stages[sID]
count := consumer.AddPending(em, newPending)
em.addPending(count)
}
for _, link := range sideConsumers {
consumer := em.stages[link.Global]
consumer.AddPendingSide(newPending, link.Transform, link.Local)
}
}
// Triage timers into their time domains for scheduling.
// EventTime timers are handled with normal elements,
// ProcessingTime timers need to be scheduled into the processing time based queue.
newHolds, ptRefreshes := em.triageTimers(d, inputInfo, stage)
// Return unprocessed to this stage's pending
// TODO sort out pending element watermark holds for process continuation residuals.
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
// Add unprocessed back to the pending stack.
if len(unprocessedElements) > 0 {
// TODO actually reschedule based on the residuals delay...
count := stage.AddPending(em, unprocessedElements)
em.addPending(count)
}
// Clear out the inprogress elements associated with the completed bundle.
// Must be done after adding the new pending elements to avoid an incorrect
// watermark advancement.
stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
em.addPending(-len(completed.es))
delete(stage.inprogress, rb.BundleID)
for k := range stage.inprogressKeysByBundle[rb.BundleID] {
delete(stage.inprogressKeys, k)
}
delete(stage.inprogressKeysByBundle, rb.BundleID)
// Adjust holds as needed.
for h, c := range newHolds {
if c > 0 {
stage.watermarkHolds.Add(h, c)
} else if c < 0 {
stage.watermarkHolds.Drop(h, -c)
}
}
for hold, v := range stage.inprogressHoldsByBundle[rb.BundleID] {
stage.watermarkHolds.Drop(hold, v)
}
delete(stage.inprogressHoldsByBundle, rb.BundleID)
// Clean up OnWindowExpiration bundle accounting, so window state
// may be garbage collected.
if stage.expiryWindowsByBundles != nil {
win, ok := stage.expiryWindowsByBundles[rb.BundleID]
if ok {
stage.inProgressExpiredWindows[win] -= 1
if stage.inProgressExpiredWindows[win] == 0 {
delete(stage.inProgressExpiredWindows, win)
}
delete(stage.expiryWindowsByBundles, rb.BundleID)
}
}
// If there are estimated output watermarks, set the estimated
// output watermark for the stage.
if len(residuals.MinOutputWatermarks) > 0 {
estimate := mtime.MaxTimestamp
for _, t := range residuals.MinOutputWatermarks {
estimate = mtime.Min(estimate, t)
}
stage.estimatedOutput = estimate
}
// Handle persisting.
for link, winMap := range d.state {
linkMap, ok := stage.state[link]
if !ok {
linkMap = map[typex.Window]map[string]StateData{}
stage.state[link] = linkMap
}
for w, keyMap := range winMap {
wlinkMap, ok := linkMap[w]
if !ok {
wlinkMap = map[string]StateData{}
linkMap[w] = wlinkMap
}
for key, data := range keyMap {
wlinkMap[key] = data
}
}
}
stage.mu.Unlock()
em.markChangedAndClearBundle(stage.ID, rb.BundleID, ptRefreshes)
}
// triageTimers prepares received timers for eventual firing, as well as rebasing processing time timers as needed.
func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stage *stageState) (map[mtime.Time]int, set[mtime.Time]) {
// Process each timer family in the order we received them, so we can filter to the last one.
// Since we process each timer family individually, use a unique key for each user key, tag, and window.
// The last timer set for each combination is the next one we're keeping.
type timerKey struct {
key string
tag string
win typex.Window
}
em.refreshCond.L.Lock()
emNow := em.ProcessingTimeNow()
em.refreshCond.L.Unlock()
var pendingEventTimers []element
var pendingProcessingTimers []fireElement
stageRefreshTimes := set[mtime.Time]{}
for tentativeKey, timers := range d.timers {
keyToTimers := map[timerKey]element{}
for _, t := range timers {
// TODO: Call in a for:range loop when Beam's minimum Go version hits 1.23.0
iter := decodeTimerIter(inputInfo.KeyDec, inputInfo.WindowCoder, t)
iter(func(ret timerRet) bool {
for _, e := range ret.elms {
keyToTimers[timerKey{key: string(ret.keyBytes), tag: ret.tag, win: e.window}] = e
}
if len(ret.elms) == 0 {
for _, w := range ret.windows {
delete(keyToTimers, timerKey{key: string(ret.keyBytes), tag: ret.tag, win: w})
}
}
// Indicate we'd like to continue iterating.
return true
})
}
for _, elm := range keyToTimers {
elm.transform = tentativeKey.Transform
elm.family = tentativeKey.Family
if stage.processingTimeTimersFamilies[elm.family] {
// Conditionally rebase processing time or always rebase?
newTimerFire := rebaseProcessingTime(emNow, elm.timestamp)
elm.timestamp = elm.holdTimestamp // Processing Time always uses the hold timestamp as the resulting event time.
pendingProcessingTimers = append(pendingProcessingTimers, fireElement{
firing: newTimerFire,
timer: elm,
})
// Add pending Processing timers to the stage's processing time store & schedule event in the manager.
stageRefreshTimes.insert(newTimerFire)
} else {
pendingEventTimers = append(pendingEventTimers, elm)
}
}
}
if len(pendingEventTimers) > 0 {
count := stage.AddPending(em, pendingEventTimers)
em.addPending(count)
}
changedHolds := map[mtime.Time]int{}
if len(pendingProcessingTimers) > 0 {
stage.mu.Lock()
var count int
for _, v := range pendingProcessingTimers {
count += stage.processingTimeTimers.Persist(v.firing, v.timer, changedHolds)
}
em.addPending(count)
stage.mu.Unlock()
}
return changedHolds, stageRefreshTimes
}
// FailBundle clears the extant data allowing the execution to shut down.
func (em *ElementManager) FailBundle(rb RunBundle) {
stage := em.stages[rb.StageID]
stage.mu.Lock()
completed := stage.inprogress[rb.BundleID]
em.addPending(-len(completed.es))
delete(stage.inprogress, rb.BundleID)
stage.mu.Unlock()
em.markChangedAndClearBundle(rb.StageID, rb.BundleID, nil)
}
// ReturnResiduals is called after a successful split, so the remaining work
// can be re-assigned to a new bundle.
func (em *ElementManager) ReturnResiduals(rb RunBundle, firstRsIndex int, inputInfo PColInfo, residuals Residuals) {
stage := em.stages[rb.StageID]
stage.splitBundle(rb, firstRsIndex)
unprocessedElements := reElementResiduals(residuals.Data, inputInfo, rb)
if len(unprocessedElements) > 0 {
slog.Debug("ReturnResiduals: unprocessed elements", "bundle", rb, "count", len(unprocessedElements))
count := stage.AddPending(em, unprocessedElements)
em.addPending(count)
}
em.markStagesAsChanged(singleSet(rb.StageID))
}
// markStagesAsChanged updates the set of changed stages,
// and broadcasts that there may be watermark evaluation work to do.
func (em *ElementManager) markStagesAsChanged(stages set[string]) {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
em.changedStages.merge(stages)
em.refreshCond.Broadcast()
}
// markChangedAndClearBundle marks the current stage as changed,
// and removes the given bundle from being in progress.
func (em *ElementManager) markChangedAndClearBundle(stageID, bundID string, ptRefreshes set[mtime.Time]) {
em.refreshCond.L.Lock()
defer em.refreshCond.L.Unlock()
delete(em.inprogressBundles, bundID)
em.changedStages.insert(stageID)
for t := range ptRefreshes {
em.processTimeEvents.Schedule(t, stageID)
}
em.refreshCond.Broadcast()
}
// refreshWatermarks incrementally refreshes the watermarks of stages that have
// been marked as changed, and returns the set of stages where the
// watermark may have advanced.
// Must be called while holding em.refreshCond.L.
func (em *ElementManager) refreshWatermarks() set[string] {
// Need to have at least one refresh signal.
nextUpdates := set[string]{}
refreshed := set[string]{}
var i int
for stageID := range em.changedStages {
// clear out old one.
em.changedStages.remove(stageID)
ss := em.stages[stageID]
refreshed.insert(stageID)
refreshes := ss.updateWatermarks(em)
nextUpdates.merge(refreshes)
// cap refreshes incrementally.
if i < 10 {
i++
} else {
break
}
}
em.changedStages.merge(nextUpdates)
return refreshed
}
type set[K comparable] map[K]struct{}
func (s set[K]) present(k K) bool {
_, ok := s[k]
return ok
}
func (s set[K]) remove(k K) {
delete(s, k)
}
func (s set[K]) insert(k K) {
s[k] = struct{}{}
}
func (s set[K]) merge(o set[K]) {
for k := range o {
s.insert(k)
}
}
func singleSet[T comparable](v T) set[T] {
return set[T]{v: struct{}{}}
}
// stageState is the internal watermark and input tracking for a stage.
type stageState struct {
ID string
inputID string // PCollection ID of the parallel input
outputIDs []string // PCollection IDs of outputs to update consumers.
sides []LinkID // PCollection IDs of side inputs that can block execution.
// Special handling bits
kind stageKind
strat WinStrat // Windowing Strategy for aggregation firings.
processingTimeTimersFamilies map[string]bool // Indicates which timer families use the processing time domain.
// onWindowExpiration management
onWindowExpiration StaticTimerID // The static ID of the OnWindowExpiration callback.
keysToExpireByWindow map[typex.Window]set[string] // Tracks all keys ever used with a window, so they may be expired.
inProgressExpiredWindows map[typex.Window]int // Tracks the number of bundles currently expiring these windows, so we don't prematurely garbage collect them.
expiryWindowsByBundles map[string]typex.Window // Tracks which bundle is handling which window, so the above map can be cleared.
mu sync.Mutex
upstreamWatermarks sync.Map // watermark set from inputPCollection's parent.
input mtime.Time // input watermark for the parallel input.
output mtime.Time // Output watermark for the whole stage
estimatedOutput mtime.Time // Estimated watermark output from DoFns
pending elementHeap // pending input elements for this stage that are to be processed
inprogress map[string]elements // inprogress elements by active bundles, keyed by bundle
sideInputs map[LinkID]map[typex.Window][][]byte // side input data for this stage, from {tid, inputID} -> window
// Fields for stateful stages which need to be per key.
pendingByKeys map[string]*dataAndTimers // pending input elements by Key, if stateful.
inprogressKeys set[string] // all keys that are assigned to bundles.
inprogressKeysByBundle map[string]set[string] // bundle to key assignments.
state map[LinkID]map[typex.Window]map[string]StateData // state data for this stage, from {tid, stateID} -> window -> userKey
stateTypeLen map[LinkID]func([]byte) int // map from state to a function that will produce the total length of a single value in bytes.
bundlesToInject []RunBundle // bundlesToInject are triggered bundles that will be injected by the watermark loop to avoid premature pipeline termination.
// Accounting for handling watermark holds for timers.
// We track the count of timers with the same hold, and clear it from
// the map and heap when the count goes to zero.
// This avoids scanning the heap to remove or access a hold for each element.
watermarkHolds *holdTracker
inprogressHoldsByBundle map[string]map[mtime.Time]int // bundle to associated holds.
processingTimeTimers *timerHandler
}
// stageKind handles behavioral differences between ordinary, stateful, and aggregation stage kinds.
//
// kinds should be stateless, and stageState retains all state for the stage,
// even if it's unused by the current kind.
type stageKind interface {
// addPending handles adding new pending elements to the stage appropriate for the kind.
addPending(ss *stageState, em *ElementManager, newPending []element) int
// buildEventTimeBundle handles building bundles for the stage per its kind.
buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int)
// updatePane based on the stage state.
updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo
}
// ordinaryStageKind represents stages that have no special behavior associated with them.
// This covers most batch processing, and doesn't require keyed elements.
type ordinaryStageKind struct{}
func (*ordinaryStageKind) String() string { return "OrdinaryStage" }
func (*ordinaryStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo {
return pane
}
// statefulStageKind requires keyed elements, and handles stages with stateful transforms, with state and timers.
type statefulStageKind struct{}
func (*statefulStageKind) String() string { return "StatefulStage" }
func (*statefulStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo {
return pane
}
// aggregateStageKind handles stages that perform aggregations over their primary inputs.
// They are a specialized kind of stateful stage that doesn't handle timers.
type aggregateStageKind struct{}
func (*aggregateStageKind) String() string { return "AggregateStage" }
func (*aggregateStageKind) updatePane(ss *stageState, pane typex.PaneInfo, w typex.Window, keyBytes []byte) typex.PaneInfo {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.state[LinkID{}][w][string(keyBytes)].Pane
}
// timerKey uniquely identifies a given timer within the space of a user key.
type timerKey struct {
family, tag string
window typex.Window
}
type timerTimes struct {
firing, hold mtime.Time
}
// dataAndTimers represents all elements for a single user key and the latest
// eventTime for a given family and tag.
type dataAndTimers struct {
elements elementHeap
timers map[timerKey]timerTimes
}
// makeStageState produces an initialized stageState.
func makeStageState(ID string, inputIDs, outputIDs []string, sides []LinkID) *stageState {
ss := &stageState{
ID: ID,
outputIDs: outputIDs,
sides: sides,
state: map[LinkID]map[typex.Window]map[string]StateData{},
watermarkHolds: newHoldTracker(),
kind: &ordinaryStageKind{},
input: mtime.MinTimestamp,
output: mtime.MinTimestamp,
estimatedOutput: mtime.MinTimestamp,
processingTimeTimers: newTimerHandler(),
}
// Initialize the upstream watermarks to minTime.
for _, pcol := range inputIDs {
ss.upstreamWatermarks.Store(pcol, mtime.MinTimestamp)
}
if len(inputIDs) == 1 {
ss.inputID = inputIDs[0]
}
return ss
}
// AddPending adds elements to the pending heap.
func (ss *stageState) AddPending(em *ElementManager, newPending []element) int {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.kind.addPending(ss, em, newPending)
}
// addPending for aggregate stages behaves like stateful stages, but doesn't need to handle timers or a separate window
// expiration condition.
func (*aggregateStageKind) addPending(ss *stageState, em *ElementManager, newPending []element) int {
// Late data is data that has arrived after its window has expired.
// We only need to drop late data before aggregations.
// TODO - handle for side inputs too.
threshold := ss.output
origPending := make([]element, 0, ss.pending.Len())
for _, e := range newPending {
if ss.strat.EarliestCompletion(e.window) < threshold {
continue
}
origPending = append(origPending, e)
}
newPending = origPending
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
}
count := 0
for _, e := range newPending {
count++
if len(e.keyBytes) == 0 {
panic(fmt.Sprintf("zero length key: %v %v", ss.ID, ss.inputID))
}
dnt, ok := ss.pendingByKeys[string(e.keyBytes)]
if !ok {
dnt = &dataAndTimers{}
ss.pendingByKeys[string(e.keyBytes)] = dnt
}
heap.Push(&dnt.elements, e)
// Check on triggers for this key.
// We use an empty linkID as the key into state for aggregations.
if ss.state == nil {
ss.state = make(map[LinkID]map[typex.Window]map[string]StateData)
}
lv, ok := ss.state[LinkID{}]
if !ok {
lv = make(map[typex.Window]map[string]StateData)
ss.state[LinkID{}] = lv
}
wv, ok := lv[e.window]
if !ok {
wv = make(map[string]StateData)
lv[e.window] = wv
}
state := wv[string(e.keyBytes)]
endOfWindowReached := e.window.MaxTimestamp() < ss.input
ready := ss.strat.IsTriggerReady(triggerInput{
newElementCount: 1,
endOfWindowReached: endOfWindowReached,
}, &state)
if ready {
state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached)
}
// Store the state as triggers may have changed it.
ss.state[LinkID{}][e.window][string(e.keyBytes)] = state
// If we're ready, it's time to fire!
if ready {
count += ss.buildTriggeredBundle(em, e.keyBytes, e.window)
}
}
return count
}
func (*statefulStageKind) addPending(ss *stageState, em *ElementManager, newPending []element) int {
if ss.pendingByKeys == nil {
ss.pendingByKeys = map[string]*dataAndTimers{}
}
count := 0
for _, e := range newPending {
count++
if len(e.keyBytes) == 0 {
panic(fmt.Sprintf("zero length key: %v %v", ss.ID, ss.inputID))
}
dnt, ok := ss.pendingByKeys[string(e.keyBytes)]
if !ok {
dnt = &dataAndTimers{
timers: map[timerKey]timerTimes{},
}
ss.pendingByKeys[string(e.keyBytes)] = dnt
if ss.keysToExpireByWindow != nil {
w, ok := ss.keysToExpireByWindow[e.window]
if !ok {
w = make(set[string])
ss.keysToExpireByWindow[e.window] = w
}
w.insert(string(e.keyBytes))
}
}
heap.Push(&dnt.elements, e)
if e.IsTimer() {
if lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]; ok {
// existing timer!
// don't increase the count this time, as "this" timer is already pending.
count--
// clear out the existing hold for accounting purposes.
ss.watermarkHolds.Drop(lastSet.hold, 1)
}
// Update the last set time on the timer.
dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}] = timerTimes{firing: e.timestamp, hold: e.holdTimestamp}
// Mark the hold in the heap.
ss.watermarkHolds.Add(e.holdTimestamp, 1)
}
}
return count
}
func (*ordinaryStageKind) addPending(ss *stageState, em *ElementManager, newPending []element) int {
ss.pending = append(ss.pending, newPending...)
heap.Init(&ss.pending)
return len(newPending)
}
// computeNextTriggeredPane produces the correct pane relative to the previous pane,
// and the end of window state.
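//
// For example (illustrative): starting from the zero value PaneInfo, a first
// firing before the end of the window produces
//
//	{IsFirst: true, Index: 0, Timing: PaneEarly, NonSpeculativeIndex: -1}
//
// and a second early firing produces
//
//	{IsFirst: false, Index: 1, Timing: PaneEarly, NonSpeculativeIndex: -1}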
func computeNextTriggeredPane(pane typex.PaneInfo, endOfWindowReached bool) typex.PaneInfo {
// This is the first firing, since index and first are both
// set to their zero values.
if pane.Index == 0 && !pane.IsFirst {
pane.IsFirst = true
} else {
pane.Index++
pane.IsFirst = false
}
if endOfWindowReached {
pane.Timing = typex.PaneLate
pane.NonSpeculativeIndex++
} else {
pane.Timing = typex.PaneEarly
pane.NonSpeculativeIndex = -1
}
return pane
}
// computeNextWatermarkPane computes the next pane given the previous pane,
// when the watermark passes either the End of Window, or End of Window plus
// the allowed lateness.
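//
// For example (illustrative): a pane that has already fired early once, i.e.
//
//	{IsFirst: true, Index: 0, Timing: PaneEarly, NonSpeculativeIndex: -1}
//
// becomes {IsFirst: false, Index: 1, Timing: PaneOnTime, NonSpeculativeIndex: 0}
// when the watermark passes the end of the window, and then
// {IsFirst: false, Index: 2, Timing: PaneLate, NonSpeculativeIndex: 1}
// if the watermark later passes the end of the window plus the allowed lateness.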
func computeNextWatermarkPane(pane typex.PaneInfo) typex.PaneInfo {
// The pane state is still early: this is the OnTime firing.
switch pane.Timing {
case typex.PaneEarly:
// We haven't fired ontime yet.
pane.Timing = typex.PaneOnTime
pane.NonSpeculativeIndex = 0
case typex.PaneOnTime:
// This must be the closing pane after an ontime pane.
pane.Timing = typex.PaneLate
pane.NonSpeculativeIndex++
case typex.PaneLate:
// We have had some other late pane.
pane.NonSpeculativeIndex++
}
// This is the first firing, since index and first are both
// set to their zero values.
if pane.Index == 0 && !pane.IsFirst {
pane.IsFirst = true
} else {
pane.Index++
pane.IsFirst = false
}
return pane
}
// buildTriggeredBundle must be called with the stage.mu lock held.
// When in discarding mode, returns 0.
// When in accumulating mode, returns the number of fired elements to maintain a correct pending count.
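//
// For example (illustrative): if a trigger fires for a key with 3 pending
// elements in this window, all 3 move into the new bundle; in accumulating mode
// they are also pushed back onto the key's pending heap for later panes, so 3
// is returned to keep the overall pending count in balance, while in discarding
// mode nothing is retained and 0 is returned.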
func (ss *stageState) buildTriggeredBundle(em *ElementManager, key []byte, win typex.Window) int {
var toProcess []element
dnt := ss.pendingByKeys[string(key)]
var notYet []element
rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input}
// Look at all elements for this key, and only for this window.
for dnt.elements.Len() > 0 {
e := heap.Pop(&dnt.elements).(element)
if e.window != win {
notYet = append(notYet, e)
continue
}
toProcess = append(toProcess, e)
}
// accumulationDiff adjusts the pending elements count to include
// the accumulated elements, which would be the new elements, but
// also all previous elements as well, which are duplicated at
// every pane.
accumulationDiff := 0
if ss.strat.Accumulating {
// When accumulating, we need to retain all elements until the last pane firing.
for _, e := range toProcess {
heap.Push(&dnt.elements, e)
}
accumulationDiff += len(toProcess)
}
dnt.elements = append(dnt.elements, notYet...)
if dnt.elements.Len() == 0 {
delete(ss.pendingByKeys, string(key))
} else {
// Ensure the heap invariants are maintained.
heap.Init(&dnt.elements)
}
if ss.inprogressKeys == nil {
ss.inprogressKeys = set[string]{}
}
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
ss.input,
singleSet(string(key)),
nil,
)
ss.bundlesToInject = append(ss.bundlesToInject, rb)
// Bundle is marked in progress here to prevent a race condition.
em.refreshCond.L.Lock()
em.inprogressBundles.insert(rb.BundleID)
em.refreshCond.L.Unlock()
return accumulationDiff
}
// AddPendingSide adds elements to be consumed as side inputs.
func (ss *stageState) AddPendingSide(newPending []element, tID, inputID string) {
ss.mu.Lock()
defer ss.mu.Unlock()
if ss.sideInputs == nil {
ss.sideInputs = map[LinkID]map[typex.Window][][]byte{}
}
key := LinkID{Transform: tID, Local: inputID}
in, ok := ss.sideInputs[key]
if !ok {
in = map[typex.Window][][]byte{}
ss.sideInputs[key] = in
}
for _, e := range newPending {
in[e.window] = append(in[e.window], e.elmBytes)
}
}
// GetSideData returns side input data for the provided transform+input pair, valid to the watermark.
func (ss *stageState) GetSideData(tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
ss.mu.Lock()
defer ss.mu.Unlock()
d := ss.sideInputs[LinkID{Transform: tID, Local: inputID}]
ret := map[typex.Window][][]byte{}
for win, ds := range d {
if win.MaxTimestamp() <= watermark {
ret[win] = ds
}
}
return ret
}
// GetSideData returns side input data for the provided stage+transform+input tuple, valid to the watermark.
func (em *ElementManager) GetSideData(sID, tID, inputID string, watermark mtime.Time) map[typex.Window][][]byte {
return em.stages[sID].GetSideData(tID, inputID, watermark)
}
// updateUpstreamWatermark is for the parent of the input pcollection
// to call, to update downstream stages with its current watermark.
// This avoids downstream stages inverting lock orderings from
// calling their parent stage to get their input pcollection's watermark.
func (ss *stageState) updateUpstreamWatermark(pcol string, upstream mtime.Time) {
// A stage will only have a single upstream watermark, so
// we simply set this.
ss.upstreamWatermarks.Store(pcol, upstream)
}
// UpstreamWatermark gets the minimum value of all upstream watermarks.
func (ss *stageState) UpstreamWatermark() (string, mtime.Time) {
upstream := mtime.MaxTimestamp
var name string
ss.upstreamWatermarks.Range(func(key, val any) bool {
// Use <= to ensure we get a name if one is available.
if val.(mtime.Time) <= upstream {
upstream = val.(mtime.Time)
name = key.(string)
}
return true
})
return name, upstream
}
// InputWatermark gets the current input watermark for the stage.
func (ss *stageState) InputWatermark() mtime.Time {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.input
}
// OutputWatermark gets the current output watermark for the stage.
func (ss *stageState) OutputWatermark() mtime.Time {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.output
}
// TODO: Move to better place for configuration
var (
OneKeyPerBundle bool // OneKeyPerBundle sets if a bundle is restricted to a single key.
OneElementPerKey bool // OneElementPerKey sets if a key in a bundle is restricted to one element.
)
// startEventTimeBundle initializes a bundle with elements if possible.
// A bundle only starts if there are elements at all and, if it's
// an aggregation stage, only if the windowing strategy allows it.
// Returns a non-zero adjustment to the pending elements count if the stage is accumulating.
func (ss *stageState) startEventTimeBundle(watermark mtime.Time, genBundID func() string) (string, bool, bool, int) {
defer func() {
if e := recover(); e != nil {
panic(fmt.Sprintf("generating bundle for stage %v at watermark %v panicked\n%v", ss.ID, watermark, e))
}
}()
ss.mu.Lock()
defer ss.mu.Unlock()
toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, accumulatingPendingAdjustment := ss.kind.buildEventTimeBundle(ss, watermark)
if len(toProcess) == 0 {
// If we have nothing, there's nothing to progress.
return "", false, stillSchedulable, accumulatingPendingAdjustment
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
return bundID, true, stillSchedulable, accumulatingPendingAdjustment
}
// buildEventTimeBundle for ordinary stages processes all pending elements.
func (*ordinaryStageKind) buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
toProcess = ss.pending
ss.pending = nil
return toProcess, mtime.MaxTimestamp, nil, nil, true, 0
}
// buildEventTimeBundle for stateful stages processes all elements that are before the input watermark time.
func (*statefulStageKind) buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
minTs := mtime.MaxTimestamp
// TODO: Allow configurable limit of keys per bundle, and elements per key to improve parallelism.
// TODO: when we do, we need to ensure that the stage remains schedulable for bundle execution, for remaining pending elements and keys.
// With the greedy approach, we don't need to since "new data" triggers a refresh, and so should completing processing of a bundle.
newKeys := set[string]{}
holdsInBundle := map[mtime.Time]int{}
// If timers are cleared, and we end up with nothing to process
// we need to reschedule a watermark refresh, since those vestigial
// timers might have held back the minimum pending watermark.
timerCleared := false
keysPerBundle:
for k, dnt := range ss.pendingByKeys {
if ss.inprogressKeys.present(k) {
continue
}
newKeys.insert(k)
// Track the min-timestamp for later watermark handling.
if dnt.elements[0].timestamp < minTs {
minTs = dnt.elements[0].timestamp
}
dataInBundle := false
var toProcessForKey []element
// Can we pre-compute this bit when adding to pendingByKeys?
// startBundle is run in a single scheduling goroutine, so moving per-element code
// to be computed by the bundle parallel goroutines will speed things up a touch.
for dnt.elements.Len() > 0 {
// We can't mix data and timers in the same bundle, as there's no
// guarantee which is processed first SDK side.
// If the bundle already contains data, then it's before the timer
// by the heap invariant, and must be processed before we can fire a timer.
// AKA, keep them separate.
if len(toProcessForKey) > 0 && // If we have already picked some elements AND
((dataInBundle && dnt.elements[0].IsTimer()) || // we're about to add a timer to a Bundle that already has data OR
(!dataInBundle && !dnt.elements[0].IsTimer())) { // we're about to add data to a bundle that already has a timer
break
}
e := heap.Pop(&dnt.elements).(element)
if e.IsData() {
dataInBundle = true
} else {
lastSet, ok := dnt.timers[timerKey{family: e.family, tag: e.tag, window: e.window}]
if !ok {
timerCleared = true
continue // Timer has "fired" already, so this can be ignored.
}
if e.timestamp != lastSet.firing {
timerCleared = true
continue
}
holdsInBundle[e.holdTimestamp]++
// Clear the "fired" timer so subsequent matches can be ignored.
delete(dnt.timers, timerKey{family: e.family, tag: e.tag, window: e.window})
}
toProcessForKey = append(toProcessForKey, e)
if OneElementPerKey {
break
}
}
toProcess = append(toProcess, toProcessForKey...)
if dnt.elements.Len() == 0 {
delete(ss.pendingByKeys, k)
}
if OneKeyPerBundle {
break keysPerBundle
}
}
// If we're out of data, and timers were not cleared, then the watermark is accurate.
stillSchedulable := !(len(ss.pendingByKeys) == 0 && !timerCleared)
return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, 0
}
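// Illustrative walk-through (not part of the original file; assumes
// OneElementPerKey is false): if a key's heap orders to
//
//	[data@1, data@2, timer@3, data@4]
//
// the loop above pops data@1 and data@2, stops when it reaches timer@3 because
// the bundle already contains data, and leaves [timer@3, data@4] pending. The
// timer then fires in a later bundle, so an SDK never sees data and timers
// interleaved for a key within a single bundle.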
// buildEventTimeBundle for aggregation stages processes all elements for windows whose end the watermark has passed.
func (*aggregateStageKind) buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, _ mtime.Time, _ set[string], _ map[mtime.Time]int, schedulable bool, pendingAdjustment int) {
minTs := mtime.MaxTimestamp
// TODO: Allow configurable limit of keys per bundle, and elements per key to improve parallelism.
// TODO: when we do, we need to ensure that the stage remains schedulable for bundle execution, for remaining pending elements and keys.
// With the greedy approach, we don't need to, since "new data" triggers a refresh, and so does completing processing of a bundle.
newKeys := set[string]{}
holdsInBundle := map[mtime.Time]int{}
accumulatingPendingAdjustment := 0
keysPerBundle:
for k, dnt := range ss.pendingByKeys {
if ss.inprogressKeys.present(k) {
continue
}
newKeys.insert(k)
// Track the min-timestamp for later watermark handling.
if dnt.elements[0].timestamp < minTs {
minTs = dnt.elements[0].timestamp
}
var toProcessForKey []element
// Can we pre-compute this bit when adding to pendingByKeys?
// startBundle is run in a single scheduling goroutine, so moving per-element work
// into the parallel bundle-processing goroutines would speed things up a touch.
for dnt.elements.Len() > 0 {
// For aggregations, only include elements for this key
// if we're after the end of window, or after the window expiry deadline.
// We will only ever trigger aggregations by watermark at most twice: once when the watermark passes the end of the window (the OnTime firing),
// and once when the window is closing (end of window plus allowed lateness).
elm := dnt.elements[0]
if watermark <= elm.window.MaxTimestamp() {
// The watermark hasn't passed the end of the window yet, so we do nothing.
break
}
// Watermark is past the end of this window. Have we fired an OnTime pane yet?
state := ss.state[LinkID{}][elm.window][string(elm.keyBytes)]
// If this is not the ontime firing for this key.
if state.Pane.Timing != typex.PaneEarly && watermark <= ss.strat.EarliestCompletion(elm.window) {
// The watermark is still before the earliest final completion for this window.
// Do not add further data for this firing.
// If this is the Never trigger, we also don't fire OnTime until after the earliest completion.
break
}
if ss.strat.IsNeverTrigger() && watermark <= ss.strat.EarliestCompletion(elm.window) {
// The NeverTrigger only has a single firing at the end of window + allowed lateness.
break
}
e := heap.Pop(&dnt.elements).(element)
toProcessForKey = append(toProcessForKey, e)
}
// Get the pane for the aggregation correct, mutating it only once per key and window.
handledWindows := set[typex.Window]{}
for _, elm := range toProcessForKey {
state := ss.state[LinkID{}][elm.window][string(elm.keyBytes)]
if handledWindows.present(elm.window) {
// The pane is already correct for this key + window + firing.
if ss.strat.Accumulating && !state.Pane.IsLast {
// If this isn't the last pane, then we must add the element back to the pending store for subsequent firings.
heap.Push(&dnt.elements, elm)
accumulatingPendingAdjustment++
}
continue
}
handledWindows.insert(elm.window)
state.Pane = computeNextWatermarkPane(state.Pane)
// Determine if this is the last pane.
// Check if this is the post closing firing, which will be the last one.
// Unless it's the ontime pane, at which point it can never be last.
if watermark > ss.strat.EarliestCompletion(elm.window) && state.Pane.Timing != typex.PaneOnTime {
state.Pane.IsLast = true
}
if ss.strat.AllowedLateness == 0 || ss.strat.IsNeverTrigger() {
// If the allowed lateness is zero, then this will be the last pane.
// If this is the NeverTrigger, it's the last pane.
state.Pane.IsLast = true
}
ss.state[LinkID{}][elm.window][string(elm.keyBytes)] = state
// The pane is now set for this key + window + firing.
if ss.strat.Accumulating && !state.Pane.IsLast {
// If this isn't the last pane, then we must add the element back to the pending store for subsequent firings.
heap.Push(&dnt.elements, elm)
accumulatingPendingAdjustment++
}
}
toProcess = append(toProcess, toProcessForKey...)
if dnt.elements.Len() == 0 {
delete(ss.pendingByKeys, k)
}
if OneKeyPerBundle {
break keysPerBundle
}
}
// If this is an aggregate, we need a watermark change in order to reschedule
stillSchedulable := false
return toProcess, minTs, newKeys, holdsInBundle, stillSchedulable, accumulatingPendingAdjustment
}
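// Illustrative pane sequence (not part of the original file): for a window
// ending at 12:00 with two minutes of allowed lateness and the default
// trigger, the first time the watermark passes 12:00 each ready key fires an
// OnTime pane; any late data (plus, in accumulating mode, the elements pushed
// back above) waits until the watermark passes 12:02 (EarliestCompletion),
// when a final pane with IsLast set fires and the window becomes eligible for
// garbage collection. With AllowedLateness == 0 or the Never trigger there is
// only that single, last firing.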
func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.Time, genBundID func() string) (string, bool, bool) {
ss.mu.Lock()
defer ss.mu.Unlock()
// TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
// special case of ProcessingTime handling.
// E.g. always queue EventTime elements at minTime.
// Iterate all available processingTime events until we can't anymore.
//
// Potentially puts too much work on the scheduling thread though.
var toProcess []element
minTs := mtime.MaxTimestamp
holdsInBundle := map[mtime.Time]int{}
var notYet []fireElement
nextTime := ss.processingTimeTimers.Peek()
keyCounts := map[string]int{}
newKeys := set[string]{}
for nextTime <= emNow {
elems := ss.processingTimeTimers.FireAt(nextTime)
for _, e := range elems {
// Check if we're already executing this timer's key.
if ss.inprogressKeys.present(string(e.keyBytes)) {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}
// If we are set to have OneKeyPerBundle, and we already have a key for this bundle, we process it later.
if len(keyCounts) > 0 && OneKeyPerBundle {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}
// If we are set to have OneElementPerKey, and we already have an element for this key, we process it later.
if v := keyCounts[string(e.keyBytes)]; v > 0 && OneElementPerKey {
notYet = append(notYet, fireElement{firing: nextTime, timer: e})
continue
}
keyCounts[string(e.keyBytes)]++
newKeys.insert(string(e.keyBytes))
if e.timestamp < minTs {
minTs = e.timestamp
}
holdsInBundle[e.holdTimestamp]++
// We're going to process this timer!
toProcess = append(toProcess, e)
}
nextTime = ss.processingTimeTimers.Peek()
if nextTime == mtime.MaxTimestamp {
// Escape the loop if there are no more events.
break
}
}
// Reschedule unfired timers.
notYetHolds := map[mtime.Time]int{}
for _, v := range notYet {
ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds)
em.processTimeEvents.Schedule(v.firing, ss.ID)
}
// Add a refresh if there are still processing time events to process.
stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0)
if len(toProcess) == 0 {
// If we have nothing, there's nothing to progress.
return "", false, stillSchedulable
}
bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle)
return bundID, true, stillSchedulable
}
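// Illustrative sketch (not part of the original file): with emNow at 12:00:10
// and processing-time timers queued for 12:00:01, 12:00:05, and 12:00:30, the
// loop above fires the first two (subject to key availability and the
// OneKeyPerBundle / OneElementPerKey toggles) while 12:00:30 stays queued.
// Timers whose key is busy are re-persisted via notYet, and a refresh is
// scheduled so they're retried once the current bundles complete.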
// makeInProgressBundle is common code to store a set of elements as a bundle in progress.
//
// Callers must hold the stage lock.
func (ss *stageState) makeInProgressBundle(genBundID func() string, toProcess []element, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int) string {
// Catch the ordinary case for the minimum timestamp.
if toProcess[0].timestamp < minTs {
minTs = toProcess[0].timestamp
}
es := elements{
es: toProcess,
minTimestamp: minTs,
}
if ss.inprogress == nil {
ss.inprogress = make(map[string]elements)
}
if ss.inprogressKeysByBundle == nil {
ss.inprogressKeysByBundle = make(map[string]set[string])
}
if ss.inprogressHoldsByBundle == nil {
ss.inprogressHoldsByBundle = make(map[string]map[mtime.Time]int)
}
bundID := genBundID()
ss.inprogress[bundID] = es
ss.inprogressKeysByBundle[bundID] = newKeys
ss.inprogressKeys.merge(newKeys)
ss.inprogressHoldsByBundle[bundID] = holdsInBundle
return bundID
}
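// After this call (illustrative summary, not part of the original file), the
// stage tracks the new bundle under three parallel maps keyed by the generated
// bundle ID:
//
//	ss.inprogress[bundID]              // the elements plus their minimum timestamp
//	ss.inprogressKeysByBundle[bundID]  // the keys this bundle owns
//	ss.inprogressHoldsByBundle[bundID] // the watermark holds this bundle contributes
//
// ss.inprogressKeys is the union of all per-bundle key sets, which is what
// prevents a second bundle from picking up the same key concurrently.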
func (ss *stageState) splitBundle(rb RunBundle, firstResidual int) {
ss.mu.Lock()
defer ss.mu.Unlock()
es := ss.inprogress[rb.BundleID]
slog.Debug("split elements", "bundle", rb, "elem count", len(es.es), "res", firstResidual)
prim := es.es[:firstResidual]
res := es.es[firstResidual:]
es.es = prim
ss.pending = append(ss.pending, res...)
heap.Init(&ss.pending)
ss.inprogress[rb.BundleID] = es
}
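// Illustrative split (not part of the original file): if the in-progress
// bundle holds [e0, e1, e2, e3, e4] and the SDK reports firstResidual = 3,
// then [e0, e1, e2] remain the primary (still in progress) while [e3, e4] are
// appended back onto ss.pending and the heap is re-established so they can be
// scheduled into a future bundle.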
// minPendingTimestamp returns the minimum pending timestamp from all pending elements,
// including in progress ones.
//
// Assumes that the pending heap is initialized if it's not empty.
func (ss *stageState) minPendingTimestamp() mtime.Time {
ss.mu.Lock()
defer ss.mu.Unlock()
return ss.minPendingTimestampLocked()
}
// minPendingTimestampLocked must be called under the ss.mu Lock.
func (ss *stageState) minPendingTimestampLocked() mtime.Time {
minPending := mtime.MaxTimestamp
if len(ss.pending) != 0 {
minPending = ss.pending[0].timestamp
}
if len(ss.pendingByKeys) != 0 {
// TODO(lostluck): Can we figure out how to avoid checking every key on every watermark refresh?
for _, dnt := range ss.pendingByKeys {
minPending = mtime.Min(minPending, dnt.elements[0].timestamp)
}
}
for _, es := range ss.inprogress {
minPending = mtime.Min(minPending, es.minTimestamp)
}
return minPending
}
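// Illustrative summary (not part of the original file): the minimum is taken
// across three sources.
//
//	ss.pending[0].timestamp   // unkeyed pending elements (heap minimum)
//	dnt.elements[0].timestamp // per-key pending heaps, checked for every key
//	es.minTimestamp           // every in-progress bundle's minimum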
func (ss *stageState) String() string {
pcol, up := ss.UpstreamWatermark()
return fmt.Sprintf("[%v] IN: %v OUT: %v UP: %q %v, kind: %v", ss.ID, ss.input, ss.output, pcol, up, ss.kind)
}
// updateWatermarks performs the following operations:
//
// Watermark_In' = MAX(Watermark_In, MIN(U(TS_Pending), U(Watermark_InputPCollection)))
// Watermark_Out' = MAX(Watermark_Out, MIN(Watermark_In', U(minWatermarkHold)))
// Watermark_PCollection = Watermark_Out_ProducingPTransform
func (ss *stageState) updateWatermarks(em *ElementManager) set[string] {
ss.mu.Lock()
defer ss.mu.Unlock()
minPending := ss.minPendingTimestampLocked()
minWatermarkHold := ss.watermarkHolds.Min()
// PCollection watermarks are based on their parent's output watermark.
_, newIn := ss.UpstreamWatermark()
// Set the input watermark based on the minimum pending elements,
// and the current input pcollection watermark.
if minPending < newIn {
newIn = minPending
}
// If bigger, advance the input watermark.
if newIn > ss.input {
ss.input = newIn
}
// The output starts with the new input as the basis.
newOut := ss.input
// If we're given an estimate, and it's further ahead, we use that instead.
if ss.estimatedOutput > ss.output {
newOut = ss.estimatedOutput
}
// We adjust based on the minimum state hold.
// If we hold it, mark this stage as refreshable?
if minWatermarkHold < newOut {
newOut = minWatermarkHold
}
// If the newOut is smaller, then don't change downstream watermarks.
if newOut <= ss.output {
return nil
}
// If bigger, advance the output watermark
preventDownstreamUpdate := ss.createOnWindowExpirationBundles(newOut, em)
// Garbage collect state, timers and side inputs, for all windows
// that are before the new output watermark, if they aren't in progress
// of being expired.
// They'll never be read in again.
for _, wins := range ss.sideInputs {
for win := range wins {
// Clear out anything we've already used.
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
}
for _, wins := range ss.state {
for win := range wins {
if ss.strat.EarliestCompletion(win) < newOut {
// If the expiry is in progress, skip collecting this window.
if ss.inProgressExpiredWindows[win] > 0 {
continue
}
delete(wins, win)
}
}
}
// If there are windows to expire, we don't update the output watermark yet.
if preventDownstreamUpdate {
return nil
}
// Update this stage's output watermark, and then propagate that to downstream stages
refreshes := set[string]{}
ss.output = newOut
for _, outputCol := range ss.outputIDs {
consumers := em.consumers[outputCol]
for _, sID := range consumers {
em.stages[sID].updateUpstreamWatermark(outputCol, ss.output)
refreshes.insert(sID)
}
// Inform side input consumers, but don't update the upstream watermark.
for _, sID := range em.sideConsumers[outputCol] {
refreshes.insert(sID.Global)
}
}
return refreshes
}
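// Worked example (illustrative, not part of the original file): suppose the
// earliest pending element is at 12:00:07, the upstream PCollection watermark
// is 12:00:05, the minimum watermark hold is 12:00:03, and both the old input
// and output watermarks are earlier than all of these. Then
//
//	Watermark_In'  = MAX(old input,  MIN(12:00:07, 12:00:05)) = 12:00:05
//	Watermark_Out' = MAX(old output, MIN(12:00:05, 12:00:03)) = 12:00:03
//
// so downstream stages only observe 12:00:03 until the hold is released.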
// createOnWindowExpirationBundles injects bundles when windows
// expire for all keys that were used in that window. Returns true if any
// bundles are created, which means that the window must not yet be garbage
// collected.
//
// Must be called within the stageState.mu's and the ElementManager.refreshCond
// critical sections.
func (ss *stageState) createOnWindowExpirationBundles(newOut mtime.Time, em *ElementManager) bool {
var preventDownstreamUpdate bool
for win, keys := range ss.keysToExpireByWindow {
// Check if the window has expired.
if ss.strat.EarliestCompletion(win) >= newOut {
continue
}
// We can't advance the output watermark if there's garbage to collect.
preventDownstreamUpdate = true
// Hold off on garbage collecting data for these windows while these
// are in progress.
ss.inProgressExpiredWindows[win] += 1
// Produce bundle(s) for these keys and window, and inject them.
wm := win.MaxTimestamp()
rb := RunBundle{StageID: ss.ID, BundleID: "owe-" + em.nextBundID(), Watermark: wm}
// Now we need to actually build the bundle.
var toProcess []element
busyKeys := set[string]{}
usedKeys := set[string]{}
for k := range keys {
if ss.inprogressKeys.present(k) {
busyKeys.insert(k)
continue
}
usedKeys.insert(k)
toProcess = append(toProcess, element{
window: win,
timestamp: wm,
pane: typex.NoFiringPane(),
holdTimestamp: wm,
transform: ss.onWindowExpiration.TransformID,
family: ss.onWindowExpiration.TimerFamily,
sequence: 1,
keyBytes: []byte(k),
elmBytes: nil,
})
}
em.addPending(len(toProcess))
ss.watermarkHolds.Add(wm, 1)
ss.makeInProgressBundle(
func() string { return rb.BundleID },
toProcess,
wm,
usedKeys,
map[mtime.Time]int{wm: 1},
)
ss.expiryWindowsByBundles[rb.BundleID] = win
slog.Debug("OnWindowExpiration-Bundle Created", slog.Any("bundle", rb), slog.Any("usedKeys", usedKeys), slog.Any("window", win), slog.Any("toProcess", toProcess), slog.Any("busyKeys", busyKeys))
// We're already in the refreshCond critical section.
// Insert that this is in progress here to avoid a race condition.
em.inprogressBundles.insert(rb.BundleID)
em.injectedBundles = append(em.injectedBundles, rb)
// Remove the key accounting, or continue tracking which keys still need clearing.
if len(busyKeys) == 0 {
delete(ss.keysToExpireByWindow, win)
} else {
ss.keysToExpireByWindow[win] = busyKeys
}
}
return preventDownstreamUpdate
}
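// Illustrative walk-through (not part of the original file): if keys
// {"a", "b", "c"} are due to expire for win but "b" is currently owned by an
// in-progress bundle, one "owe-" bundle is injected covering {"a", "c"} and
// keysToExpireByWindow retains {"b"}. A later watermark update injects another
// expiration bundle for "b" once it's free; only after every key has been
// handled (and the injected bundles finish) can the window's state be garbage
// collected and the output watermark advance past it.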
// bundleReady returns the maximum allowed watermark for this stage, whether
// it's permitted to execute by side inputs, whether processing time events
// are ready to fire, and whether there are injected bundles waiting.
func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.Time, bool, bool, bool) {
ss.mu.Lock()
defer ss.mu.Unlock()
ptimeEventsReady := ss.processingTimeTimers.Peek() <= emNow || emNow == mtime.MaxTimestamp
injectedReady := len(ss.bundlesToInject) > 0
// If the upstream watermark and the input watermark are the same,
// then we can't yet process this stage.
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
if inputW == upstreamW {
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
slog.Any("upstream", upstreamW),
slog.Any("input", inputW)))
return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady
}
ready := true
for _, side := range ss.sides {
pID, ok := em.pcolParents[side.Global]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent ID for side input %v", ss.ID, side))
}
parent, ok := em.stages[pID]
if !ok {
panic(fmt.Sprintf("stage[%v] no parent for side input %v, with parent ID %v", ss.ID, side, pID))
}
ow := parent.OutputWatermark()
if upstreamW > ow {
ready = false
}
}
return upstreamW, ready, ptimeEventsReady, injectedReady
}
// ProcessingTimeNow gives the current processing time for the runner.
func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) {
if em.testStreamHandler != nil && !em.testStreamHandler.completed {
return em.testStreamHandler.Now()
}
// TODO toggle between testmode and production mode.
// "Test" mode -> advance to next processing time event if any, to allow execution.
// if test mode...
if t, ok := em.processTimeEvents.Peek(); ok {
return t
}
// "Production" mode, always real time now.
now := mtime.Now()
return now
}
// rebaseProcessingTime turns an absolute processing time to be relative to the provided local clock now.
// Necessary to reasonably schedule ProcessingTime timers within a pipeline that uses TestStream.
func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time {
return localNow + (scheduled - mtime.Now())
}
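// Worked example (illustrative, not part of the original file): if the real
// clock reads 10:00:00.000 and a timer was scheduled for 10:00:30.000, the
// timer is 30s in the future. With a TestStream-driven local clock at
// 00:00:05.000, rebaseProcessingTime returns 00:00:35.000, preserving the 30s
// relative delay on the synthetic clock rather than the absolute wall time.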