in pkg/resource/manager.go [45:102]
func NewResourceManager(ctx context.Context, resourceNames []string, wrapper api.Wrapper) (ResourceManager, error) {
// Load that static configuration of the resource
resourceConfig := config.LoadResourceConfig()
resources := make(map[string]Resource)
// For each supported resource, initialize the resource provider and handler
for _, resourceName := range resourceNames {
resourceConfig, ok := resourceConfig[resourceName]
if !ok {
return nil, fmt.Errorf("failed to find resource configuration %s", resourceName)
}
ctrl.Log.Info("initializing resource", "resource name",
resourceName, "resource count", resourceConfig.WorkerCount)
workers := worker.NewDefaultWorkerPool(
resourceConfig.Name,
resourceConfig.WorkerCount,
config.WorkQueueDefaultMaxRetries,
ctrl.Log.WithName(fmt.Sprintf("%s-%s", resourceName, "worker")), ctx)
var resourceHandler handler.Handler
var resourceProvider provider.ResourceProvider
if resourceName == config.ResourceNameIPAddress {
resourceProvider = ip.NewIPv4Provider(ctrl.Log.WithName("ipv4 provider"),
wrapper, workers, resourceConfig)
resourceHandler = handler.NewWarmResourceHandler(ctrl.Log.WithName(resourceName), wrapper,
resourceName, resourceProvider, ctx)
} else if resourceName == config.ResourceNamePodENI {
resourceProvider = branch.NewBranchENIProvider(ctrl.Log.WithName("branch eni provider"),
wrapper, workers, resourceConfig, ctx)
resourceHandler = handler.NewOnDemandHandler(ctrl.Log.WithName(resourceName),
resourceName, resourceProvider)
} else {
return nil, fmt.Errorf("resource type is not defnied %s", resourceName)
}
err := workers.StartWorkerPool(resourceProvider.ProcessAsyncJob)
if err != nil {
return nil, fmt.Errorf("unable to start the workers for resource %s", resourceName)
}
resources[resourceName] = Resource{
Handler: resourceHandler,
ResourceProvider: resourceProvider,
}
ctrl.Log.Info("successfully initialized resource handler and provider",
"resource name", resourceName)
}
return &Manager{
resource: resources,
}, nil
}