func()

in lambda/rapidcore/server.go [627:734]


// Invoke drives one invocation and blocks until it completes, fails, or
// exceeds the invoke timeout.
//
// It starts a watchdog for s.GetInvokeTimeout(), then in a background
// goroutine reserves the sandbox, sets invoke.DeadlineNs, hands the request to
// FastInvoke (which writes to responseWriter) once init has been awaited, and
// waits for the reservation to be released.
//
// Returns:
//   - ErrInitNotStarted when s.getInitFailuresChan() is nil.
//   - The watchdog's ErrInvokeTimeout when the timeout elapses first; Reset is
//     called with autoresetReasonTimeout and the release goroutine is drained
//     before returning.
//   - The error from AwaitRelease() when it is non-nil and is not
//     ErrReleaseReservationDone. For ErrInitDoneFailed and ErrInvokeDoneFailed
//     Reset is called with autoresetReasonReleaseFail first.
//   - nil otherwise, after calling s.Release().
func (s *Server) Invoke(responseWriter http.ResponseWriter, invoke *interop.Invoke) error {
	// Bail out before spawning any goroutine if init was never started.
	if s.getInitFailuresChan() == nil {
		return ErrInitNotStarted
	}

	resetCtx, resetCancel := context.WithCancel(context.Background())
	defer resetCancel()

	// Buffered with capacity 1 so the watchdog can always deliver its result
	// and exit, even if Invoke has already returned through another path.
	// With an unbuffered channel a timer firing at that moment would leave the
	// goroutine blocked on the send forever.
	timeoutChan := make(chan error, 1)
	go func() {
		// An explicit timer (rather than time.After) lets us release it as soon
		// as the invoke finishes instead of keeping it pending until it fires.
		timer := time.NewTimer(s.GetInvokeTimeout())
		defer timer.Stop()

		select {
		case <-timer.C:
			log.Debug("Invoke() timeout")
			timeoutChan <- ErrInvokeTimeout
		case <-resetCtx.Done():
			log.Debugf("execute finished, autoreset cancelled")
		}
	}()

	// Exactly one value is sent on exactly one of these channels by the
	// goroutine below, and every path through the select at the bottom of this
	// function receives exactly one value, so neither side can get stuck.
	releaseErrChan := make(chan error)
	releaseSuccessChan := make(chan struct{})
	go func() {
		// This thread can block in one of two method calls Reserve() & AwaitRelease(),
		// corresponding to Init and Invoke phase.
		// FastInvoke is intended to be 'async' response stream copying.
		// When a timeout occurs, we send a 'Reset' with the timeout reason
		// When a Reset is sent, the reset handler in rapid lib cancels existing flows,
		// including init/invoke. This causes either initFailure/invokeFailure, and then
		// the Reset is handled and processed.
		// TODO: however, ideally Reserve() does not block on init, but FastInvoke does
		// The logic would be almost identical, except that init failures could manifest
		// through return values of FastInvoke and not Reserve()

		// A Reserve() failure is only logged; the flow deliberately continues so
		// that the init result is picked up by awaitInitialized() below.
		// NOTE(review): reserveResp is dereferenced even when err != nil, which
		// presumably relies on Reserve() always returning a non-nil response
		// with a Token - confirm against Reserve().
		reserveResp, err := s.Reserve("", "", "")
		if err != nil {
			log.Infof("ReserveFailed: %s", err)
		}

		invoke.DeadlineNs = fmt.Sprintf("%d", metering.Monotime()+reserveResp.Token.FunctionTimeout.Nanoseconds())
		go func() {
			if initCompletionResp, err := s.awaitInitialized(); err != nil {
				switch err {
				case ErrInitResetReceived, ErrInitDoneFailed:
					// For init failures, cache the response so they can be checked later
					// We check if they have not already been set by a call to /init/error by runtime
					if s.getCachedInitErrorResponse() == nil {
						errType, errMsg := initCompletionResp.InitErrorType, initCompletionResp.InitErrorMessage.Error()
						headers := interop.InvokeResponseHeaders{}
						fnError := interop.FunctionError{Type: errType, Message: errMsg}
						s.setCachedInitErrorResponse(&interop.ErrorInvokeResponse{Headers: headers, FunctionError: fnError, Payload: []byte{}})
					}

					// Init failed, so we explicitly shutdown runtime (cleanup unused extensions).
					// Because following fast invoke will start new (suppressed) Init phase without reset call
					s.Shutdown(&interop.Shutdown{DeadlineNs: metering.Monotime() + int64(resetDefaultTimeoutMs*1000*1000)})
				}
			}

			// FastInvoke is attempted regardless of the init outcome above; its
			// error is only logged here.
			if err := s.FastInvoke(responseWriter, invoke, false); err != nil {
				log.Debugf("FastInvoke() error: %s", err)
			}
		}()

		_, err = s.AwaitRelease()
		// ErrReleaseReservationDone is not treated as a failure: it is reported
		// on releaseSuccessChan exactly like a nil error.
		if err != nil && err != ErrReleaseReservationDone {
			log.Debugf("AwaitRelease() error: %s", err)
			if err == ErrInitDoneFailed || err == ErrInvokeDoneFailed {
				// Reset when either init or invoke failures occur, i.e.
				// init/error, invocation/error, Runtime.ExitError, Extension.ExitError
				s.Reset(autoresetReasonReleaseFail, resetDefaultTimeoutMs)
			}
			releaseErrChan <- err
			return
		}

		releaseSuccessChan <- struct{}{}
	}()

	var err error
	select {
	case timeoutErr := <-timeoutChan:
		s.Reset(autoresetReasonTimeout, resetDefaultTimeoutMs)
		// Wait for the release goroutine to report back so it is not left
		// blocked on its send; the timeout error takes precedence over whatever
		// it reports.
		select {
		case releaseErr := <-releaseErrChan: // when AwaitRelease() has errors
			log.Debugf("Invoke() release error on Execute() timeout: %s", releaseErr)
		case <-releaseSuccessChan: // when AwaitRelease() finishes cleanly
		}
		err = timeoutErr
	case err = <-releaseErrChan:
		log.Debug("Invoke() release error")
	case <-releaseSuccessChan:
		s.Release()
		log.Debug("Invoke() success")
	}

	return err
}