func()

in sdks/go/pkg/beam/core/runtime/harness/harness.go [363:693]


// handleInstruction dispatches a single control-plane instruction to the
// handler matching its request type and returns the response for it.
//
// The handled request types are Register, ProcessBundle, FinalizeBundle,
// ProcessBundleProgress, ProcessBundleSplit, MonitoringInfos,
// HarnessMonitoringInfos and SampleData; any other request produces a failure
// response. Every response carries the instruction ID of req.
//
// The bundle bookkeeping maps on control (descriptors, plans, active,
// inactive, failed, metStore and awaitingFinalization) are only read or
// written while c.mu is held. Potentially long running calls (plan execution,
// finalization, splitting, metrics extraction) are made without the lock.
func (c *control) handleInstruction(ctx context.Context, req *fnpb.InstructionRequest) *fnpb.InstructionResponse {
	instID := instructionID(req.GetInstructionId())
	ctx = metrics.SetBundleID(ctx, string(instID))

	switch {
	case req.GetRegister() != nil:
		msg := req.GetRegister()

		// Remember every descriptor in the request, keyed by its ID.
		c.mu.Lock()
		for _, desc := range msg.GetProcessBundleDescriptor() {
			c.descriptors[bundleDescriptorID(desc.GetId())] = desc
		}
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_Register{
				Register: &fnpb.RegisterResponse{},
			},
		}

	case req.GetProcessBundle() != nil:
		msg := req.GetProcessBundle()

		// NOTE: the harness sends a 0-length process bundle request to sources (changed?)
		bdID := bundleDescriptorID(msg.GetProcessBundleDescriptorId())

		// TODO(lostluck): 2023/03/29 fix debug level logging to be flagged.
		// log.Debugf(ctx, "PB [%v]: %v", instID, msg)
		plan, err := c.getOrCreatePlan(bdID)

		// Make the plan active.
		c.mu.Lock()
		c.inactive.Remove(instID)
		c.active[instID] = plan
		// Get the user metrics store for this bundle.
		store := metrics.GetStore(ctx)
		c.metStore[instID] = store
		if err != nil {
			// Record the failure before releasing the lock: c.failed is
			// guarded by c.mu everywhere else in this function.
			c.failed[instID] = err
		}
		c.mu.Unlock()

		if err != nil {
			// NOTE(review): on this path the active and metStore entries for
			// instID are never removed by this function - confirm whether
			// that is intended or a leak.
			return fail(ctx, instID, "ProcessBundle failed: %v", err)
		}

		tokens := msg.GetCacheTokens()
		c.cache.SetValidTokens(tokens...)

		data := NewScopedDataManager(c.data, instID)
		state := NewScopedStateReaderWithCache(c.state, instID, c.cache)

		// The sampler runs in its own goroutine for the duration of Execute
		// and is stopped before the final metrics are extracted.
		sampler := newSampler(store)
		go sampler.start(ctx, samplePeriod)

		err = plan.Execute(ctx, string(instID), exec.DataContext{Data: data, State: state})

		sampler.stop()

		dataError := data.Close()
		state.Close()

		c.cache.CompleteBundle(tokens...)

		mons, pylds, _ := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID])

		// Turn every residual of every checkpoint into a delayed application.
		// This only uses plan-local data, so it is done before taking c.mu.
		var rRoots []*fnpb.DelayedBundleApplication
		for _, cp := range plan.Checkpoint() {
			for _, r := range cp.SR.RS {
				rRoots = append(rRoots, &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId:      cp.SR.TId,
						InputId:          cp.SR.InId,
						Element:          r,
						OutputWatermarks: cp.SR.OW,
					},
					RequestedTimeDelay: durationpb.New(cp.Reapply),
				})
			}
		}

		requiresFinalization := false
		// Move the plan back to the candidate state
		c.mu.Lock()
		// Mark the instruction as failed.
		if err != nil {
			c.failed[instID] = err
		} else if dataError != io.EOF && dataError != nil {
			// If there was an error on the data channel reads, fail this bundle
			// since we may have had a short read.
			c.failed[instID] = dataError
			err = dataError
		} else {
			// Non failure plans should either be moved to the finalized state
			// or to plans so they can be re-used.
			expiration := plan.GetExpirationTime()
			now := time.Now()
			if now.Before(expiration) {
				// TODO(BEAM-10976) - we can be a little smarter about data structures here by
				// by storing plans awaiting finalization in a heap. That way when we expire plans
				// here its O(1) instead of O(n) (though adding/finalizing will still be O(logn))
				requiresFinalization = true
				c.awaitingFinalization[instID] = awaitingFinalization{
					expiration: expiration,
					plan:       plan,
					bdID:       bdID,
				}
				// Move any plans that have exceeded their expiration back into the re-use pool
				for id, af := range c.awaitingFinalization {
					if now.After(af.expiration) {
						c.plans[af.bdID] = append(c.plans[af.bdID], af.plan)
						delete(c.awaitingFinalization, id)
					}
				}
			} else {
				c.plans[bdID] = append(c.plans[bdID], plan)
			}
		}

		delete(c.active, instID)
		if removed, ok := c.inactive.Insert(instID); ok {
			delete(c.failed, removed) // Also GC old failed bundles.
		}
		delete(c.metStore, instID)

		c.mu.Unlock()

		if err != nil {
			return fail(ctx, instID, "process bundle failed for instruction %v using plan %v : %v", instID, bdID, err)
		}
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundle{
				ProcessBundle: &fnpb.ProcessBundleResponse{
					ResidualRoots:        rRoots,
					MonitoringData:       pylds,
					MonitoringInfos:      mons,
					RequiresFinalization: requiresFinalization,
				},
			},
		}

	case req.GetFinalizeBundle() != nil:
		msg := req.GetFinalizeBundle()

		ref := instructionID(msg.GetInstructionId())

		// awaitingFinalization and plans are mutated under c.mu by the
		// ProcessBundle path, so they must be accessed under it here as well.
		c.mu.Lock()
		af, ok := c.awaitingFinalization[ref]
		c.mu.Unlock()
		if !ok {
			return fail(ctx, instID, "finalize bundle failed for instruction %v: couldn't find plan in finalizing map", ref)
		}

		// Finalize is invoked without holding c.mu so that other instructions
		// are not blocked while it runs. On failure the entry is left in
		// awaitingFinalization, exactly as before.
		if time.Now().Before(af.expiration) {
			if err := af.plan.Finalize(); err != nil {
				return fail(ctx, instID, "finalize bundle failed for instruction %v using plan %v : %v", ref, af.bdID, err)
			}
		}

		c.mu.Lock()
		// Only hand the plan back if it is still pending: the expiry sweep in
		// the ProcessBundle path may have recycled it while the lock was
		// released, and it must not be added to the pool twice.
		if _, pending := c.awaitingFinalization[ref]; pending {
			c.plans[af.bdID] = append(c.plans[af.bdID], af.plan)
			delete(c.awaitingFinalization, ref)
		}
		c.mu.Unlock()

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_FinalizeBundle{
				FinalizeBundle: &fnpb.FinalizeBundleResponse{},
			},
		}

	case req.GetProcessBundleProgress() != nil:
		msg := req.GetProcessBundleProgress()

		ref := instructionID(msg.GetInstructionId())

		plan, store, resp := c.getPlanOrResponse(ctx, "progress", instID, ref)
		if resp != nil {
			return resp
		}
		// No plan and no prepared response: answer with an empty progress report.
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleProgress{
					ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{},
				},
			}
		}

		mons, pylds, consumingReceivedData := monitoring(plan, store, c.runnerCapabilities[URNMonitoringInfoShortID])

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleProgress{
				ProcessBundleProgress: &fnpb.ProcessBundleProgressResponse{
					MonitoringData:        pylds,
					MonitoringInfos:       mons,
					ConsumingReceivedData: &consumingReceivedData,
				},
			},
		}

	case req.GetProcessBundleSplit() != nil:
		msg := req.GetProcessBundleSplit()

		// TODO(lostluck): 2023/03/29 fix debug level logging to be flagged.
		// log.Debugf(ctx, "PB Split: %v", msg)
		ref := instructionID(msg.GetInstructionId())

		plan, _, resp := c.getPlanOrResponse(ctx, "split", instID, ref)
		if resp != nil {
			return resp
		}
		// No plan and no prepared response: answer with an empty split.
		if plan == nil {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Get the desired splits for the root FnAPI read operation.
		ds := msg.GetDesiredSplits()[plan.SourcePTransformID()]
		if ds == nil {
			return fail(ctx, instID, "failed to split: desired splits for root of %v was empty.", ref)
		}
		sr, err := plan.Split(ctx, exec.SplitPoints{
			Splits:  ds.GetAllowedSplitPoints(),
			Frac:    ds.GetFractionOfRemainder(),
			BufSize: ds.GetEstimatedInputElements(),
		})

		if err != nil {
			return fail(ctx, instID, "unable to split %v: %v", ref, err)
		}

		// Unsuccessful splits without errors indicate we should return an empty response,
		// as processing can continue.
		if sr.Unsuccessful {
			return &fnpb.InstructionResponse{
				InstructionId: string(instID),
				Response: &fnpb.InstructionResponse_ProcessBundleSplit{
					ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{},
				},
			}
		}

		// Primary and residual roots are only reported when the split result
		// carries both; otherwise both slices stay nil.
		var pRoots []*fnpb.BundleApplication
		var rRoots []*fnpb.DelayedBundleApplication
		if len(sr.PS) > 0 && len(sr.RS) > 0 {
			pRoots = make([]*fnpb.BundleApplication, len(sr.PS))
			for i, p := range sr.PS {
				pRoots[i] = &fnpb.BundleApplication{
					TransformId: sr.TId,
					InputId:     sr.InId,
					Element:     p,
				}
			}
			rRoots = make([]*fnpb.DelayedBundleApplication, len(sr.RS))
			for i, r := range sr.RS {
				rRoots[i] = &fnpb.DelayedBundleApplication{
					Application: &fnpb.BundleApplication{
						TransformId:      sr.TId,
						InputId:          sr.InId,
						Element:          r,
						OutputWatermarks: sr.OW,
					},
				}
			}
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_ProcessBundleSplit{
				ProcessBundleSplit: &fnpb.ProcessBundleSplitResponse{
					ChannelSplits: []*fnpb.ProcessBundleSplitResponse_ChannelSplit{{
						TransformId:          plan.SourcePTransformID(),
						LastPrimaryElement:   sr.PI,
						FirstResidualElement: sr.RI,
					}},
					PrimaryRoots:  pRoots,
					ResidualRoots: rRoots,
				},
			},
		}

	case req.GetMonitoringInfos() != nil:
		msg := req.GetMonitoringInfos()
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_MonitoringInfos{
				MonitoringInfos: &fnpb.MonitoringInfosMetadataResponse{
					MonitoringInfo: shortIdsToInfos(msg.GetMonitoringInfoId()),
				},
			},
		}

	case req.GetHarnessMonitoringInfos() != nil:
		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_HarnessMonitoringInfos{
				HarnessMonitoringInfos: &fnpb.HarnessMonitoringInfosResponse{
					// TODO(BEAM-11092): Populate with non-bundle metrics data.
					MonitoringData: map[string][]byte{},
				},
			},
		}

	case req.GetSampleData() != nil:
		msg := req.GetSampleData()
		samples := make(map[string]*fnpb.SampleDataResponse_ElementList)
		// Without a data sampler the response carries an empty sample map.
		if c.dataSampler != nil {
			for pid, elements := range c.dataSampler.GetSamples(msg.GetPcollectionIds()) {
				elementList := &fnpb.SampleDataResponse_ElementList{}
				for i := range elements {
					elementList.Elements = append(elementList.Elements, &fnpb.SampledElement{
						Element:         elements[i].Element,
						SampleTimestamp: timestamppb.New(elements[i].Timestamp),
					})
				}
				samples[pid] = elementList
			}
		}

		return &fnpb.InstructionResponse{
			InstructionId: string(instID),
			Response: &fnpb.InstructionResponse_SampleData{
				SampleData: &fnpb.SampleDataResponse{ElementSamples: samples},
			},
		}

	default:
		return fail(ctx, instID, "Unexpected request: %v", req)
	}
}