in sdks/go/pkg/beam/core/runtime/exec/translate.go [438:838]
// makeLink returns the Node that consumes the link id, i.e. the executable
// unit built for transform id.to of the bundle descriptor. Nodes are cached in
// b.links, so repeated requests for the same link return the same Node; a
// newly built Node is also appended to b.units.
//
// from is the ID of the PCollection feeding the link. It is only consulted by
// the Inject and ReshuffleInput payloads, which derive their coders from it.
//
// An error is returned if a payload cannot be decoded, a referenced coder
// cannot be resolved, or the transform carries an opcode, payload URN or
// transform URN that is not handled here.
func (b *builder) makeLink(from string, id linkID) (Node, error) {
	if n, ok := b.links[id]; ok {
		return n, nil
	}

	// Process all incoming links for the edge and cache them. It thus doesn't matter
	// which exact link triggers the Node generation. The link caching is only needed
	// to process ParDo side inputs.

	transform := b.desc.GetTransforms()[id.to]
	urn := transform.GetSpec().GetUrn()
	payload := transform.GetSpec().GetPayload()

	// TODO(herohde) 1/25/2018: do we need to handle composites?

	// Build the downstream nodes first so the new unit can be wired to them.
	out, err := b.makePCollections(unmarshalKeyedValues(transform.GetOutputs()))
	if err != nil {
		return nil, err
	}

	var u Node
	switch urn {
	case graphx.URNParDo,
		urnPerKeyCombinePre,
		urnPerKeyCombineMerge,
		urnPerKeyCombineExtract,
		urnPerKeyCombineConvert,
		urnPairWithRestriction,
		urnSplitAndSizeRestrictions,
		urnProcessSizedElementsAndRestrictions,
		urnTruncateSizedRestrictions:
		// All of these carry a base64-encoded v1pb.TransformPayload; the
		// inner switch only locates it inside the URN-specific wrapper.
		var data string
		var sides map[string]*pipepb.SideInput
		var userState map[string]*pipepb.StateSpec
		var userTimers map[string]*pipepb.TimerFamilySpec
		switch urn {
		case graphx.URNParDo,
			urnPairWithRestriction,
			urnSplitAndSizeRestrictions,
			urnProcessSizedElementsAndRestrictions,
			urnTruncateSizedRestrictions:
			var pardo pipepb.ParDoPayload
			if err := proto.Unmarshal(payload, &pardo); err != nil {
				return nil, errors.Wrapf(err, "invalid ParDo payload for %v", transform)
			}
			data = string(pardo.GetDoFn().GetPayload())
			sides = pardo.GetSideInputs()
			userState = pardo.GetStateSpecs()
			userTimers = pardo.GetTimerFamilySpecs()
		case urnPerKeyCombinePre, urnPerKeyCombineMerge, urnPerKeyCombineExtract, urnPerKeyCombineConvert:
			var cmb pipepb.CombinePayload
			if err := proto.Unmarshal(payload, &cmb); err != nil {
				return nil, errors.Wrapf(err, "invalid CombinePayload payload for %v", transform)
			}
			data = string(cmb.GetCombineFn().GetPayload())
		default:
			// TODO(herohde) 12/4/2017: we see DoFns directly with Dataflow. Handle that
			// case here, for now, so that the harness can use this logic.
			data = string(payload)
		}

		// TODO(herohde) 1/28/2018: Once Dataflow's fully off the old way,
		// we can simply switch on the ParDo DoFn URN directly.

		var tp v1pb.TransformPayload
		if err := protox.DecodeBase64(data, &tp); err != nil {
			return nil, errors.Wrapf(err, "invalid transform payload for %v", transform)
		}

		switch tpUrn := tp.GetUrn(); tpUrn {
		case graphx.URNDoFn:
			op, fn, _, in, _, err := graphx.DecodeMultiEdge(tp.GetEdge())
			if err != nil {
				return nil, err
			}

			switch op {
			case graph.ParDo:
				dofn, err := graph.AsDoFn(fn, graph.MainUnknown)
				if err != nil {
					return nil, err
				}
				switch urn {
				case urnPairWithRestriction:
					u = &PairWithRestriction{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				case urnSplitAndSizeRestrictions:
					u = &SplitAndSizeRestrictions{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				case urnTruncateSizedRestrictions:
					u = &TruncateSizedRestriction{UID: b.idgen.New(), Fn: dofn, Out: out[0]}
				default:
					n := &ParDo{UID: b.idgen.New(), Fn: dofn, Inbound: in, Out: out}
					n.PID = id.to

					input := unmarshalKeyedValues(transform.GetInputs())

					if len(userState) > 0 {
						stateIDToCoder := make(map[string]*coder.Coder)
						stateIDToKeyCoder := make(map[string]*coder.Coder)
						stateIDToCombineFn := make(map[string]*graph.CombineFn)
						for key, spec := range userState {
							// cID is the value coder ID, kcID the key coder ID;
							// which of them is set depends on the state type.
							var cID string
							var kcID string
							if rmw := spec.GetReadModifyWriteSpec(); rmw != nil {
								cID = rmw.CoderId
							} else if bs := spec.GetBagSpec(); bs != nil {
								cID = bs.ElementCoderId
							} else if cs := spec.GetCombiningSpec(); cs != nil {
								cID = cs.AccumulatorCoderId
								// Combining state also needs its CombineFn, which is
								// serialized the same way as a transform payload.
								cmbData := string(cs.GetCombineFn().GetPayload())
								var cmbTp v1pb.TransformPayload
								if err := protox.DecodeBase64(cmbData, &cmbTp); err != nil {
									return nil, errors.Wrapf(err, "invalid transform payload %v for %v", cmbData, transform)
								}
								_, fn, _, _, _, err := graphx.DecodeMultiEdge(cmbTp.GetEdge())
								if err != nil {
									return nil, err
								}
								cfn, err := graph.AsCombineFn(fn)
								if err != nil {
									return nil, err
								}
								stateIDToCombineFn[key] = cfn
							} else if ms := spec.GetMapSpec(); ms != nil {
								cID = ms.ValueCoderId
								kcID = ms.KeyCoderId
							} else if ss := spec.GetSetSpec(); ss != nil {
								kcID = ss.ElementCoderId
							} else {
								return nil, errors.Errorf("Unrecognized state type %v", spec)
							}
							if cID != "" {
								c, err := b.coders.Coder(cID)
								if err != nil {
									return nil, err
								}
								stateIDToCoder[key] = c
							} else {
								// If no value coder is provided, we are in a keyed state with no values (aka a set).
								// We represent a set as an element mapping to a bool representing if it is present or not.
								stateIDToCoder[key] = &coder.Coder{Kind: coder.Bool}
							}
							if kcID != "" {
								kc, err := b.coders.Coder(kcID)
								if err != nil {
									return nil, err
								}
								stateIDToKeyCoder[key] = kc
							}
						}

						// The adapter does not depend on the individual spec, so it is
						// built once after every state ID has been registered in the
						// maps above, using the coder of the first input.
						sid := StreamID{
							Port:         Port{URL: b.desc.GetStateApiServiceDescriptor().GetUrl()},
							PtransformID: id.to,
						}
						ec, wc, err := b.makeCoderForPCollection(input[0])
						if err != nil {
							return nil, err
						}
						n.UState = NewUserStateAdapter(sid, coder.NewW(ec, wc), stateIDToCoder, stateIDToKeyCoder, stateIDToCombineFn)
					}
					if len(userTimers) > 0 {
						// Timers are addressed at the timer API service URL rather
						// than the state API one.
						sID := StreamID{Port: Port{URL: b.desc.GetTimerApiServiceDescriptor().GetUrl()}, PtransformID: id.to}
						familyToSpec := map[string]timerFamilySpec{}
						for fam, spec := range userTimers {
							domain := timers.TimeDomain(spec.GetTimeDomain())
							timerCoder, err := b.coders.Coder(spec.GetTimerFamilyCoderId())
							if err != nil {
								return nil, errors.WithContextf(err, "couldn't retreive coder for timer %v in DoFn %v, ID %v", fam, dofn.Name(), n.PID)
							}
							familyToSpec[fam] = newTimerFamilySpec(domain, timerCoder)
						}
						n.TimerTracker = newUserTimerAdapter(sID, familyToSpec)
					}

					// Every input after the first is wired up as a side input.
					for i := 1; i < len(input); i++ {
						// TODO(https://github.com/apache/beam/issues/18602) Handle ViewFns for side inputs

						ec, wc, err := b.makeCoderForPCollection(input[i])
						if err != nil {
							return nil, err
						}

						sidePB, ok := sides[indexToInputId(i)]
						if !ok {
							return nil, fmt.Errorf("missing side input info for collection %v", input[i])
						}
						mapper, err := unmarshalAndMakeWindowMapping(sidePB.GetWindowMappingFn())
						if err != nil {
							return nil, err
						}

						sid := StreamID{
							Port:         Port{URL: b.desc.GetStateApiServiceDescriptor().GetUrl()},
							PtransformID: id.to,
						}
						sideInputID := fmt.Sprintf("i%v", i) // SideInputID (= local id, "iN")
						side := NewSideInputAdapter(sid, sideInputID, coder.NewW(ec, wc), mapper)
						n.Side = append(n.Side, side)
					}
					u = n
					if urn == urnProcessSizedElementsAndRestrictions {
						// Collect the local output names (the keys of the outputs
						// map). Map iteration order is unspecified, so the slice
						// order is too.
						outputs := make([]string, len(transform.GetOutputs()))
						i := 0
						for local := range transform.GetOutputs() {
							outputs[i] = local
							i++
						}
						u = &ProcessSizedElementsAndRestrictions{PDo: n, TfId: id.to, outputs: outputs}
					} else if dofn.IsSplittable() {
						// A splittable DoFn that arrives as a plain ParDo is
						// wrapped in the fallback unit.
						u = &SdfFallback{PDo: n}
					}
				}

			case graph.Combine:
				cn := &Combine{UID: b.idgen.New(), Out: out[0]}
				cn.Fn, err = graph.AsCombineFn(fn)
				if err != nil {
					return nil, err
				}
				cn.UsesKey = typex.IsKV(in[0].Type)
				cn.PID = id.to

				switch urn {
				case urnPerKeyCombinePre:
					inputs := unmarshalKeyedValues(transform.GetInputs())
					if len(inputs) != 1 {
						return nil, errors.Errorf("unexpected sideinput to combine: got %d, want 1", len(inputs))
					}
					ec, wc, err := b.makeCoderForPCollection(inputs[0])
					if err != nil {
						return nil, err
					}
					if !coder.IsKV(ec) {
						return nil, errors.Errorf("unexpected non-KV coder PCollection input to combine: %v", ec)
					}
					u = &LiftedCombine{Combine: cn, KeyCoder: ec.Components[0], WindowCoder: wc}
				case urnPerKeyCombineMerge:
					ma := &MergeAccumulators{Combine: cn}
					if pc, ok := ma.Out.(*PCollection); ok {
						if eo, ok := pc.Out.(*ExtractOutput); ok {
							// Strip PCollections from between MergeAccumulators and ExtractOutputs
							// as it's a synthetic PCollection.
							b.units = b.units[:len(b.units)-1]
							ma.Out = eo
						}
					}
					u = ma
				case urnPerKeyCombineExtract:
					u = &ExtractOutput{Combine: cn}
				case urnPerKeyCombineConvert:
					u = &ConvertToAccumulators{Combine: cn}
				default: // For unlifted combines
					u = cn
				}
			default:
				// Report an unsupported opcode as an error rather than a
				// panic, like every other failure in this function.
				return nil, errors.Errorf("Opcode should be one of ParDo or Combine, but it is: %v", op)
			}

		case graphx.URNIterableSideInputKey:
			u = &FixedKey{UID: b.idgen.New(), Key: []byte(iterableSideInputKey), Out: out[0]}

		case graphx.URNInject:
			c, _, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			if !coder.IsKV(c) {
				return nil, errors.Errorf("unexpected inject coder: %v", c)
			}
			valCoder := c.Components[1]
			// JIRA BEAM-12438 - an extra LP coder can get added here, but isn't added
			// on decode. Strip them until we get a better fix.
			if valCoder.Kind == coder.LP {
				// strip unexpected length prefix coder.
				valCoder = valCoder.Components[0]
			}
			u = &Inject{UID: b.idgen.New(), N: (int)(tp.GetInject().GetN()), ValueEncoder: MakeElementEncoder(valCoder), Out: out[0]}

		case graphx.URNExpand:
			// Take the PCollection ID from the outputs map; if there are
			// several entries, the last one visited wins.
			var pid string
			for _, id := range transform.GetOutputs() {
				pid = id
			}
			c, _, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			if !coder.IsCoGBK(c) {
				return nil, errors.Errorf("unexpected expand coder: %v", c)
			}

			var decoders []ElementDecoder
			for _, dc := range c.Components[1:] {
				decoders = append(decoders, MakeElementDecoder(dc))
			}
			// Strip PCollections from Expand nodes, as CoGBK metrics are handled by
			// the DataSource that precedes them.
			trueOut := out[0]
			if pcol, ok := trueOut.(*PCollection); ok {
				trueOut = pcol.Out
			}
			// NOTE(review): the last unit is dropped even when out[0] is not a
			// *PCollection; presumably makePCollections always appends one
			// last - confirm.
			b.units = b.units[:len(b.units)-1]
			u = &Expand{UID: b.idgen.New(), ValueDecoders: decoders, Out: trueOut}

		case graphx.URNReshuffleInput:
			// The window coder comes from the incoming PCollection; the
			// element coder is the one preserved in the reshuffle payload.
			_, w, err := b.makeCoderForPCollection(from)
			if err != nil {
				return nil, err
			}
			preservedCoderID := tp.GetReshuffle().GetCoderId()
			pc, err := unmarshalReshuffleCoders(preservedCoderID, tp.GetReshuffle().GetCoderPayloads())
			if err != nil {
				return nil, err
			}
			u = &ReshuffleInput{UID: b.idgen.New(), Seed: rand.Int63(), Coder: coder.NewW(pc, w), Out: out[0]}

		case graphx.URNReshuffleOutput:
			var pid string
			// There's only one output PCollection, and iterating through the map
			// is the only way to extract it.
			for _, id := range transform.GetOutputs() {
				pid = id
			}
			_, w, err := b.makeCoderForPCollection(pid)
			if err != nil {
				return nil, err
			}
			preservedCoderID := tp.GetReshuffle().GetCoderId()
			pc, err := unmarshalReshuffleCoders(preservedCoderID, tp.GetReshuffle().GetCoderPayloads())
			if err != nil {
				return nil, err
			}
			u = &ReshuffleOutput{UID: b.idgen.New(), Coder: coder.NewW(pc, w), Out: out[0]}

		default:
			return nil, errors.Errorf("unexpected payload: %v", &tp)
		}

	case graphx.URNWindow:
		var wp pipepb.WindowIntoPayload
		if err := proto.Unmarshal(payload, &wp); err != nil {
			return nil, errors.Wrapf(err, "invalid WindowInto payload for %v", transform)
		}
		wfn, err := unmarshalWindowFn(wp.GetWindowFn())
		if err != nil {
			return nil, err
		}
		u = &WindowInto{UID: b.idgen.New(), Fn: wfn, Out: out[0]}

	case graphx.URNMapWindows:
		var fn pipepb.FunctionSpec
		if err := proto.Unmarshal(payload, &fn); err != nil {
			return nil, errors.Wrapf(err, "invalid SideInput payload for %v", transform)
		}
		mapper, err := unmarshalAndMakeWindowMapping(&fn)
		if err != nil {
			return nil, err
		}
		u = &MapWindows{UID: b.idgen.New(), Fn: mapper, Out: out[0], FnUrn: fn.GetUrn()}

	case graphx.URNFlatten:
		u = &Flatten{UID: b.idgen.New(), N: len(transform.Inputs), Out: out[0]}

		// Use the same flatten instance for all the inputs links to this transform.
		for i := 0; i < len(transform.Inputs); i++ {
			b.links[linkID{id.to, i}] = u
		}

	case urnDataSink:
		port, cid, err := unmarshalPort(payload)
		if err != nil {
			return nil, err
		}

		sink := &DataSink{UID: b.idgen.New()}
		sink.SID = StreamID{PtransformID: id.to, Port: port}
		sink.Coder, err = b.coders.Coder(cid) // Expected to be windowed coder
		if err != nil {
			return nil, err
		}
		if !coder.IsW(sink.Coder) {
			return nil, errors.Errorf("unwindowed coder %v on DataSink %v: %v", cid, id, sink.Coder)
		}
		u = sink

	case graphx.URNToString:
		u = &ToString{UID: b.idgen.New(), Out: out[0]}

	default:
		// An unknown transform URN fails the plan with an error instead of
		// panicking.
		return nil, errors.Errorf("Unexpected transform URN: %v", urn)
	}

	b.links[id] = u
	b.units = append(b.units, u)
	return u, nil
}