func()

in sdks/go/pkg/beam/core/runtime/exec/translate.go [438:838]


// makeLink returns the execution Node for the transform identified by id.to,
// building it on first use. Nodes are cached in b.links, so requesting a link
// that has already been built returns the same Node.
//
// The node kind is chosen from the transform's spec URN. ParDo-like and per-key
// combine phase URNs are decoded from the Go transform payload embedded in
// their ParDo/Combine payload. Window, map-windows, flatten, data sink and
// to-string transforms are built directly. The transform's output
// PCollections are constructed first (via makePCollections) so that the new
// node can be wired to them. Every node built here is recorded in b.links[id]
// and appended to b.units.
//
// from is the ID of the PCollection feeding this link. It is used to derive
// the input coder for Inject and ReshuffleInput transforms.
//
// An error is returned in any of these cases:
//   - the transform is missing from the bundle descriptor;
//   - its URN, opcode or payload is not understood;
//   - any payload, coder or side-input spec fails to decode.
func (b *builder) makeLink(from string, id linkID) (Node, error) {
	if n, ok := b.links[id]; ok {
		return n, nil
	}

	// Process all incoming links for the edge and cache them. It thus doesn't matter
	// which exact link triggers the Node generation. The link caching is only needed
	// to process ParDo side inputs.

	transform, ok := b.desc.GetTransforms()[id.to]
	if !ok {
		return nil, errors.Errorf("transform %v not found in bundle descriptor", id.to)
	}
	urn := transform.GetSpec().GetUrn()
	payload := transform.GetSpec().GetPayload()

	// TODO(herohde) 1/25/2018: do we need to handle composites?

	out, err := b.makePCollections(unmarshalKeyedValues(transform.GetOutputs()))
	if err != nil {
		return nil, err
	}

	// stripLastUnit removes a synthetic PCollection that is being bypassed from
	// b.units. The stripping below relies on that PCollection being the most
	// recently registered unit (presumably appended by makePCollections above).
	// The identity check ensures an unrelated unit is never dropped when that
	// does not hold.
	stripLastUnit := func(pcol *PCollection) {
		if last := len(b.units) - 1; last >= 0 && b.units[last] == pcol {
			b.units = b.units[:last]
		}
	}

	var u Node
	switch urn {
	case graphx.URNParDo,
		urnPerKeyCombinePre,
		urnPerKeyCombineMerge,
		urnPerKeyCombineExtract,
		urnPerKeyCombineConvert,
		urnPairWithRestriction,
		urnSplitAndSizeRestrictions,
		urnProcessSizedElementsAndRestrictions,
		urnTruncateSizedRestrictions:
		var data string
		var sides map[string]*pipepb.SideInput
		var userState map[string]*pipepb.StateSpec
		var userTimers map[string]*pipepb.TimerFamilySpec
		switch urn {
		case graphx.URNParDo,
			urnPairWithRestriction,
			urnSplitAndSizeRestrictions,
			urnProcessSizedElementsAndRestrictions,
			urnTruncateSizedRestrictions:
			var pardo pipepb.ParDoPayload
			if err := proto.Unmarshal(payload, &pardo); err != nil {
				return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform)
			}
			data = string(pardo.GetDoFn().GetPayload())
			sides = pardo.GetSideInputs()
			userState = pardo.GetStateSpecs()
			userTimers = pardo.GetTimerFamilySpecs()
		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
			var cmb pipepb.CombinePayload
			if err := proto.Unmarshal(payload, &cmb); err != nil {
				return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform)
			}
			data = string(cmb.GetCombineFn().GetPayload())
		default:
			// TODO(herohde) 12/4/2017: we see DoFns directly with Dataflow. Handle that
			// case here, for now, so that the harness can use this logic.
			// NOTE(review): with the current outer case list every URN is matched
			// above, so this branch is not reachable from here.

			data = string(payload)
		}

		// TODO(herohde) 1/28/2018: Once Dataflow's fully off the old way,
		// we can simply switch on the ParDo DoFn URN directly.

		var tp v1pb.TransformPayload
		if err := protox.DecodeBase64(data, &tp); err != nil {
			return nil, errors.Wrapf(err, "invalid transform payload for %v", transform)
		}

		switch tpUrn := tp.GetUrn(); tpUrn {
		case graphx.URNDoFn:
			op, fn, _, in, _, err := graphx.DecodeMultiEdge(tp.GetEdge())
			if err != nil {
				return nil, err
			}

			switch op {
			case graph.ParDo:
				dofn, err := graph.AsDoFn(fn, graph.MainUnknown)
				if err != nil {
					return nil, err
				}
				switch urn {
				case urnPairWithRestriction:
					u = &PairWithRestriction{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				case urnSplitAndSizeRestrictions:
					u = &SplitAndSizeRestrictions{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				case urnTruncateSizedRestrictions:
					u = &TruncateSizedRestriction{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				default:
					n := &ParDo{UID: b.idgen.New(), Fn: dofn, Inbound: in, Out: out}
					n.PID = id.to

					input := unmarshalKeyedValues(transform.GetInputs())

					// User state and side inputs are both served through the State API
					// for this transform, so they share one stream ID.
					stateSID := StreamID{
						Port:         Port{URL: b.desc.GetStateApiServiceDescriptor().GetUrl()},
						PtransformID: id.to,
					}

					if len(userState) > 0 {
						stateIDToCoder := make(map[string]*coder.Coder, len(userState))
						stateIDToKeyCoder := make(map[string]*coder.Coder)
						stateIDToCombineFn := make(map[string]*graph.CombineFn)
						for key, spec := range userState {
							// Each state kind carries its value and/or key coder IDs in a
							// different spec message.
							var cID string
							var kcID string
							if rmw := spec.GetReadModifyWriteSpec(); rmw != nil {
								cID = rmw.CoderId
							} else if bs := spec.GetBagSpec(); bs != nil {
								cID = bs.ElementCoderId
							} else if cs := spec.GetCombiningSpec(); cs != nil {
								cID = cs.AccumulatorCoderId
								cmbData := string(cs.GetCombineFn().GetPayload())
								var cmbTp v1pb.TransformPayload
								if err := protox.DecodeBase64(cmbData, &cmbTp); err != nil {
									return nil, errors.Wrapf(err, "invalid transform payload %v for %v", cmbData, transform)
								}
								_, fn, _, _, _, err := graphx.DecodeMultiEdge(cmbTp.GetEdge())
								if err != nil {
									return nil, err
								}
								cfn, err := graph.AsCombineFn(fn)
								if err != nil {
									return nil, err
								}
								stateIDToCombineFn[key] = cfn
							} else if ms := spec.GetMapSpec(); ms != nil {
								cID = ms.ValueCoderId
								kcID = ms.KeyCoderId
							} else if ss := spec.GetSetSpec(); ss != nil {
								kcID = ss.ElementCoderId
							} else {
								return nil, errors.Errorf("Unrecognized state type %v", spec)
							}
							if cID != "" {
								c, err := b.coders.Coder(cID)
								if err != nil {
									return nil, err
								}
								stateIDToCoder[key] = c
							} else {
								// If no value coder is provided, we are in a keyed state with no values (aka a set).
								// We represent a set as an element mapping to a bool representing if it is present or not.
								stateIDToCoder[key] = &coder.Coder{Kind: coder.Bool}
							}
							if kcID != "" {
								kc, err := b.coders.Coder(kcID)
								if err != nil {
									return nil, err
								}
								stateIDToKeyCoder[key] = kc
							}
						}

						if len(input) == 0 {
							return nil, errors.Errorf("stateful DoFn %v in transform %v has no main input", dofn.Name(), id.to)
						}
						// Build the adapter once, after every state spec has been
						// registered. It is given the windowed coder of the main input.
						ec, wc, err := b.makeCoderForPCollection(input[0])
						if err != nil {
							return nil, err
						}
						n.UState = NewUserStateAdapter(stateSID, coder.NewW(ec, wc), stateIDToCoder, stateIDToKeyCoder, stateIDToCombineFn)
					}

					if len(userTimers) > 0 {
						sID := StreamID{Port: Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to}

						familyToSpec := map[string]timerFamilySpec{}
						for fam, spec := range userTimers {
							domain := timers.TimeDomain(spec.GetTimeDomain())
							timerCoder, err := b.coders.Coder(spec.GetTimerFamilyCoderId())
							if err != nil {
								return nil, errors.WithContextf(err, "couldn't retrieve coder for timer %v in DoFn %v, ID %v", fam, dofn.Name(), n.PID)
							}
							familyToSpec[fam] = newTimerFamilySpec(domain, timerCoder)
						}
						n.TimerTracker = newUserTimerAdapter(sID, familyToSpec)
					}

					// Input 0 is the main input; every further input is a side input.
					for i := 1; i < len(input); i++ {
						// TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs

						ec, wc, err := b.makeCoderForPCollection(input[i])
						if err != nil {
							return nil, err
						}

						sidePB, ok := sides[indexToInputId(i)]
						if !ok {
							return nil, errors.Errorf("missing side input info for collection %v", input[i])
						}

						mapper, err := unmarshalAndMakeWindowMapping(sidePB.GetWindowMappingFn())
						if err != nil {
							return nil, err
						}

						sideInputID := fmt.Sprintf("i%v", i) // SideInputID (= local id, "iN")
						side := NewSideInputAdapter(stateSID, sideInputID, coder.NewW(ec, wc), mapper)
						n.Side = append(n.Side, side)
					}
					u = n
					if urn == urnProcessSizedElementsAndRestrictions {
						outputs := make([]string, 0, len(transform.GetOutputs()))
						for tag := range transform.GetOutputs() {
							outputs = append(outputs, tag)
						}
						u = &ProcessSizedElementsAndRestrictions{PDo: n, TfId: id.to, outputs: outputs}
					} else if dofn.IsSplittable() {
						u = &SdfFallback{PDo: n}
					}
				}

			case graph.Combine:
				cn := &Combine{UID: b.idgen.New(), Out: out[0]}
				cn.Fn, err = graph.AsCombineFn(fn)
				if err != nil {
					return nil, err
				}
				cn.UsesKey = typex.IsKV(in[0].Type)

				cn.PID = id.to

				switch urn {
				case urnPerKeyCombinePre:
					inputs := unmarshalKeyedValues(transform.GetInputs())
					if len(inputs) != 1 {
						return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
					}
					ec, wc, err := b.makeCoderForPCollection(inputs[0])
					if err != nil {
						return nil, err
					}
					if !coder.IsKV(ec) {
						return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
					}
					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0], WindowCoder: wc}
				case urnPerKeyCombineMerge:
					ma := &MergeAccumulators{Combine: cn}
					if pc, ok := ma.Out.(*PCollection); ok {
						if eo, ok := pc.Out.(*ExtractOutput); ok {
							// Strip PCollections from between MergeAccumulators and ExtractOutputs
							// as it's a synthetic PCollection.
							stripLastUnit(pc)
							ma.Out = eo
						}
					}
					u = ma
				case urnPerKeyCombineExtract:
					u = &ExtractOutput{Combine: cn}
				case urnPerKeyCombineConvert:
					u = &ConvertToAccumulators{Combine: cn}
				default: // For unlifted combines
					u = cn
				}
			default:
				// The opcode comes from the decoded pipeline payload, so report it
				// as an error rather than crashing the harness.
				return nil, errors.Errorf("opcode should be one of ParDo or Combine, but it is: %v", op)
			}

		case graphx.URNIterableSideInputKey:
			u = &FixedKey{UID: b.idgen.New(), Key: []byte(iterableSideInputKey), Out: out[0]}

		case graphx.URNInject:
			c, _, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			if !coder.IsKV(c) {
				return nil, errors.Errorf("unexpected inject coder: %v", c)
			}
			valCoder := c.Components[1]
			// JIRA BEAM-12438 - an extra LP coder can get added here, but isn't added
			// on decode. Strip them until we get a better fix.
			if valCoder.Kind == coder.LP {
				// strip unexpected length prefix coder.
				valCoder = valCoder.Components[0]
			}
			u = &Inject{UID: b.idgen.New(), N: (int)(tp.GetInject().GetN()), ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]}

		case graphx.URNExpand:
			// There's only one output PCollection, and iterating through the map
			// is the only way to extract it.
			var pid string
			for _, pcolID := range transform.GetOutputs() {
				pid = pcolID
			}
			c, _, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			if !coder.IsCoGBK(c) {
				return nil, errors.Errorf("unexpected expand coder: %v", c)
			}

			var decoders []ElementDecoder
			for _, dc := range c.Components[1:] {
				decoders = append(decoders, MakeElementDecoder(dc))
			}
			// Strip PCollections from Expand nodes, as CoGBK metrics are handled by
			// the DataSource that precedes them.
			trueOut := out[0]
			if pcol, ok := trueOut.(*PCollection); ok {
				trueOut = pcol.Out
				stripLastUnit(pcol)
			}
			u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: trueOut}

		case graphx.URNReshuffleInput:
			_, w, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			preservedCoderID := tp.GetReshuffle().GetCoderId()
			pc, err := unmarshalReshuffleCoders(preservedCoderID, tp.GetReshuffle().GetCoderPayloads())
			if err != nil {
				return nil, err
			}
			u = &ReshuffleInput{UID: b.idgen.New(), Seed: rand.Int63(), Coder: coder.NewW(pc, w), Out: out[0]}

		case graphx.URNReshuffleOutput:
			var pid string
			// There's only one output PCollection, and iterating through the map
			// is the only way to extract it.
			for _, pcolID := range transform.GetOutputs() {
				pid = pcolID
			}
			_, w, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			preservedCoderID := tp.GetReshuffle().GetCoderId()
			pc, err := unmarshalReshuffleCoders(preservedCoderID, tp.GetReshuffle().GetCoderPayloads())
			if err != nil {
				return nil, err
			}
			u = &ReshuffleOutput{UID: b.idgen.New(), Coder: coder.NewW(pc, w), Out: out[0]}

		default:
			return nil, errors.Errorf("unexpected payload: %v", &tp)
		}

	case graphx.URNWindow:
		var wp pipepb.WindowIntoPayload
		if err := proto.Unmarshal(payload, &wp); err != nil {
			return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform)
		}
		wfn, err := unmarshalWindowFn(wp.GetWindowFn())
		if err != nil {
			return nil, err
		}
		u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: out[0]}

	case graphx.URNMapWindows:
		var fn pipepb.FunctionSpec
		if err := proto.Unmarshal(payload, &fn); err != nil {
			return nil, errors.Wrapf(err, "invalid MapWindows payload for %v", transform)
		}
		mapper, err := unmarshalAndMakeWindowMapping(&fn)
		if err != nil {
			return nil, err
		}
		u = &MapWindows{UID: b.idgen.New(), Fn: mapper, Out: out[0], FnUrn: fn.GetUrn()}

	case graphx.URNFlatten:
		numInputs := len(transform.GetInputs())
		u = &Flatten{UID: b.idgen.New(), N: numInputs, Out: out[0]}

		// Use the same flatten instance for all the inputs links to this transform.
		for i := 0; i < numInputs; i++ {
			b.links[linkID{id.to, i}] = u
		}

	case urnDataSink:
		port, cid, err := unmarshalPort(payload)
		if err != nil {
			return nil, err
		}

		sink := &DataSink{UID: b.idgen.New()}
		sink.SID = StreamID{PtransformID: id.to, Port: port}
		sink.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
		if err != nil {
			return nil, err
		}
		if !coder.IsW(sink.Coder) {
			return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
		}
		u = sink

	case graphx.URNToString:
		u = &ToString{UID: b.idgen.New(), Out: out[0]}

	default:
		// The URN comes from the bundle descriptor, so an unknown one is bad
		// input rather than a programming error: report it instead of panicking.
		return nil, errors.Errorf("unexpected transform URN: %v", urn)
	}

	b.links[id] = u
	b.units = append(b.units, u)
	return u, nil
}