in lambda/rapidcore/server.go [627:734]
// Invoke drives one invocation: it calls Reserve(), awaits init completion,
// hands the response writer to FastInvoke() and waits on AwaitRelease(), all
// bounded by s.GetInvokeTimeout().
//
// Return values:
//   - ErrInitNotStarted when s.getInitFailuresChan() returns nil;
//   - ErrInvokeTimeout when the invoke timeout elapses first (the server is
//     reset with autoresetReasonTimeout before returning);
//   - the Reserve() error when Reserve() yields no response to work with;
//   - any AwaitRelease() error other than ErrReleaseReservationDone;
//   - nil when the reservation is released cleanly.
func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
	// Checked before any goroutine is started so the early return has nothing to clean up.
	if s.getInitFailuresChan() == nil {
		return ErrInitNotStarted
	}

	resetCtx, resetCancel := context.WithCancel(context.Background())
	defer resetCancel()

	// Buffered so the timeout goroutine can always complete its send and exit,
	// even if Invoke() has already returned through one of the release channels.
	timeoutChan := make(chan error, 1)
	go func() {
		// An explicit timer (instead of time.After) is stopped as soon as the
		// invoke finishes rather than lingering for the full timeout.
		timer := time.NewTimer(s.GetInvokeTimeout())
		defer timer.Stop()

		select {
		case <-timer.C:
			log.Debug("Invoke() timeout")
			timeoutChan <- ErrInvokeTimeout
		case <-resetCtx.Done():
			log.Debugf("execute finished, autoreset cancelled")
		}
	}()

	// Exactly one value is sent on exactly one of these channels per call, and
	// every path through the select below receives exactly one value from them.
	releaseErrChan := make(chan error)
	releaseSuccessChan := make(chan struct{})

	go func() {
		// This thread can block in one of two method calls Reserve() & AwaitRelease(),
		// corresponding to Init and Invoke phase.
		// FastInvoke is intended to be 'async' response stream copying.
		// When a timeout occurs, we send a 'Reset' with the timeout reason
		// When a Reset is sent, the reset handler in rapid lib cancels existing flows,
		// including init/invoke. This causes either initFailure/invokeFailure, and then
		// the Reset is handled and processed.
		// TODO: however, ideally Reserve() does not block on init, but FastInvoke does
		// The logic would be almost identical, except that init failures could manifest
		// through return values of FastInvoke and not Reserve()
		reserveResp, err := s.Reserve("", "", "")
		if err != nil {
			log.Infof("ReserveFailed: %s", err)
		}
		// NOTE(review): assumes Reserve() returns a pointer that may be nil on
		// failure - confirm against its definition. Without a response there is
		// no function timeout to derive the deadline from, so report the failure
		// to the caller instead of dereferencing nil.
		if reserveResp == nil {
			if err == nil {
				err = fmt.Errorf("reserve returned no response")
			}
			releaseErrChan <- err
			return
		}

		invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())

		go func() {
			if initCompletionResp, err := s.awaitInitialized(); err != nil {
				switch err {
				case ErrInitResetReceived, ErrInitDoneFailed:
					// For init failures, cache the response so they can be checked later
					// We check if they have not already been set by a call to /init/error by runtime
					if s.getCachedInitErrorResponse() == nil {
						errType, errMsg := initCompletionResp.InitErrorType, initCompletionResp.InitErrorMessage.Error()
						headers := interop.InvokeResponseHeaders{}
						fnError := interop.FunctionError{Type: errType, Message: errMsg}
						s.setCachedInitErrorResponse(&interop.ErrorInvokeResponse{Headers: headers, FunctionError: fnError, Payload: []byte{}})
					}
					// Init failed, so we explicitly shutdown runtime (cleanup unused extensions).
					// Because following fast invoke will start new (suppressed) Init phase without reset call
					s.Shutdown(&interop.Shutdown{DeadlineNs: metering.Monotime() + int64(resetDefaultTimeoutMs*1000*1000)})
				}
			}

			if err := s.FastInvoke(responseWriter, invoke, false); err != nil {
				log.Debugf("FastInvoke() error: %s", err)
			}
		}()

		_, err = s.AwaitRelease()
		// ErrReleaseReservationDone is not an error: it is the expected return
		// value when Reset is called, so it is reported as a clean release.
		if err != nil && err != ErrReleaseReservationDone {
			log.Debugf("AwaitRelease() error: %s", err)
			if err == ErrInitDoneFailed || err == ErrInvokeDoneFailed {
				// Reset when either init or invoke failures occur, i.e.
				// init/error, invocation/error, Runtime.ExitError, Extension.ExitError
				s.Reset(autoresetReasonReleaseFail, resetDefaultTimeoutMs)
			}
			releaseErrChan <- err
			return
		}

		releaseSuccessChan <- struct{}{}
	}()

	var err error
	select {
	case timeoutErr := <-timeoutChan:
		s.Reset(autoresetReasonTimeout, resetDefaultTimeoutMs)
		// Wait for the release goroutine to report before returning, so it is
		// not left blocked on an unbuffered send.
		select {
		case releaseErr := <-releaseErrChan: // when AwaitRelease() has errors
			log.Debugf("Invoke() release error on Execute() timeout: %s", releaseErr)
		case <-releaseSuccessChan: // when AwaitRelease() finishes cleanly
		}
		err = timeoutErr
	case err = <-releaseErrChan:
		log.Debug("Invoke() release error")
	case <-releaseSuccessChan:
		s.Release()
		log.Debug("Invoke() success")
	}

	return err
}