in lambda/rapid/start.go [658:699]
// start brings up the Runtime API server and then loops forever, dispatching
// events received from the interop server (start, invoke, reset, shutdown,
// transport error) to their handlers.
//
// signalCtx is handed to the Runtime API server's Serve call and to the start
// and invoke handlers. execCtx supplies the servers, channels and flows used
// here. The function panics (through the logger) if the Runtime API server
// cannot listen or if the interop server reports a transport error.
func start(signalCtx context.Context, execCtx *rapidContext) {
	wd := core.NewWatchdog(execCtx.registrationService.InitFlow(), execCtx.invokeFlow, execCtx.exitPidChan, execCtx.appCtx)
	interop := execCtx.interopServer

	// Bring up the Runtime API server: bind first, then serve in the background.
	if err := execCtx.server.Listen(); err != nil {
		log.WithError(err).Panic("Runtime API Server failed to listen")
	}
	serveRuntimeAPI := func() {
		execCtx.server.Serve(signalCtx)
	}
	go serveRuntimeAPI()

	// Keep as much initialization as possible ahead of the blocking wait for
	// START: anything that runs before START overlaps with code downloads.

	// forwardResets relays every reset from the interop server to the main
	// loop. Flows are cancelled first, with errResetReceived, so that a reset
	// arriving mid init/invoke interrupts that flow; this error is
	// special-cased and bypasses the init/invoke (unexpected) error handlers.
	forwardResets := func() {
		for {
			resetReq := <-interop.ResetChan()
			wd.CancelFlows(errResetReceived)
			execCtx.resetChan <- resetReq
		}
	}
	go forwardResets()

	// Main event loop: one handler per kind of interop event.
	for {
		select {
		case startReq := <-interop.StartChan():
			handleStart(signalCtx, execCtx, wd, startReq)

		case invokeReq := <-interop.InvokeChan():
			handleInvoke(signalCtx, execCtx, wd, invokeReq)

		case transportErr := <-interop.TransportErrorChan():
			log.Panicf("Transport error emitted by interop server: %s", transportErr)

		case resetReq := <-execCtx.resetChan:
			// Resets reach this case via the forwardResets goroutine above.
			handleReset(execCtx, wd, resetReq)

		case shutdownReq := <-interop.ShutdownChan():
			// Shutdown is only delivered in standalone mode.
			handleShutdown(execCtx, wd, shutdownReq, standaloneShutdownReason)
		}
	}
}