in statefun-sdk-go/v3/pkg/statefun/handler.go [174:253]
// invoke applies every invocation of the batch carried by toFunction to the
// function registered in h.module for the batch's target function type.
//
// Outcomes:
//   - If the request carries no invocation batch, or no function is registered
//     for the target type, an error is returned.
//   - If the storage factory reports missing state specs, a FromFunction with
//     an IncompleteInvocationContext listing them is returned and no
//     invocation is executed.
//   - If ctx is done before an invocation starts, ctx.Err() is returned.
//   - If a function invocation returns an error, processing stops and that
//     error is returned unchanged.
//   - If anything panics while the batch is processed, the panic is recovered
//     and reported as an error instead of terminating the process.
//   - Otherwise a FromFunction with the accumulated InvocationResult,
//     including the state mutations reported by the storage, is returned.
//
// The results are named so that the deferred recover handler can set them.
func (h *handler) invoke(ctx context.Context, toFunction *protocol.ToFunction) (from *protocol.FromFunction, err error) {
	batch := toFunction.GetInvocation()
	if batch == nil {
		return nil, fmt.Errorf("request does not contain an invocation batch")
	}

	// Installed before any further work so that every panic raised while
	// handling this batch is turned into an error for this request only.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		from = nil
		if cause, ok := r.(error); ok {
			err = fmt.Errorf("failed to execute invocation for %s: %w", batch.Target, cause)
			return
		}
		// Non-error panic values (strings, custom types, ...) cannot be
		// wrapped with %w; format them with %v rather than exiting the process.
		err = fmt.Errorf("failed to execute invocation for %s: %v", batch.Target, r)
	}()

	self := addressFromInternal(batch.Target)
	function, exists := h.module[self.FunctionType]
	if !exists {
		return nil, fmt.Errorf("unknown function type %s", self.FunctionType)
	}

	storageFactory := newStorageFactory(batch, h.stateSpecs[self.FunctionType])
	if missing := storageFactory.getMissingSpecs(); missing != nil {
		log.Printf("missing state specs for function type %v", self)
		for _, spec := range missing {
			log.Printf("registering missing specs %v", spec)
		}
		return &protocol.FromFunction{
			Response: &protocol.FromFunction_IncompleteInvocationContext_{
				IncompleteInvocationContext: &protocol.FromFunction_IncompleteInvocationContext{
					MissingValues: missing,
				},
			},
		}, nil
	}

	storage := storageFactory.getStorage()
	response := &protocol.FromFunction_InvocationResponse{}

	for _, invocation := range batch.Invocations {
		// Stop before starting another invocation once the request context
		// has been cancelled or has expired.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return nil, ctxErr
		}

		// Each invocation runs in its own function literal so that the
		// deferred cancel releases the derived context even when the user
		// function panics.
		err = func() error {
			invocationCtx, cancel := context.WithCancel(ctx)
			defer cancel()

			sContext := statefunContext{
				Context:  invocationCtx,
				Mutex:    new(sync.Mutex),
				self:     self,
				storage:  storage,
				response: response,
			}
			if invocation.Caller != nil {
				caller := addressFromInternal(invocation.Caller)
				sContext.caller = &caller
			}

			msg := Message{
				target:     batch.Target,
				typedValue: invocation.Argument,
			}
			return function.Invoke(&sContext, msg)
		}()
		if err != nil {
			return nil, err
		}
	}

	response.StateMutations = storage.getStateMutations()
	return &protocol.FromFunction{
		Response: &protocol.FromFunction_InvocationResult{
			InvocationResult: response,
		},
	}, nil
}