in internal/plugin/manager/pluginmanager.go [215:270]
// InitPluginManager prepares the package-level plugin manager for the given
// instance and returns it.
//
// It records instanceID on the manager, kicks off a background cleanup of old
// plugin state, loads the persisted plugin state, registers the plugin command
// handler and then runs connectOrReLaunch for every loaded plugin concurrently,
// starting the plugin's schedulers when that call succeeds. The function blocks
// until every per-plugin attempt has finished.
//
// An error is returned only when loading the persisted state or registering
// the command handler fails; a failure for an individual plugin is logged and
// does not fail initialization.
func InitPluginManager(ctx context.Context, instanceID string) (*PluginManager, error) {
	galog.Infof("Initializing plugin manager for instance %q", instanceID)
	pluginManager.setInstanceID(instanceID)

	// Removing old plugin state is best-effort: it runs in its own goroutine so
	// it never delays initialization, and a failure is only logged.
	go func() {
		stateRoot := filepath.Dir(baseState())
		cleanupErr := pluginManager.cleanupOldState(ctx, stateRoot)
		if cleanupErr == nil {
			return
		}
		galog.Errorf("Failed to cleanup old plugin state: %v", cleanupErr)
	}()

	loaded, loadErr := load(agentPluginState())
	if loadErr != nil {
		return nil, fmt.Errorf("unable to load existing plugin state: %w", loadErr)
	}
	pluginManager.plugins = loaded

	if regErr := RegisterCmdHandler(ctx); regErr != nil {
		return nil, fmt.Errorf("failed to register plugin command handler: %w", regErr)
	}

	var restored sync.WaitGroup

	// restore handles a single plugin. Whatever the outcome, the plugin is
	// dropped from the in-progress set on exit because no request is being
	// processed for it any longer; the wait group is signalled while the lock
	// is still held.
	restore := func(plugin *Plugin) {
		defer func() {
			pluginManager.pendingPluginRevisionsMu.Lock()
			defer pluginManager.pendingPluginRevisionsMu.Unlock()
			delete(pluginManager.inProgressPluginRequests, plugin.Name)
			restored.Done()
		}()

		connErr := connectOrReLaunch(ctx, plugin)
		if connErr != nil {
			galog.Errorf("Failed to connect or relaunch plugin %q: %v", plugin.FullName(), connErr)
			return
		}
		pluginManager.startPluginSchedulers(ctx, plugin)
	}

	// Every plugin is marked in-progress and the manager is flagged as
	// initialized under a single hold of the lock, so no restore goroutine can
	// clear its marker before the flag is set.
	pluginManager.pendingPluginRevisionsMu.Lock()
	for _, plugin := range loaded {
		pluginManager.inProgressPluginRequests[plugin.Name] = true
		restored.Add(1)
		go restore(plugin)
	}
	pluginManager.IsInitialized.Store(true)
	pluginManager.pendingPluginRevisionsMu.Unlock()

	restored.Wait()

	// Switch the manager to the UDS protocol only when isUDSSupported reports
	// true; otherwise the existing protocol value is left untouched.
	if isUDSSupported() {
		pluginManager.protocol = udsProtocol
	}
	return pluginManager, nil
}