// v2/internal/testcommon/kube_test_context_envtest.go
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package testcommon

import (
"context"
"fmt"
"log"
"math"
"math/rand"
"os/exec"
"path/filepath"
"runtime"
"runtime/debug"
"strings"
"sync"
"time"
"github.com/benbjohnson/clock"
"github.com/go-logr/logr"
"github.com/rotisserie/eris"
"golang.org/x/sync/semaphore"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2/textlogger"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"github.com/Azure/azure-service-operator/v2/internal/config"
"github.com/Azure/azure-service-operator/v2/internal/controllers"
"github.com/Azure/azure-service-operator/v2/internal/identity"
"github.com/Azure/azure-service-operator/v2/internal/logging"
"github.com/Azure/azure-service-operator/v2/internal/metrics"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers/arm"
"github.com/Azure/azure-service-operator/v2/internal/reconcilers/generic"
asocel "github.com/Azure/azure-service-operator/v2/internal/util/cel"
"github.com/Azure/azure-service-operator/v2/internal/util/interval"
"github.com/Azure/azure-service-operator/v2/internal/util/kubeclient"
"github.com/Azure/azure-service-operator/v2/internal/util/lockedrand"
"github.com/Azure/azure-service-operator/v2/internal/util/to"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/conditions"
"github.com/Azure/azure-service-operator/v2/pkg/genruntime/registration"
)
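
// getRoot returns the repository root directory, as reported by `git rev-parse --show-toplevel`.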
func getRoot() (string, error) {
cmd := exec.Command("git", "rev-parse", "--show-toplevel")
out, err := cmd.Output()
if err != nil {
return "", eris.Wrapf(err, "failed to get root directory")
}
return strings.TrimSpace(string(out)), nil
}
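
// createSharedEnvTest starts a new envtest API server, installs the ASO CRDs and webhook
// configuration into it, and runs a controller-runtime manager against it with whatever
// reconcilers and webhooks cfg.OperatorMode calls for. The returned runningEnvTest starts
// with a caller count of 1.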
func createSharedEnvTest(cfg testConfig, namespaceResources *namespaceResources) (*runningEnvTest, error) {
log.Printf("Creating shared envtest environment: %s\n", cfgToKey(cfg))
scheme := controllers.CreateScheme()
root, err := getRoot()
if err != nil {
return nil, err
}
crdPath := filepath.Join(root, "v2/out/envtest/crds")
webhookPath := filepath.Join(root, "v2/config/webhook")
environment := envtest.Environment{
ErrorIfCRDPathMissing: true,
CRDDirectoryPaths: []string{
crdPath,
},
CRDInstallOptions: envtest.CRDInstallOptions{
Scheme: scheme,
},
WebhookInstallOptions: envtest.WebhookInstallOptions{
Paths: []string{
webhookPath,
},
},
Scheme: scheme,
}
logger := textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(logging.Debug)))
// TODO: Uncomment the line below if we want controller-runtime logs in the tests.
// By default we disable controller-runtime logs because they're very verbose and usually not useful.
// ctrl.SetLogger(logger)
ctrl.SetLogger(logr.Discard())
log.Println("Starting envtest")
kubeConfig, err := environment.Start()
if err != nil {
return nil, eris.Wrapf(err, "starting envtest environment")
}
stopEnvironment := func() {
stopErr := environment.Stop()
if stopErr != nil {
panic(stopErr)
}
}
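// When the test config targets specific namespaces and the operator mode includes watchers,
// restrict the manager's cache to just those namespaces, matching how multi-namespace mode
// works in a real deployment.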
var cacheFunc cache.NewCacheFunc
if cfg.TargetNamespaces != nil && cfg.OperatorMode.IncludesWatchers() {
cacheFunc = func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
opts.DefaultNamespaces = make(map[string]cache.Config, len(cfg.TargetNamespaces))
for _, ns := range cfg.TargetNamespaces {
opts.DefaultNamespaces[ns] = cache.Config{}
}
return cache.New(config, opts)
}
}
log.Println("Creating & starting controller-runtime manager")
mgr, err := ctrl.NewManager(kubeConfig, ctrl.Options{
Scheme: scheme,
EventBroadcaster: record.NewBroadcasterForTests(1 * time.Second),
NewClient: func(config *rest.Config, options client.Options) (client.Client, error) {
// We bypass the caching client for tests, see https://github.com/kubernetes-sigs/controller-runtime/issues/343 and
// https://github.com/kubernetes-sigs/controller-runtime/issues/1464 for details. Specifically:
// https://github.com/kubernetes-sigs/controller-runtime/issues/343#issuecomment-469435686 which states:
// "ah, yeah, this is probably a bit of a confusing statement,
// but don't use the manager client in tests. The manager-provided client is designed
// to do the right thing for controllers by default (which is to read from caches, meaning that it's not strongly consistent),
// which means it probably does the wrong thing for tests (which almost certainly want strong consistency)."
// It's possible that if we do https://github.com/Azure/azure-service-operator/issues/1891, we can go back
// to using the default (cached) client; the main problem with using it is that it can introduce inconsistency
// in test request counts, which causes intermittent test failures. Cache-related failures usually manifest as
// errors when committing to etcd, such as:
// "the object has been modified; please apply your changes to the latest version and try again"
// This generally means the controller was served an older resourceVersion of a resource (from the cache), which
// causes issues with our recording tests.
// Force Cache off for our client
options.Cache = &client.CacheOptions{}
return NewTestClient(config, options)
},
NewCache: cacheFunc,
Metrics: server.Options{
BindAddress: "0", // disable serving metrics, or else we get conflicts listening on same port 8080
},
WebhookServer: webhook.NewServer(webhook.Options{
Port: environment.WebhookInstallOptions.LocalServingPort,
CertDir: environment.WebhookInstallOptions.LocalServingCertDir,
}),
})
if err != nil {
stopEnvironment()
return nil, eris.Wrapf(err, "creating controller-runtime manager")
}
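// Reconciler logs are routed to the per-namespace test logger, so output lands in the test
// that owns the resource being reconciled.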
loggerFactory := func(obj metav1.Object) logr.Logger {
result := namespaceResources.Lookup(obj.GetNamespace())
if result == nil {
panic(fmt.Sprintf("no logger registered for %s: %s", obj.GetNamespace(), obj.GetName()))
}
return result.logger
}
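// When replaying recorded HTTP traffic there is no real ARM to wait on, so collapse the
// requeue delay and backoff to near zero; live (recording) runs keep realistic backoff.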
var requeueDelay time.Duration
minBackoff := 1 * time.Second
maxBackoff := 1 * time.Minute
if cfg.Replaying {
requeueDelay = 10 * time.Millisecond
minBackoff = 5 * time.Millisecond
maxBackoff = 5 * time.Millisecond
}
// We use a custom indexer here so that we can simulate the caching client behavior for indexing even though
// for our tests we are not using the caching client
testIndexer := NewIndexer(mgr.GetScheme())
indexer := kubeclient.NewAndIndexer(mgr.GetFieldIndexer(), testIndexer)
kubeClient := kubeclient.NewClient(NewClient(mgr.GetClient(), testIndexer))
expressionEvaluator, err := asocel.NewExpressionEvaluator(asocel.Log(logger))
// Note that we don't start expressionEvaluator here because we're in a test context and turning cache eviction
// on is probably overkill.
if err != nil {
stopEnvironment()
return nil, eris.Wrapf(err, "creating expression evaluator")
}
// This means a single evaluator will be used for all envtests. For the purposes of testing that's probably OK...
asocel.RegisterEvaluator(expressionEvaluator)
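// ARM connections are resolved per namespace: each test registers its own ARM client cache
// under its namespace, and the factory panics if a resource appears in a namespace no test
// has claimed.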
credentialProviderWrapper := &credentialProviderWrapper{namespaceResources: namespaceResources}
var clientFactory arm.ARMConnectionFactory = func(ctx context.Context, mo genruntime.ARMMetaObject) (arm.Connection, error) {
result := namespaceResources.Lookup(mo.GetNamespace())
if result == nil {
panic(fmt.Sprintf("unable to locate ARM client for namespace %s; tests should only create resources in the namespace they are assigned or have declared via TargetNamespaces",
mo.GetNamespace()))
}
return result.armClientCache.GetConnection(ctx, mo)
}
options := generic.Options{
LoggerFactory: loggerFactory,
Config: cfg.Values,
Options: controller.Options{
// Skip name validation because it uses a package global cache which results in mistakenly
// classifying two controllers with the same name in different EnvTest environments as conflicting
// when in reality they are running in separate apiservers (simulating separate operators).
// In a real Kubernetes deployment that might be a problem, but not in EnvTest.
SkipNameValidation: to.Ptr(true),
// Allow concurrent reconciliation in tests
MaxConcurrentReconciles: 5,
// Use appropriate backoff for mode.
RateLimiter: generic.NewRateLimiter(minBackoff, maxBackoff),
LogConstructor: func(request *reconcile.Request) logr.Logger {
return ctrl.Log
},
},
// Specified here because controller-runtime logging would normally detect panics and log them for us,
// but for envtest we disable those logs because they're too verbose.
PanicHandler: func() {
if e := recover(); e != nil {
stack := debug.Stack()
log.Printf("panic: %s\nstack:%s\n", e, stack)
}
},
RequeueIntervalCalculator: interval.NewCalculator(
interval.CalculatorParameters{
//nolint:gosec // do not want cryptographic randomness here
Rand: rand.New(lockedrand.NewSource(time.Now().UnixNano())),
ErrorBaseDelay: minBackoff,
ErrorMaxFastDelay: maxBackoff,
ErrorMaxSlowDelay: maxBackoff,
ErrorVerySlowDelay: maxBackoff,
RequeueDelayOverride: requeueDelay,
}),
}
positiveConditions := conditions.NewPositiveConditionBuilder(clock.New())
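// Only register the generated resource reconcilers when this operator mode actually watches
// resources; webhook-only configurations skip this.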
if cfg.OperatorMode.IncludesWatchers() {
var objs []*registration.StorageType
objs, err = controllers.GetKnownStorageTypes(
mgr,
clientFactory,
credentialProviderWrapper,
kubeClient,
positiveConditions,
expressionEvaluator,
options)
if err != nil {
stopEnvironment()
return nil, err
}
err = generic.RegisterAll(
mgr,
indexer,
kubeClient,
positiveConditions,
objs,
options)
if err != nil {
stopEnvironment()
return nil, eris.Wrapf(err, "registering reconcilers")
}
}
if cfg.OperatorMode.IncludesWebhooks() {
err = generic.RegisterWebhooks(mgr, controllers.GetKnownTypes())
if err != nil {
stopEnvironment()
return nil, eris.Wrapf(err, "registering webhooks")
}
}
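// Run the manager in the background; cancelling this context (via stopManager) is how the
// manager eventually gets shut down.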
ctx, stopManager := context.WithCancel(context.Background())
go func() {
// this blocks until the input ctx is cancelled
//nolint:shadow // We want shadowing here
err := mgr.Start(ctx)
if err != nil {
panic(fmt.Sprintf("error running controller-runtime manager: %s\n", err.Error()))
}
}()
if cfg.OperatorMode.IncludesWebhooks() {
log.Println("Waiting for webhook server to start")
// Need to block here until things are actually running
chk := mgr.GetWebhookServer().StartedChecker()
timeoutAt := time.Now().Add(15 * time.Second)
for {
err = chk(nil)
if err == nil {
break
}
if time.Now().After(timeoutAt) {
err = eris.Wrap(err, "timed out waiting for webhook server to start")
panic(err.Error())
}
time.Sleep(100 * time.Millisecond)
}
log.Println("Webhook server started")
}
if cfg.OperatorMode.IncludesWatchers() {
log.Println("Waiting for watchers to start")
<-mgr.Elected()
log.Println("Watchers started")
}
cancelFunc := func() {
stopManager()
stopEnvironment()
}
return &runningEnvTest{
KubeConfig: kubeConfig,
KubeClient: kubeClient,
Stop: cancelFunc,
Cfg: cfg,
Callers: 1,
}, nil
}

// sharedEnvTests stores all the envtest environments we are running.
// We run one per distinct test config (see cfgToKey).
type sharedEnvTests struct {
envtestLock sync.Mutex
concurrencyLimitSemaphore *semaphore.Weighted
envtests map[string]*runningEnvTest
namespaceResources *namespaceResources
}
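
// testConfig is the operator configuration for a test plus test-only knobs: whether HTTP
// traffic is being replayed from recordings, and whether the test's envtest counts against
// the concurrency limit.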
type testConfig struct {
config.Values
Replaying bool
CountsTowardsLimit bool
}
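
// cfgToKey derives the map key used to decide whether tests can share an envtest instance:
// the operator config values plus the replaying flag.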
func cfgToKey(cfg testConfig) string {
return fmt.Sprintf(
"%s/Replaying:%t",
cfg.Values,
cfg.Replaying)
}
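
// stopAll shuts down every running envtest environment and releases any concurrency-limit
// slots they were holding.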
func (set *sharedEnvTests) stopAll() {
set.envtestLock.Lock()
defer set.envtestLock.Unlock()
for _, v := range set.envtests {
v.Stop()
if v.Cfg.CountsTowardsLimit {
set.concurrencyLimitSemaphore.Release(1)
}
}
}
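
// garbageCollect decrements the caller count of the envtest matching cfg and, once no tests
// are left using it, stops the environment and releases its concurrency-limit slot.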
func (set *sharedEnvTests) garbageCollect(cfg testConfig, logger logr.Logger) {
envTestKey := cfgToKey(cfg)
set.envtestLock.Lock()
defer set.envtestLock.Unlock()
envTest, ok := set.envtests[envTestKey]
if !ok {
return
}
envTest.Callers -= 1
logger.V(2).Info("EnvTest instance now has", "activeTests", envTest.Callers)
if envTest.Callers != 0 {
return
}
logger.V(2).Info("Shutting down EnvTest instance")
envTest.Stop()
delete(set.envtests, envTestKey)
if cfg.CountsTowardsLimit {
set.concurrencyLimitSemaphore.Release(1)
}
}
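
// getRunningEnvTest returns the envtest registered under key, incrementing its caller count,
// or nil if none is running.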
func (set *sharedEnvTests) getRunningEnvTest(key string) *runningEnvTest {
set.envtestLock.Lock()
defer set.envtestLock.Unlock()
if envTest, ok := set.envtests[key]; ok {
envTest.Callers += 1
return envTest
}
return nil
}
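
// getEnvTestForConfig returns an envtest matching cfg, reusing a running one where possible
// and otherwise creating a new environment, first waiting on the concurrency semaphore if
// this config counts towards the parallelism limit.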
func (set *sharedEnvTests) getEnvTestForConfig(ctx context.Context, cfg testConfig, logger logr.Logger) (*runningEnvTest, error) {
envTestKey := cfgToKey(cfg)
envTest := set.getRunningEnvTest(envTestKey)
if envTest != nil {
return envTest, nil
}
// The order of these locks matters: we have to make sure we have spare capacity before taking the shared lock
if cfg.CountsTowardsLimit {
logger.V(2).Info("Acquiring envtest concurrency semaphore")
err := set.concurrencyLimitSemaphore.Acquire(ctx, 1)
if err != nil {
return nil, err
}
}
set.envtestLock.Lock()
defer set.envtestLock.Unlock()
logger.V(2).Info("Starting envtest")
// no envtest exists for this config; make one
//nolint: contextcheck // 2022-09 @unrepentantgeek Seems to be a false positive
newEnvTest, err := createSharedEnvTest(cfg, set.namespaceResources)
if err != nil {
// Give back the concurrency slot acquired above, since no environment was actually started
if cfg.CountsTowardsLimit {
set.concurrencyLimitSemaphore.Release(1)
}
return nil, eris.Wrap(err, "unable to create shared envtest environment")
}
set.envtests[envTestKey] = newEnvTest
return newEnvTest, nil
}
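
// runningEnvTest is a started envtest environment shared between tests; Callers tracks how
// many tests are currently using it so it can be torn down when the last one finishes.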
type runningEnvTest struct {
KubeConfig *rest.Config
KubeClient kubeclient.Client
Stop context.CancelFunc
Cfg testConfig
Callers int
}

// perNamespace holds the resources for a single test namespace. Each test runs in its own
// namespace, and storing these per namespace lets the shared controllers find the right ARM
// client and logger for whichever test owns the resource being reconciled.
type perNamespace struct {
armClientCache *arm.ARMClientCache
credentialProvider identity.CredentialProvider
logger logr.Logger
}
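
// namespaceResources maps namespace names to the perNamespace resources registered by the
// test that owns each namespace.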
type namespaceResources struct {
// accessed from many controllers at once so needs to be threadsafe
lock sync.Mutex
clients map[string]*perNamespace
}
func (nr *namespaceResources) Add(namespace string, resources *perNamespace) {
nr.lock.Lock()
defer nr.lock.Unlock()
if _, ok := nr.clients[namespace]; ok {
panic(fmt.Sprintf("bad test configuration: multiple tests using the same namespace %s", namespace))
}
nr.clients[namespace] = resources
}
func (nr *namespaceResources) Lookup(namespace string) *perNamespace {
nr.lock.Lock()
defer nr.lock.Unlock()
return nr.clients[namespace]
}
func (nr *namespaceResources) Remove(namespace string) {
nr.lock.Lock()
defer nr.lock.Unlock()
delete(nr.clients, namespace)
}
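
// createEnvtestContext returns a factory that gives each test a KubeBaseTestContext backed by
// a (possibly shared) envtest environment, plus a cleanup function that stops every
// environment once the tests have finished.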
func createEnvtestContext() (BaseTestContextFactory, context.CancelFunc) {
perNamespaceResources := &namespaceResources{
lock: sync.Mutex{},
clients: make(map[string]*perNamespace),
}
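// Cap the number of envtest API servers running at once to roughly a quarter of the available
// CPUs (but always at least one), since each environment runs its own etcd and kube-apiserver.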
cpus := runtime.NumCPU()
concurrencyLimit := math.Max(float64(cpus/4), 1)
envTests := sharedEnvTests{
envtestLock: sync.Mutex{},
concurrencyLimitSemaphore: semaphore.NewWeighted(int64(concurrencyLimit)),
envtests: make(map[string]*runningEnvTest),
namespaceResources: perNamespaceResources,
}
create := func(perTestContext PerTestContext, cfg config.Values) (*KubeBaseTestContext, error) {
testCfg := testConfig{
Values: cfg,
Replaying: perTestContext.AzureClientRecorder.IsReplaying(),
CountsTowardsLimit: perTestContext.CountsTowardsParallelLimits,
}
envtest, err := envTests.getEnvTestForConfig(perTestContext.Ctx, testCfg, perTestContext.logger)
if err != nil {
return nil, err
}
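// Build this test's credentials and ARM client cache, and register them for the test's
// namespace (and any extra target namespaces) so the shared controllers can find them.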
{
defaultCred := identity.NewDefaultCredential(
perTestContext.AzureClient.Creds(),
cfg.PodNamespace,
perTestContext.AzureSubscription,
nil,
)
credentialProvider := identity.NewCredentialProvider(defaultCred, envtest.KubeClient, nil)
// register resources needed by controller for namespace
armClientCache := arm.NewARMClientCache(
credentialProvider,
envtest.KubeClient,
cfg.Cloud(),
perTestContext.HTTPClient,
metrics.NewARMClientMetrics())
resources := &perNamespace{
armClientCache: armClientCache,
credentialProvider: credentialProvider,
logger: perTestContext.logger,
}
namespace := perTestContext.Namespace
perNamespaceResources.Add(namespace, resources)
perTestContext.T.Cleanup(func() { perNamespaceResources.Remove(namespace) })
for _, otherNs := range cfg.TargetNamespaces {
otherNs := otherNs
perNamespaceResources.Add(otherNs, resources)
perTestContext.T.Cleanup(func() { perNamespaceResources.Remove(otherNs) })
}
}
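// Tests that count towards the parallel limit tear down their envtest (if they're the last
// user) when they finish; other tests leave the shared environment running for reuse.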
if perTestContext.CountsTowardsParallelLimits {
perTestContext.T.Cleanup(func() {
envTests.garbageCollect(testCfg, perTestContext.logger)
})
}
return &KubeBaseTestContext{
PerTestContext: perTestContext,
KubeConfig: envtest.KubeConfig,
}, nil
}
cleanup := func() {
envTests.stopAll()
}
return create, cleanup
}