in sdks/go/pkg/beam/core/runtime/harness/harness.go [363:693]
// handleInstruction executes a single Fn API InstructionRequest and returns the
// InstructionResponse for it. The request is dispatched on which of its request
// fields is set: Register, ProcessBundle, FinalizeBundle, ProcessBundleProgress,
// ProcessBundleSplit, MonitoringInfos, HarnessMonitoringInfos or SampleData.
// Any other request yields a failure response built by fail.
//
// Progress and split requests target ProcessBundle instructions whose plans are
// only in c.active while they are executing, so this method runs concurrently
// with itself. Every access to the control's shared maps (descriptors, active,
// metStore, failed, plans, awaitingFinalization) and to the inactive set is
// therefore done while holding c.mu.
func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
	instID := instructionID(req.GetInstructionId())
	ctx = metrics.SetBundleID(ctx, string(instID))

	switch {
	case req.GetRegister() != nil:
		msg := req.GetRegister()

		c.mu.Lock()
		for _, desc := range msg.GetProcessBundleDescriptor() {
			c.descriptors[bundleDescriptorID(desc.GetId())] = desc
		}
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_Register{
				Register: &fnpb.RegisterResponse{},
			},
		}

	case req.GetProcessBundle() != nil:
		msg := req.GetProcessBundle()

		// NOTE: the harness sends a 0-length process bundle request to sources (changed?)

		bdID := bundleDescriptorID(msg.GetProcessBundleDescriptorId())
		// TODO(lostluck): 2023/03/29 fix debug level logging to be flagged.
		// log.Debugf(ctx, "PB [%v]: %v", instID, msg)
		plan, err := c.getOrCreatePlan(bdID)

		c.mu.Lock()
		c.inactive.Remove(instID)
		if err != nil {
			// Record the failure while still holding the lock: c.failed is shared
			// with concurrently handled instructions. Rather than leaving a nil
			// plan in c.active (which was never cleaned up), retire the
			// instruction to the inactive set immediately, so its failed entry is
			// eventually garbage collected like that of any finished bundle.
			c.failed[instID] = err
			if removed, ok := c.inactive.Insert(instID); ok {
				delete(c.failed, removed) // Also GC old failed bundles.
			}
			c.mu.Unlock()
			return fail(ctx, instID, "ProcessBundle failed: %v", err)
		}
		// Make the plan active.
		c.active[instID] = plan
		// Get the user metrics store for this bundle.
		store := metrics.GetStore(ctx)
		c.metStore[instID] = store
		c.mu.Unlock()

		tokens := msg.GetCacheTokens()
		c.cache.SetValidTokens(tokens...)

		data := NewScopedDataManager(c.data, instID)
		state := NewScopedStateReaderWithCache(c.state, instID, c.cache)

		sampler := newSampler(store)
		go sampler.start(ctx, samplePeriod)

		err = plan.Execute(ctx, string(instID), exec.DataContext{Data: data, State: state})
		sampler.stop()

		dataError := data.Close()
		state.Close()

		c.cache.CompleteBundle(tokens...)

		mons, pylds, _ := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID])

		// Residual roots depend only on the local checkpoints, so build them
		// before taking the lock.
		checkpoints := plan.Checkpoint()
		var rRoots []*fnpb.DelayedBundleApplication
		for _, cp := range checkpoints {
			for _, r := range cp.SR.RS {
				rRoots = append(rRoots, &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId:      cp.SR.TId,
						InputId:          cp.SR.InId,
						Element:          r,
						OutputWatermarks: cp.SR.OW,
					},
					RequestedTimeDelay: durationpb.New(cp.Reapply),
				})
			}
		}

		requiresFinalization := false
		// Move the plan back to the candidate state.
		c.mu.Lock()
		switch {
		case err != nil:
			// Mark the instruction as failed.
			c.failed[instID] = err
		case dataError != nil && dataError != io.EOF:
			// If there was an error on the data channel reads, fail this bundle
			// since we may have had a short read.
			c.failed[instID] = dataError
			err = dataError
		default:
			// Non failure plans should either be moved to the finalized state
			// or to plans so they can be re-used.
			now := time.Now()
			if expiration := plan.GetExpirationTime(); now.Before(expiration) {
				// TODO(BEAM-10976) - we can be a little smarter about data structures here by
				// by storing plans awaiting finalization in a heap. That way when we expire plans
				// here its O(1) instead of O(n) (though adding/finalizing will still be O(logn))
				requiresFinalization = true
				c.awaitingFinalization[instID] = awaitingFinalization{
					expiration: expiration,
					plan:       plan,
					bdID:       bdID,
				}
				// Move any plans that have exceeded their expiration back into the re-use pool.
				for id, af := range c.awaitingFinalization {
					if now.After(af.expiration) {
						c.plans[af.bdID] = append(c.plans[af.bdID], af.plan)
						delete(c.awaitingFinalization, id)
					}
				}
			} else {
				c.plans[bdID] = append(c.plans[bdID], plan)
			}
		}
		delete(c.active, instID)
		if removed, ok := c.inactive.Insert(instID); ok {
			delete(c.failed, removed) // Also GC old failed bundles.
		}
		delete(c.metStore, instID)
		c.mu.Unlock()

		if err != nil {
			return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
		}
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundle{
				ProcessBundle: &fnpb.ProcessBundleResponse{
					ResidualRoots:        rRoots,
					MonitoringData:       pylds,
					MonitoringInfos:      mons,
					RequiresFinalization: requiresFinalization,
				},
			},
		}

	case req.GetFinalizeBundle() != nil:
		msg := req.GetFinalizeBundle()
		ref := instructionID(msg.GetInstructionId())

		// Claim the plan under the lock. The ProcessBundle path reads and writes
		// awaitingFinalization and plans under c.mu, and its expiration sweep
		// could otherwise recycle this plan while it is being finalized, which
		// would put the same plan into the re-use pool twice.
		c.mu.Lock()
		af, ok := c.awaitingFinalization[ref]
		if ok {
			delete(c.awaitingFinalization, ref)
		}
		c.mu.Unlock()
		if !ok {
			return fail(ctx, instID, "finalize bundle failed for instruction %v: couldn't find plan in finalizing map", ref)
		}

		// Plans past their expiration are recycled without running finalization.
		if time.Now().Before(af.expiration) {
			if err := af.plan.Finalize(); err != nil {
				// Put the entry back so that, as before, it stays available to a
				// later FinalizeBundle for this instruction and is eventually
				// returned to the pool by the expiration sweep.
				c.mu.Lock()
				c.awaitingFinalization[ref] = af
				c.mu.Unlock()
				return fail(ctx, instID, "finalize bundle failed for instruction %v using plan %v : %v", ref, af.bdID, err)
			}
		}

		c.mu.Lock()
		c.plans[af.bdID] = append(c.plans[af.bdID], af.plan)
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_FinalizeBundle{
				FinalizeBundle: &fnpb.FinalizeBundleResponse{},
			},
		}

	case req.GetProcessBundleProgress() != nil:
		msg := req.GetProcessBundleProgress()
		ref := instructionID(msg.GetInstructionId())

		plan, store, resp := c.getPlanOrResponse(ctx, "progress", instID, ref)
		if resp != nil {
			return resp
		}
		if plan == nil {
			// No plan and no error response: presumably the bundle is no longer
			// active, so report empty progress.
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleProgress{
					ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{},
				},
			}
		}

		mons, pylds, consumingReceivedData := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID])
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
					MonitoringData:        pylds,
					MonitoringInfos:       mons,
					ConsumingReceivedData: &consumingReceivedData,
				},
			},
		}

	case req.GetProcessBundleSplit() != nil:
		msg := req.GetProcessBundleSplit()
		// TODO(lostluck): 2023/03/29 fix debug level logging to be flagged.
		// log.Debugf(ctx, "PB Split: %v", msg)
		ref := instructionID(msg.GetInstructionId())

		plan, _, resp := c.getPlanOrResponse(ctx, "split", instID, ref)
		if resp != nil {
			return resp
		}
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Get the desired splits for the root FnAPI read operation.
		ds := msg.GetDesiredSplits()[plan.SourcePTransformID()]
		if ds == nil {
			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
		}
		sr, err := plan.Split(ctx, exec.SplitPoints{
			Splits:  ds.GetAllowedSplitPoints(),
			Frac:    ds.GetFractionOfRemainder(),
			BufSize: ds.GetEstimatedInputElements(),
		})
		if err != nil {
			return fail(ctx, instID, "unable to split %v: %v", ref, err)
		}

		// Unsuccessful splits without errors indicate we should return an empty response,
		// as processing can continue.
		if sr.Unsuccessful {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Primary and residual roots are only reported when both sides of the
		// split are present.
		var pRoots []*fnpb.BundleApplication
		var rRoots []*fnpb.DelayedBundleApplication
		if len(sr.PS) > 0 && len(sr.RS) > 0 {
			pRoots = make([]*fnpb.BundleApplication, len(sr.PS))
			for i, p := range sr.PS {
				pRoots[i] = &fnpb.BundleApplication{
					TransformId: sr.TId,
					InputId:     sr.InId,
					Element:     p,
				}
			}
			rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
			for i, r := range sr.RS {
				rRoots[i] = &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId:      sr.TId,
						InputId:          sr.InId,
						Element:          r,
						OutputWatermarks: sr.OW,
					},
				}
			}
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleSplit{
				ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{
					ChannelSplits: []*fnpb.ProcessBundleSplitResponse_ChannelSplit{{
						TransformId:          plan.SourcePTransformID(),
						LastPrimaryElement:   sr.PI,
						FirstResidualElement: sr.RI,
					}},
					PrimaryRoots:  pRoots,
					ResidualRoots: rRoots,
				},
			},
		}

	case req.GetMonitoringInfos() != nil:
		msg := req.GetMonitoringInfos()
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_MonitoringInfos{
				MonitoringInfos: &fnpb.MonitoringInfosMetadataResponse{
					MonitoringInfo: shortIdsToInfos(msg.GetMonitoringInfoId()),
				},
			},
		}

	case req.GetHarnessMonitoringInfos() != nil:
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_HarnessMonitoringInfos{
				HarnessMonitoringInfos: &fnpb.HarnessMonitoringInfosResponse{
					// TODO(BEAM-11092): Populate with non-bundle metrics data.
					MonitoringData: map[string][]byte{},
				},
			},
		}

	case req.GetSampleData() != nil:
		msg := req.GetSampleData()
		samples := make(map[string]*fnpb.SampleDataResponse_ElementList)
		// Without a data sampler, respond with an empty sample map.
		if c.dataSampler != nil {
			for pid, elements := range c.dataSampler.GetSamples(msg.GetPcollectionIds()) {
				list := &fnpb.SampleDataResponse_ElementList{
					Elements: make([]*fnpb.SampledElement, 0, len(elements)),
				}
				for i := range elements {
					list.Elements = append(list.Elements, &fnpb.SampledElement{
						Element:         elements[i].Element,
						SampleTimestamp: timestamppb.New(elements[i].Timestamp),
					})
				}
				samples[pid] = list
			}
		}
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_SampleData{
				SampleData: &fnpb.SampleDataResponse{ElementSamples: samples},
			},
		}

	default:
		return fail(ctx, instID, "Unexpected request: %v", req)
	}
}