in lambda/rapid/start.go [167:260]
// doInit drives the init phase: it optionally initializes extensions, preregisters
// and launches the runtime process described by execCtx.bootstrap, then blocks until
// the runtime (and, when extensions are enabled, the registered agents) report ready.
//
// Init start and end times are recorded on execCtx.xray around the whole call.
// If resolving the bootstrap command, resolving its working directory, or starting
// the process fails, a fatal error is stored via appctx.StoreFirstFatalError before
// the error is returned. On success execCtx.initDone is set to true and nil is returned.
func doInit(ctx context.Context, execCtx *rapidContext, watchdog *core.Watchdog) error {
	execCtx.xray.RecordInitStartTime()
	defer execCtx.xray.RecordInitEndTime()

	if extensions.AreEnabled() {
		// Report the agents' init status on every exit path, success or failure.
		defer logAgentsInitStatus(execCtx)

		if err := doInitExtensions(execCtx, watchdog); err != nil {
			return err
		}
	}

	// The registration service tracks the parties registered in the system and the
	// events each of them subscribed to.
	regSvc := execCtx.registrationService
	initFlow := regSvc.InitFlow()

	// State machine representing the runtime.
	runtimeState := core.NewRuntime(initFlow, execCtx.invokeFlow)

	// The runtime never registers itself, so it is preregistered here; it is
	// implicitly subscribed to certain lifecycle events.
	log.Debug("Preregister runtime")
	if err := regSvc.PreregisterRuntime(runtimeState); err != nil {
		return err
	}

	bootstrap := execCtx.bootstrap

	runtimeArgs, err := bootstrap.Cmd()
	if err != nil {
		fatalError, formattedLog, hasError := bootstrap.CachedFatalError(err)
		if !hasError {
			// Nothing cached for this error: fall back to the generic entrypoint failure.
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidEntrypoint)
			return err
		}
		appctx.StoreFirstFatalError(execCtx.appCtx, fatalError)
		execCtx.platformLogger.Printf("%s", formattedLog)
		return err
	}

	runtimeEnv := bootstrap.Env(execCtx.environment)

	workDir, err := bootstrap.Cwd()
	if err != nil {
		fatalError, formattedLog, hasError := bootstrap.CachedFatalError(err)
		if !hasError {
			// Nothing cached for this error: fall back to the generic working-dir failure.
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidWorkingDir)
			return err
		}
		appctx.StoreFirstFatalError(execCtx.appCtx, fatalError)
		execCtx.platformLogger.Printf("%s", formattedLog)
		return err
	}

	extraFiles := bootstrap.ExtraFiles()
	runtimeCmd := runtimecmd.NewCustomRuntimeCmd(
		ctx,
		runtimeArgs,
		workDir,
		runtimeEnv,
		execCtx.runtimeStdoutWriter,
		execCtx.runtimeStderrWriter,
		extraFiles,
	)

	log.Debug("Start runtime")
	if err := runtimeCmd.Start(); err != nil {
		fatalError, formattedLog, hasError := bootstrap.CachedFatalError(err)
		if !hasError {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidEntrypoint)
			return err
		}
		appctx.StoreFirstFatalError(execCtx.appCtx, fatalError)
		execCtx.platformLogger.Printf("%s", formattedLog)
		return err
	}

	// Hand the started process to the watchdog and remember the value it returns
	// as the registered runtime's Pid.
	regSvc.GetRuntime().Pid = watchdog.GoWait(runtimeCmd, fatalerror.RuntimeExit)

	if err := initFlow.AwaitRuntimeReady(); err != nil {
		return err
	}

	// Agent registration phase is over - no further agents may register.
	regSvc.TurnOff()

	if extensions.AreEnabled() {
		// Arm the gate with the number of agents that must report ready, then wait on it.
		agentCount := regSvc.GetRegisteredAgentsSize()
		if err := initFlow.SetAgentsReadyCount(agentCount); err != nil {
			return err
		}
		if err := initFlow.AwaitAgentsReady(); err != nil {
			return err
		}
	}

	// Logs API subscription phase is over - no further agents may subscribe.
	if execCtx.telemetryAPIEnabled {
		execCtx.logsSubscriptionAPI.TurnOff()
	}

	execCtx.initDone = true
	return nil
}