in lambda/rapid/handlers.go [279:404]
// doRuntimeDomainInit drives the init sequence of the runtime domain.
//
// In order, it:
//   - emits the init-start log event and records the X-Ray init start time
//     (the matching report event / end time are deferred);
//   - bumps execCtx.runtimeDomainGeneration;
//   - when extensions are enabled, hands the paths returned by
//     agents.ListExternalAgentPaths to doInitExtensions;
//   - preregisters a new core.Runtime with the registration service;
//   - resolves the bootstrap command via doRuntimeBootstrap and executes it
//     through the supervisor under the name "<runtimeProcessName>-<generation>";
//   - waits on initFlow.AwaitRuntimeRestoreReady and, when extensions are
//     enabled, on initFlow.AwaitAgentsReady;
//   - turns off the registration service and, if the telemetry API is
//     enabled, the logs and telemetry subscription APIs.
//
// On success execCtx.initDone is set to true and nil is returned. On failure
// the first error encountered is returned unchanged.
func doRuntimeDomainInit(execCtx *rapidContext, sbInfoFromInit interop.SandboxInfoFromInit, phase interop.LifecyclePhase) error {
	// sbInfoFromInit is a by-value copy that is never mutated here, so the
	// sandbox type can be read once and reused, including by deferred calls.
	sandboxType := sbInfoFromInit.SandboxType

	startedAt := metering.Monotime()
	sendInitStartLogEvent(execCtx, sandboxType, phase)
	defer sendInitReportLogEvent(execCtx, sandboxType, startedAt, phase)

	execCtx.xray.RecordInitStartTime()
	defer execCtx.xray.RecordInitEndTime()

	// Agent init status is logged on every exit path, but only when
	// extensions are enabled at the time the function returns.
	defer func() {
		if !extensions.AreEnabled() {
			return
		}
		logAgentsInitStatus(execCtx)
	}()

	execCtx.runtimeDomainGeneration++

	if extensions.AreEnabled() {
		extensionPaths := agents.ListExternalAgentPaths(defaultAgentLocation, execCtx.supervisor.RootPath)
		if err := doInitExtensions(RuntimeDomain, extensionPaths, execCtx, sbInfoFromInit.EnvironmentVariables); err != nil {
			return err
		}
	}

	appctx.StoreSandboxType(execCtx.appCtx, sandboxType)

	registry := execCtx.registrationService
	flow := registry.InitFlow()

	// Runtime state machine.
	rt := core.NewRuntime(flow, execCtx.invokeFlow)

	// The registration service tracks the parties registered in the system and
	// the events they are registered for. The runtime does not register
	// itself, so it is preregistered here; it is implicitly subscribed to
	// certain lifecycle events.
	log.Debug("Preregister runtime")
	if err := registry.PreregisterRuntime(rt); err != nil {
		return err
	}

	cmd, env, cwd, extraFiles, err := doRuntimeBootstrap(execCtx, sbInfoFromInit)
	if err != nil {
		return err
	}

	stdoutWriter, stderrWriter, err := execCtx.logsEgressAPI.GetRuntimeSockets()
	if err != nil {
		return err
	}

	log.Debug("Start runtime")
	checkCredentials(execCtx, env)

	procName := fmt.Sprintf("%s-%d", runtimeProcessName, execCtx.runtimeDomainGeneration)
	execRequest := &supvmodel.ExecRequest{
		Domain: RuntimeDomain,
		Name:   procName,
		Cwd:    &cwd,
		Path:   cmd[0],
		Args:   cmd[1:],
		Env:    &env,
		Logging: supvmodel.Logging{
			Managed: supvmodel.ManagedLogging{
				Topic: supvmodel.RuntimeManagedLoggingTopic,
				Formats: []supvmodel.ManagedLoggingFormat{
					supvmodel.LineBasedManagedLogging,
					supvmodel.MessageBasedManagedLogging,
				},
			},
		},
		StdoutWriter: stdoutWriter,
		StderrWriter: stderrWriter,
		ExtraFiles:   &extraFiles,
	}
	err = execCtx.supervisor.Exec(context.Background(), execRequest)

	// The runtime-done event is deferred only after Exec has returned, so
	// failures before this point do not emit it. The closure reads doneStatus
	// at return time, so the error paths below can still flip it.
	doneStatus := telemetry.RuntimeDoneSuccess
	defer func() {
		sendInitRuntimeDoneLogEvent(execCtx, sandboxType, doneStatus, phase)
	}()

	if err != nil {
		// Prefer the fatal error cached by the bootstrap, if it has one for
		// this failure; otherwise record an invalid entrypoint.
		cachedFatal, imageErrorLog, found := sbInfoFromInit.RuntimeBootstrap.CachedFatalError(err)
		if found {
			appctx.StoreFirstFatalError(execCtx.appCtx, cachedFatal)
			execCtx.eventsAPI.SendImageErrorLog(interop.ImageErrorLogData(imageErrorLog))
		} else {
			appctx.StoreFirstFatalError(execCtx.appCtx, fatalerror.InvalidEntrypoint)
		}

		doneStatus = telemetry.RuntimeDoneError
		return err
	}

	execCtx.shutdownContext.createExitedChannel(procName)

	if err := flow.AwaitRuntimeRestoreReady(); err != nil {
		doneStatus = telemetry.RuntimeDoneError
		return err
	}
	// doneStatus is still telemetry.RuntimeDoneSuccess from here on unless an
	// error path below changes it.

	// Registration phase finished for agents - no more agents can be
	// registered with the system.
	registry.TurnOff()

	if extensions.AreEnabled() {
		// Initialize and activate the gate with the number of agents we wait
		// on to return ready. A failure here leaves doneStatus as success.
		if err := flow.SetAgentsReadyCount(registry.GetRegisteredAgentsSize()); err != nil {
			return err
		}

		if err := flow.AwaitAgentsReady(); err != nil {
			doneStatus = telemetry.RuntimeDoneError
			return err
		}
	}

	// Logs API subscription phase finished for agents - no more agents can be
	// subscribed to the Logs API.
	if execCtx.telemetryAPIEnabled {
		execCtx.logsSubscriptionAPI.TurnOff()
		execCtx.telemetrySubscriptionAPI.TurnOff()
	}

	execCtx.initDone = true
	return nil
}