in sdks/go/pkg/beam/register/register.go [8215:8455]
func Combiner3[T0, T1, T2 any](accum any) {
registerCombinerTypes(accum)
accumVal := reflect.ValueOf(accum)
var mergeAccumulatorsWrapper func(fn any) reflectx.Func
if _, ok := accum.(mergeAccumulators2x2[T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T0) (T0, error))
return &caller2x2[T0, T0, T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0, error))(nil)).Elem(), caller)
mergeAccumulatorsWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error) {
return fn.(mergeAccumulators2x2[T0]).MergeAccumulators(a0, a1)
})
}
} else if _, ok := accum.(mergeAccumulators2x1[T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T0) T0)
return &caller2x1[T0, T0, T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) T0)(nil)).Elem(), caller)
mergeAccumulatorsWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
return fn.(mergeAccumulators2x1[T0]).MergeAccumulators(a0, a1)
})
}
}
if mergeAccumulatorsWrapper == nil {
panic(fmt.Sprintf("Failed to optimize MergeAccumulators for combiner %v. Failed to infer types", accum))
}
var createAccumulatorWrapper func(fn any) reflectx.Func
if _, ok := accum.(createAccumulator0x2[T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func() (T0, error))
return &caller0x2[T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func() (T0, error))(nil)).Elem(), caller)
createAccumulatorWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func() (T0, error) {
return fn.(createAccumulator0x2[T0]).CreateAccumulator()
})
}
} else if _, ok := accum.(createAccumulator0x1[T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func() T0)
return &caller0x1[T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func() T0)(nil)).Elem(), caller)
createAccumulatorWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func() T0 {
return fn.(createAccumulator0x1[T0]).CreateAccumulator()
})
}
}
if m := accumVal.MethodByName("CreateAccumulator"); m.IsValid() && createAccumulatorWrapper == nil {
panic(fmt.Sprintf("Failed to optimize CreateAccumulator for combiner %v. Failed to infer types", accum))
}
var addInputWrapper func(fn any) reflectx.Func
if _, ok := accum.(addInput2x2[T0, T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T0) (T0, error))
return &caller2x2[T0, T0, T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) (T0, error))(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T0) (T0, error) {
return fn.(addInput2x2[T0, T0]).AddInput(a0, a1)
})
}
} else if _, ok := accum.(addInput2x1[T0, T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T0) T0)
return &caller2x1[T0, T0, T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T0) T0)(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T0) T0 {
return fn.(addInput2x1[T0, T0]).AddInput(a0, a1)
})
}
} else if _, ok := accum.(addInput2x2[T0, T1]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T1) (T0, error))
return &caller2x2[T0, T1, T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T1) (T0, error))(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T1) (T0, error) {
return fn.(addInput2x2[T0, T1]).AddInput(a0, a1)
})
}
} else if _, ok := accum.(addInput2x1[T0, T1]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T1) T0)
return &caller2x1[T0, T1, T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T1) T0)(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T1) T0 {
return fn.(addInput2x1[T0, T1]).AddInput(a0, a1)
})
}
} else if _, ok := accum.(addInput2x2[T0, T2]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T2) (T0, error))
return &caller2x2[T0, T2, T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T2) (T0, error))(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T2) (T0, error) {
return fn.(addInput2x2[T0, T2]).AddInput(a0, a1)
})
}
} else if _, ok := accum.(addInput2x1[T0, T2]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0, T2) T0)
return &caller2x1[T0, T2, T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0, T2) T0)(nil)).Elem(), caller)
addInputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0, a1 T2) T0 {
return fn.(addInput2x1[T0, T2]).AddInput(a0, a1)
})
}
}
if m := accumVal.MethodByName("AddInput"); m.IsValid() && addInputWrapper == nil {
panic(fmt.Sprintf("Failed to optimize AddInput for combiner %v. Failed to infer types", accum))
}
var extractOutputWrapper func(fn any) reflectx.Func
if _, ok := accum.(extractOutput1x2[T0, T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) (T0, error))
return &caller1x2[T0, T0, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) (T0, error))(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) (T0, error) {
return fn.(extractOutput1x2[T0, T0]).ExtractOutput(a0)
})
}
} else if _, ok := accum.(extractOutput1x1[T0, T0]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) T0)
return &caller1x1[T0, T0]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) T0)(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) T0 {
return fn.(extractOutput1x1[T0, T0]).ExtractOutput(a0)
})
}
} else if _, ok := accum.(extractOutput1x2[T0, T1]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) (T1, error))
return &caller1x2[T0, T1, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) (T1, error))(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) (T1, error) {
return fn.(extractOutput1x2[T0, T1]).ExtractOutput(a0)
})
}
} else if _, ok := accum.(extractOutput1x1[T0, T1]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) T1)
return &caller1x1[T0, T1]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) T1)(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) T1 {
return fn.(extractOutput1x1[T0, T1]).ExtractOutput(a0)
})
}
} else if _, ok := accum.(extractOutput1x2[T0, T2]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) (T2, error))
return &caller1x2[T0, T2, error]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) (T2, error))(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) (T2, error) {
return fn.(extractOutput1x2[T0, T2]).ExtractOutput(a0)
})
}
} else if _, ok := accum.(extractOutput1x1[T0, T2]); ok {
caller := func(fn any) reflectx.Func {
f := fn.(func(T0) T2)
return &caller1x1[T0, T2]{fn: f}
}
reflectx.RegisterFunc(reflect.TypeOf((*func(T0) T2)(nil)).Elem(), caller)
extractOutputWrapper = func(fn any) reflectx.Func {
return reflectx.MakeFunc(func(a0 T0) T2 {
return fn.(extractOutput1x1[T0, T2]).ExtractOutput(a0)
})
}
}
if m := accumVal.MethodByName("ExtractOutput"); m.IsValid() && extractOutputWrapper == nil {
panic(fmt.Sprintf("Failed to optimize ExtractOutput for combiner %v. Failed to infer types", accum))
}
wrapperFn := func(fn any) map[string]reflectx.Func {
m := map[string]reflectx.Func{}
if mergeAccumulatorsWrapper != nil {
m["MergeAccumulators"] = mergeAccumulatorsWrapper(fn)
}
if createAccumulatorWrapper != nil {
m["CreateAccumulator"] = createAccumulatorWrapper(fn)
}
if addInputWrapper != nil {
m["AddInput"] = addInputWrapper(fn)
}
if extractOutputWrapper != nil {
m["ExtractOutput"] = extractOutputWrapper(fn)
}
return m
}
reflectx.RegisterStructWrapper(reflect.TypeOf(accum).Elem(), wrapperFn)
}