internal/testutil/testutil.go
package testutil
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
goruntime "runtime"
"strconv"
"testing"
"time"
"github.com/go-logr/logr"
"github.com/go-logr/logr/testr"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/envtest"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
apiv1 "github.com/Azure/eno/api/v1"
testv1 "github.com/Azure/eno/internal/controllers/reconciliation/fixtures/v1"
"github.com/Azure/eno/internal/execution"
"github.com/Azure/eno/internal/manager"
)
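// NewClient returns a fake controller-runtime client pre-loaded with the given
// objects and with the eno and core/v1 schemes registered.
//
// Example (illustrative sketch; comp is a hypothetical *apiv1.Composition fixture):
//
//	cli := testutil.NewClient(t, comp)
//	require.NoError(t, cli.Get(ctx, client.ObjectKeyFromObject(comp), comp))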
func NewClient(t testing.TB, objs ...client.Object) client.Client {
return NewClientWithInterceptors(t, nil, objs...)
}
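// NewClientWithInterceptors is like NewClient but also wires the given
// interceptor funcs into the fake client, e.g. to inject failures into specific verbs.
//
// Example (illustrative sketch that forces all Get calls to fail):
//
//	ict := &interceptor.Funcs{
//		Get: func(ctx context.Context, c client.WithWatch, key client.ObjectKey, obj client.Object, opts ...client.GetOption) error {
//			return errors.New("simulated get failure")
//		},
//	}
//	cli := testutil.NewClientWithInterceptors(t, ict)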
func NewClientWithInterceptors(t testing.TB, ict *interceptor.Funcs, objs ...client.Object) client.Client {
scheme := runtime.NewScheme()
require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme))
require.NoError(t, corev1.SchemeBuilder.AddToScheme(scheme))
builder := fake.NewClientBuilder().
WithScheme(scheme).
WithObjects(objs...).
WithStatusSubresource(&apiv1.ResourceSlice{}, &apiv1.Composition{}, &apiv1.Symphony{})
if ict != nil {
builder.WithInterceptorFuncs(*ict)
}
return builder.Build()
}
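// NewReadOnlyClient returns a fake client that rejects every write
// (create, update, patch, delete, and status writes) so tests can assert
// that a code path never mutates the cluster.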
func NewReadOnlyClient(t testing.TB, objs ...runtime.Object) client.Client {
scheme := runtime.NewScheme()
require.NoError(t, apiv1.SchemeBuilder.AddToScheme(scheme))
require.NoError(t, corev1.SchemeBuilder.AddToScheme(scheme))
builder := fake.NewClientBuilder().
WithScheme(scheme).
WithRuntimeObjects(objs...).
WithStatusSubresource(&apiv1.ResourceSlice{}, &apiv1.Composition{})
builder.WithInterceptorFuncs(interceptor.Funcs{
Create: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.CreateOption) error {
return errors.New("no writes allowed")
},
Update: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.UpdateOption) error {
return errors.New("no writes allowed")
},
Patch: func(ctx context.Context, client client.WithWatch, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return errors.New("no writes allowed")
},
Delete: func(ctx context.Context, client client.WithWatch, obj client.Object, opts ...client.DeleteOption) error {
return errors.New("no writes allowed")
},
SubResourceUpdate: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
return errors.New("no writes allowed")
},
SubResourcePatch: func(ctx context.Context, client client.Client, subResourceName string, obj client.Object, patch client.Patch, opts ...client.SubResourcePatchOption) error {
return errors.New("no writes allowed")
},
})
return builder.Build()
}
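// NewContext returns a context that is canceled when the test ends and
// that carries a testr logger at verbosity 2.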
func NewContext(t *testing.T) context.Context {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
})
return logr.NewContext(ctx, testr.NewWithOptions(t, testr.Options{Verbosity: 2}))
}
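// TestManagerOption mutates the manager.Options used by NewManager.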
type TestManagerOption func(*manager.Options)
func (o TestManagerOption) apply(opts *manager.Options) {
o(opts)
}
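// WithPodNamespace overrides the namespace in which synthesizer pods are created.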
func WithPodNamespace(ns string) TestManagerOption {
return TestManagerOption(func(o *manager.Options) {
o.SynthesizerPodNamespace = ns
})
}
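// WithCompositionNamespace overrides the namespace used for compositions.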
func WithCompositionNamespace(ns string) TestManagerOption {
return TestManagerOption(func(o *manager.Options) {
o.CompositionNamespace = ns
})
}
// NewManager starts one or two envtest environments depending on the environment variables set for the process.
// It should work seamlessly when run locally, assuming binaries have been fetched with setup-envtest.
// In CI the second environment is used to run compatibility tests against a matrix of k8s versions.
// That compatibility testing is tightly coupled to the GitHub Action and is not expected to work locally.
func NewManager(t *testing.T, testOpts ...TestManagerOption) *Manager {
t.Parallel()
_, b, _, _ := goruntime.Caller(0)
root := filepath.Join(filepath.Dir(b), "..", "..")
testCrdDir := filepath.Join(root, "internal", "controllers", "reconciliation", "fixtures", "v1", "config", "crd")
env := &envtest.Environment{
CRDDirectoryPaths: []string{
filepath.Join(root, "api", "v1", "config", "crd"),
testCrdDir,
},
ErrorIfCRDPathMissing: true,
AttachControlPlaneOutput: os.Getenv("ACTIONS_RUNNER_DEBUG") != "" || os.Getenv("ACTIONS_STEP_DEBUG") != "",
// We can't use KUBEBUILDER_ASSETS when also setting DOWNSTREAM_KUBEBUILDER_ASSETS
// because the KUBEBUILDER_ASSETS envvar takes precedence over BinaryAssetsDirectory.
BinaryAssetsDirectory: os.Getenv("UPSTREAM_KUBEBUILDER_ASSETS"),
}
t.Cleanup(func() {
err := env.Stop()
if err != nil {
panic(err)
}
})
var cfg *rest.Config
var err error
for i := 0; i < 2; i++ {
cfg, err = env.Start()
if err != nil {
t.Logf("failed to start test environment: %s", err)
continue
}
break
}
require.NoError(t, err)
options := &manager.Options{
Rest: cfg,
HealthProbeAddr: "127.0.0.1:0",
MetricsAddr: "127.0.0.1:0",
SynthesizerPodNamespace: "default",
CompositionNamespace: "default",
CompositionSelector: labels.Everything(),
}
for _, o := range testOpts {
o.apply(options)
}
mgr, err := manager.NewTest(logr.FromContextOrDiscard(NewContext(t)), options)
require.NoError(t, err)
require.NoError(t, testv1.SchemeBuilder.AddToScheme(mgr.GetScheme())) // test-specific CRDs
m := &Manager{
Manager: mgr,
RestConfig: cfg,
DownstreamRestConfig: cfg, // possible override below
DownstreamClient: mgr.GetClient(),
DownstreamEnv: env,
}
dir := os.Getenv("DOWNSTREAM_KUBEBUILDER_ASSETS")
if dir == "" {
return m // only one env needed
}
version, _ := strconv.Atoi(os.Getenv("DOWNSTREAM_VERSION_MINOR"))
downstreamEnv := &envtest.Environment{
BinaryAssetsDirectory: dir,
ErrorIfCRDPathMissing: true,
AttachControlPlaneOutput: os.Getenv("ACTIONS_RUNNER_DEBUG ") != "" || os.Getenv("ACTIONS_STEP_DEBUG ") != "",
}
// Only newer clusters can use envtest to install CRDs
if version >= 21 {
t.Logf("managing downstream cluster CRD with envtest because version >= 21")
downstreamEnv.CRDDirectoryPaths = append(downstreamEnv.CRDDirectoryPaths, testCrdDir)
}
// k8s <1.13 will not start if these flags are set
if version < 13 {
conf := downstreamEnv.ControlPlane.GetAPIServer().Configure()
conf.Disable("service-account-signing-key-file")
conf.Disable("service-account-issuer")
}
t.Cleanup(func() {
err := downstreamEnv.Stop()
if err != nil {
panic(err)
}
})
for i := 0; i < 2; i++ {
m.DownstreamRestConfig, err = downstreamEnv.Start()
if err != nil {
t.Logf("failed to start downstream test environment: %s", err)
continue
}
break
}
require.NoError(t, err)
m.DownstreamEnv = downstreamEnv
m.DownstreamClient, err = client.New(m.DownstreamRestConfig, client.Options{Scheme: mgr.GetScheme()})
require.NoError(t, err)
// Log apiserver version
disc, err := discovery.NewDiscoveryClientForConfig(m.DownstreamRestConfig)
if err == nil {
version, err := disc.ServerVersion()
if err == nil {
t.Logf("downstream control plane version: %s", version.String())
}
}
// We install old (v1beta1) CRDs ourselves because envtest assumes v1
if version < 21 {
t.Logf("managing downstream cluster CRD ourselves (not with envtest) because version < 21")
raw, err := os.ReadFile(filepath.Join(root, "internal", "controllers", "reconciliation", "fixtures", "v1", "config", "enotest.azure.io_testresources-old.yaml"))
require.NoError(t, err)
res := &unstructured.Unstructured{}
require.NoError(t, yaml.Unmarshal(raw, res))
cli, err := client.New(m.DownstreamRestConfig, client.Options{})
require.NoError(t, err)
require.NoError(t, cli.Create(context.Background(), res))
}
return m
}
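// Manager wraps a controller-runtime manager together with the envtest
// plumbing needed to reach the (possibly separate) downstream cluster.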
type Manager struct {
ctrl.Manager
RestConfig *rest.Config
DownstreamRestConfig *rest.Config // may or may not == RestConfig
DownstreamClient client.Client // may or may not == Manager.GetClient()
DownstreamEnv *envtest.Environment
}
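// Start runs the manager on a background goroutine and blocks until its caches have synced.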
func (m *Manager) Start(t *testing.T) {
go func() {
err := m.Manager.Start(NewContext(t))
if err != nil {
// can't t.Fatal here since we're in a different goroutine
panic(fmt.Sprintf("error while starting manager: %s", err))
}
}()
t.Logf("warming caches")
m.Manager.GetCache().WaitForCacheSync(context.Background())
t.Logf("warmed caches")
}
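// GetCurrentResourceSlices returns the resource slices referenced by the
// current synthesis of the cluster's single composition, or an error if there
// is not exactly one composition or its synthesis hasn't completed.
//
// Example (illustrative sketch inside a polling loop):
//
//	testutil.Eventually(t, func() bool {
//		slices, err := mgr.GetCurrentResourceSlices(ctx)
//		return err == nil && len(slices) > 0
//	})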
func (m *Manager) GetCurrentResourceSlices(ctx context.Context) ([]*apiv1.ResourceSlice, error) {
cli := m.Manager.GetAPIReader()
comps := &apiv1.CompositionList{}
err := cli.List(ctx, comps)
if err != nil {
return nil, err
}
if l := len(comps.Items); l != 1 {
return nil, fmt.Errorf("expected one composition, found %d", l)
}
if comps.Items[0].Synthesizing() {
return nil, fmt.Errorf("composition is still being synthesized")
}
synthesis := comps.Items[0].Status.CurrentSynthesis
if synthesis == nil {
return nil, fmt.Errorf("synthesis hasn't completed yet")
}
returns := make([]*apiv1.ResourceSlice, len(synthesis.ResourceSlices))
for i, ref := range synthesis.ResourceSlices {
slice := &apiv1.ResourceSlice{}
slice.Name = ref.Name
slice.Namespace = comps.Items[0].Namespace
returns[i] = slice
err = cli.Get(ctx, client.ObjectKeyFromObject(slice), slice)
if err != nil {
return nil, err
}
}
return returns, nil
}
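// Backoff is a shared retry policy for tests: ten steps starting at 10ms,
// doubling each time with 10% jitter, capped at one minute.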
var Backoff = wait.Backoff{
Steps: 10,
Duration: 10 * time.Millisecond,
Factor: 2.0,
Jitter: 0.1,
Cap: time.Minute,
}
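// Eventually polls fn every 100ms and fails the test if it doesn't return true within 15 seconds.
//
// Example (illustrative sketch; cli, key, and comp are hypothetical fixtures):
//
//	testutil.Eventually(t, func() bool {
//		return cli.Get(ctx, key, comp) == nil && comp.Status.CurrentSynthesis != nil
//	})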
func Eventually(t testing.TB, fn func() bool) {
t.Helper()
SomewhatEventually(t, time.Second*15, fn)
}
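// SomewhatEventually is like Eventually but with a caller-provided timeout.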
func SomewhatEventually(t testing.TB, dur time.Duration, fn func() bool) {
t.Helper()
start := time.Now()
for {
if time.Since(start) > dur {
t.Fatalf("timeout while waiting for condition")
return
}
if fn() {
return
}
time.Sleep(time.Millisecond * 100)
}
}
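// AtLeastVersion returns true if the downstream cluster's minor version is at
// least the given value. It defaults to true when DOWNSTREAM_VERSION_MINOR is
// unset, i.e. it fails open for local dev.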
func AtLeastVersion(t *testing.T, minor int) bool {
versionStr := os.Getenv("DOWNSTREAM_VERSION_MINOR")
if versionStr == "" {
return true // fail open for local dev
}
version, _ := strconv.Atoi(versionStr)
return version >= minor
}
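// WithFakeExecutor registers a pod controller that stands in for the real
// synthesizer container: it reads the synthesis env vars from the pod spec,
// runs the given handler in-process, and marks the pod as succeeded.
//
// Example (illustrative sketch; myHandler is a hypothetical
// execution.SynthesizerHandle that returns a fixed set of resources):
//
//	mgr := testutil.NewManager(t)
//	testutil.WithFakeExecutor(t, mgr, myHandler)
//	mgr.Start(t)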
func WithFakeExecutor(t *testing.T, mgr *Manager, sh execution.SynthesizerHandle) {
cli := mgr.GetAPIReader()
podCtrl := reconcile.Func(func(ctx context.Context, r reconcile.Request) (reconcile.Result, error) {
pod := &corev1.Pod{}
err := cli.Get(ctx, r.NamespacedName, pod)
if err != nil {
return reconcile.Result{}, client.IgnoreNotFound(err)
}
if pod.DeletionTimestamp != nil {
return reconcile.Result{}, nil
}
env := &execution.Env{}
for _, e := range pod.Spec.Containers[0].Env {
switch e.Name {
case "COMPOSITION_NAME":
env.CompositionName = e.Value
case "COMPOSITION_NAMESPACE":
env.CompositionNamespace = e.Value
case "SYNTHESIS_UUID":
env.SynthesisUUID = e.Value
case "IMAGE":
env.Image = e.Value
}
}
e := &execution.Executor{
Reader: cli,
Writer: mgr.GetClient(),
Handler: sh,
}
err = e.Synthesize(ctx, env)
if err != nil {
// Returning an error from the synthesizer would eventually result in a timeout.
// To avoid waiting that long in tests, just delete the pod after the first attempt.
return reconcile.Result{}, mgr.GetClient().Delete(ctx, pod)
}
err = mgr.GetClient().Get(ctx, client.ObjectKeyFromObject(pod), pod)
if err != nil {
return reconcile.Result{}, nil
}
pod.Status.Phase = corev1.PodSucceeded
err = mgr.GetClient().Status().Update(ctx, pod)
if err != nil {
return reconcile.Result{}, nil
}
return reconcile.Result{}, nil
})
_, err := ctrl.NewControllerManagedBy(mgr.Manager).
For(&corev1.Pod{}).
Build(podCtrl)
require.NoError(t, err)
}