// pkg/operator/deploy/deploy.go
package deploy

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

import (
	"bytes"
	"context"
	"embed"
	"encoding/json"
	"errors"
	"fmt"
	"path/filepath"
	"strings"
	"text/template"
	"time"

	"github.com/hashicorp/go-multierror"
	"github.com/sirupsen/logrus"
	corev1 "k8s.io/api/core/v1"
	extensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
	extensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kruntime "k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	appsv1client "k8s.io/client-go/kubernetes/typed/apps/v1"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

	operatorclient "github.com/openshift/client-go/operator/clientset/versioned"

	"github.com/Azure/ARO-RP/pkg/api"
	apisubnet "github.com/Azure/ARO-RP/pkg/api/util/subnet"
	"github.com/Azure/ARO-RP/pkg/env"
	pkgoperator "github.com/Azure/ARO-RP/pkg/operator"
	arov1alpha1 "github.com/Azure/ARO-RP/pkg/operator/apis/aro.openshift.io/v1alpha1"
	aroclient "github.com/Azure/ARO-RP/pkg/operator/clientset/versioned"
	"github.com/Azure/ARO-RP/pkg/operator/controllers/genevalogging"
	"github.com/Azure/ARO-RP/pkg/util/clienthelper"
	"github.com/Azure/ARO-RP/pkg/util/dynamichelper"
	utilkubernetes "github.com/Azure/ARO-RP/pkg/util/kubernetes"
	utilpem "github.com/Azure/ARO-RP/pkg/util/pem"
	"github.com/Azure/ARO-RP/pkg/util/pullsecret"
	"github.com/Azure/ARO-RP/pkg/util/ready"
	"github.com/Azure/ARO-RP/pkg/util/restconfig"
)
//go:embed staticresources
var embeddedFiles embed.FS
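
// Operator deploys and manages the ARO operator and its supporting
// resources in an OpenShift cluster.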
type Operator interface {
Install(context.Context) error
Update(context.Context) error
CreateOrUpdateCredentialsRequest(context.Context) error
IsReady(context.Context) (bool, error)
Restart(context.Context, []string) error
IsRunningDesiredVersion(context.Context) (bool, error)
EnsureUpgradeAnnotation(context.Context) error
SyncClusterObject(context.Context) error
SetForceReconcile(context.Context, bool) error
}
type operator struct {
log *logrus.Entry
env env.Interface
oc *api.OpenShiftCluster
subscriptiondoc *api.SubscriptionDocument
arocli aroclient.Interface
client clienthelper.Interface
extensionscli extensionsclient.Interface
kubernetescli kubernetes.Interface
operatorcli operatorclient.Interface
dh dynamichelper.Interface
}
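
// New constructs an Operator for the given cluster, building a
// dynamichelper client from the cluster's REST config.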
func New(log *logrus.Entry, env env.Interface, oc *api.OpenShiftCluster, subscriptionDoc *api.SubscriptionDocument, arocli aroclient.Interface, client clienthelper.Interface, extensionscli extensionsclient.Interface, kubernetescli kubernetes.Interface, operatorcli operatorclient.Interface) (Operator, error) {
restConfig, err := restconfig.RestConfig(env, oc)
if err != nil {
return nil, err
}
dh, err := dynamichelper.New(log, restConfig)
if err != nil {
return nil, err
}
return &operator{
log: log,
env: env,
oc: oc,
arocli: arocli,
client: client,
extensionscli: extensionscli,
kubernetescli: kubernetescli,
operatorcli: operatorcli,
dh: dh,
subscriptiondoc: subscriptionDoc,
}, nil
}
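
// deploymentData carries the values substituted into the embedded
// staticresources templates.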
type deploymentData struct {
Image string
Version string
IsLocalDevelopment bool
SupportsPodSecurityAdmission bool
UsesWorkloadIdentity bool
TokenVolumeMountPath string
FederatedTokenFilePath string
}
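
// SetForceReconcile sets the ForceReconciliation operator flag on the
// singleton Cluster resource, retrying on conflict.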
func (o *operator) SetForceReconcile(ctx context.Context, enable bool) error {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
c, err := o.arocli.AroV1alpha1().Clusters().Get(ctx, arov1alpha1.SingletonClusterName, metav1.GetOptions{})
if err != nil {
return err
}
if enable {
c.Spec.OperatorFlags[pkgoperator.ForceReconciliation] = "true"
} else {
c.Spec.OperatorFlags[pkgoperator.ForceReconciliation] = "false"
}
_, err = o.arocli.AroV1alpha1().Clusters().Update(ctx, c, metav1.UpdateOptions{})
return err
})
}
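
// templateManifests renders all embedded templates (root, master and
// worker) with the given deployment data and returns the resulting
// manifests.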
func templateManifests(data deploymentData) ([][]byte, error) {
templatesRoot, err := template.ParseFS(embeddedFiles, "staticresources/*.yaml")
if err != nil {
return nil, err
}
templatesMaster, err := template.ParseFS(embeddedFiles, "staticresources/master/*")
if err != nil {
return nil, err
}
templatesWorker, err := template.ParseFS(embeddedFiles, "staticresources/worker/*")
if err != nil {
return nil, err
}
templatedFiles := make([][]byte, 0)
templatesArray := []*template.Template{templatesRoot, templatesMaster, templatesWorker}
for _, templates := range templatesArray {
for _, templ := range templates.Templates() {
buff := &bytes.Buffer{}
if err := templ.Execute(buff, data); err != nil {
return nil, err
}
templatedFiles = append(templatedFiles, buff.Bytes())
}
}
return templatedFiles, nil
}
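
// createDeploymentData derives the operator image, its version and the
// remaining template inputs from the environment and the cluster document.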
func (o *operator) createDeploymentData(ctx context.Context) (deploymentData, error) {
image := o.env.AROOperatorImage()
	// HACK: in local-dev mode the image is overridden via the ARO_IMAGE
	// env variable; derive the version from its tag.
version := "latest"
if strings.Contains(image, ":") {
str := strings.Split(image, ":")
version = str[len(str)-1]
}
// Set version correctly if it's overridden
if o.oc.Properties.OperatorVersion != "" {
version = o.oc.Properties.OperatorVersion
image = fmt.Sprintf("%s/aro:%s", o.env.ACRDomain(), version)
}
	// Set Pod Security Admission parameters if the cluster version is > 4.10.
	// This only gets set during PUCM (Everything or OperatorUpdate).
usePodSecurityAdmission, err := pkgoperator.ShouldUsePodSecurityStandard(ctx, o.client)
if err != nil {
return deploymentData{}, err
}
data := deploymentData{
IsLocalDevelopment: o.env.IsLocalDevelopmentMode(),
Image: image,
SupportsPodSecurityAdmission: usePodSecurityAdmission,
Version: version,
}
if o.oc.UsesWorkloadIdentity() {
data.UsesWorkloadIdentity = o.oc.UsesWorkloadIdentity()
data.TokenVolumeMountPath = filepath.Dir(pkgoperator.OperatorTokenFile)
data.FederatedTokenFilePath = pkgoperator.OperatorTokenFile
}
return data, nil
}
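
// createObjects renders the embedded templates and decodes each manifest
// into a typed Kubernetes object.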
func (o *operator) createObjects(ctx context.Context) ([]kruntime.Object, error) {
deploymentData, err := o.createDeploymentData(ctx)
if err != nil {
return nil, err
}
templated, err := templateManifests(deploymentData)
if err != nil {
return nil, err
}
objects := make([]kruntime.Object, 0, len(templated))
for _, v := range templated {
obj, _, err := scheme.Codecs.UniversalDeserializer().Decode(v, nil, nil)
if err != nil {
return nil, err
}
objects = append(objects, obj)
}
return objects, nil
}
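
// resources returns all objects to be deployed: the templated static
// resources followed by dynamically generated ones, such as the operator
// identity secret (workload identity clusters only) and the configuration
// secret holding the geneva logging certificate and pull secret.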
func (o *operator) resources(ctx context.Context) ([]kruntime.Object, error) {
	// First, the static resources from the embedded assets
results, err := o.createObjects(ctx)
if err != nil {
return nil, err
}
	// Then the dynamically generated resources
if o.oc.UsesWorkloadIdentity() {
operatorIdentitySecret, err := o.generateOperatorIdentitySecret()
if err != nil {
return nil, err
}
results = append(results, operatorIdentitySecret)
}
key, cert := o.env.ClusterGenevaLoggingSecret()
gcsKeyBytes, err := utilpem.Encode(key)
if err != nil {
return nil, err
}
gcsCertBytes, err := utilpem.Encode(cert)
if err != nil {
return nil, err
}
ps, err := pullsecret.Build(o.oc, "")
if err != nil {
return nil, err
}
	// Create a secret here for genevalogging; later we will copy it to
	// the genevalogging namespace.
return append(results,
&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: pkgoperator.SecretName,
Namespace: pkgoperator.Namespace,
},
Data: map[string][]byte{
genevalogging.GenevaCertName: gcsCertBytes,
genevalogging.GenevaKeyName: gcsKeyBytes,
corev1.DockerConfigJsonKey: []byte(ps),
},
},
), nil
}
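
// generateOperatorIdentitySecret builds the secret containing the
// operator's workload identity settings; it errors if the cluster document
// has no identity named pkgoperator.OperatorIdentityName.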
func (o *operator) generateOperatorIdentitySecret() (*corev1.Secret, error) {
var operatorIdentity *api.PlatformWorkloadIdentity // use a pointer to make it easy to check if we found an identity below
for k, i := range o.oc.Properties.PlatformWorkloadIdentityProfile.PlatformWorkloadIdentities {
if k == pkgoperator.OperatorIdentityName {
operatorIdentity = &i
break
}
}
if operatorIdentity == nil {
return nil, fmt.Errorf("operator identity %s not found", pkgoperator.OperatorIdentityName)
}
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: pkgoperator.Namespace,
Name: pkgoperator.OperatorIdentitySecretName,
},
StringData: map[string]string{
"azure_client_id": operatorIdentity.ClientID,
"azure_tenant_id": o.subscriptiondoc.Subscription.Properties.TenantID,
"azure_region": o.oc.Location,
"azure_subscription_id": o.subscriptiondoc.ID,
"azure_federated_token_file": pkgoperator.OperatorTokenFile,
},
}, nil
}
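
// clusterObject assembles the singleton Cluster custom resource from the
// cluster document and the RP environment.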
func (o *operator) clusterObject() (*arov1alpha1.Cluster, error) {
vnetID, _, err := apisubnet.Split(o.oc.Properties.MasterProfile.SubnetID)
if err != nil {
return nil, err
}
domain := o.oc.Properties.ClusterProfile.Domain
if !strings.ContainsRune(domain, '.') {
domain += "." + o.env.Domain()
}
ingressIP, err := checkIngressIP(o.oc.Properties.IngressProfiles)
if err != nil {
return nil, err
}
serviceSubnets := []string{
"/subscriptions/" + o.env.SubscriptionID() + "/resourceGroups/" + o.env.ResourceGroup() + "/providers/Microsoft.Network/virtualNetworks/rp-pe-vnet-001/subnets/rp-pe-subnet",
"/subscriptions/" + o.env.SubscriptionID() + "/resourceGroups/" + o.env.ResourceGroup() + "/providers/Microsoft.Network/virtualNetworks/rp-vnet/subnets/rp-subnet",
}
	// Avoid issues in the dev environment when the gateway is not present
if o.oc.Properties.FeatureProfile.GatewayEnabled {
serviceSubnets = append(serviceSubnets, "/subscriptions/"+o.env.SubscriptionID()+"/resourceGroups/"+o.env.GatewayResourceGroup()+"/providers/Microsoft.Network/virtualNetworks/gateway-vnet/subnets/gateway-subnet")
}
cluster := &arov1alpha1.Cluster{
ObjectMeta: metav1.ObjectMeta{
Name: arov1alpha1.SingletonClusterName,
},
Spec: arov1alpha1.ClusterSpec{
ResourceID: o.oc.ID,
ClusterResourceGroupID: o.oc.Properties.ClusterProfile.ResourceGroupID,
Domain: domain,
ACRDomain: o.env.ACRDomain(),
AZEnvironment: o.env.Environment().Name,
Location: o.env.Location(),
InfraID: o.oc.Properties.InfraID,
ArchitectureVersion: int(o.oc.Properties.ArchitectureVersion),
VnetID: vnetID,
StorageSuffix: o.oc.Properties.StorageSuffix,
GenevaLogging: arov1alpha1.GenevaLoggingSpec{
ConfigVersion: o.env.ClusterGenevaLoggingConfigVersion(),
MonitoringGCSAccount: o.env.ClusterGenevaLoggingAccount(),
MonitoringGCSEnvironment: o.env.ClusterGenevaLoggingEnvironment(),
MonitoringGCSNamespace: o.env.ClusterGenevaLoggingNamespace(),
},
ServiceSubnets: serviceSubnets,
InternetChecker: arov1alpha1.InternetCheckerSpec{
URLs: []string{
fmt.Sprintf("https://%s/", o.env.ACRDomain()),
o.env.Environment().ActiveDirectoryEndpoint,
o.env.Environment().ResourceManagerEndpoint,
o.env.Environment().GenevaMonitoringEndpoint,
},
},
APIIntIP: o.oc.Properties.APIServerProfile.IntIP,
IngressIP: ingressIP,
GatewayPrivateEndpointIP: o.oc.Properties.NetworkProfile.GatewayPrivateEndpointIP,
// Update the OperatorFlags from the version in the RP
OperatorFlags: arov1alpha1.OperatorFlags(o.oc.Properties.OperatorFlags),
},
}
if o.oc.Properties.FeatureProfile.GatewayEnabled && o.oc.Properties.NetworkProfile.GatewayPrivateEndpointIP != "" {
cluster.Spec.GatewayDomains = append(o.env.GatewayDomains(), o.oc.Properties.ImageRegistryStorageAccountName+".blob."+o.env.Environment().StorageEndpointSuffix)
} else {
		// this covers the admin-disable case: we need to update dnsmasq on each node
cluster.Spec.GatewayDomains = make([]string, 0)
}
return cluster, nil
}
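
// SyncClusterObject ensures the in-cluster Cluster resource matches the
// state held by the RP.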
func (o *operator) SyncClusterObject(ctx context.Context) error {
resource, err := o.clusterObject()
if err != nil {
return err
}
return o.dh.Ensure(ctx, resource)
}
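
// Install deploys all operator resources to a new cluster, including the
// Cluster object itself.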
func (o *operator) Install(ctx context.Context) error {
resources, err := o.resources(ctx)
if err != nil {
return err
}
	// If we're installing the Operator for the first time, include the
	// Cluster object; otherwise it is updated separately.
cluster, err := o.clusterObject()
if err != nil {
return err
}
resources = append(resources, cluster)
return o.applyDeployment(ctx, resources)
}
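
// Update re-deploys the operator resources; the Cluster object is synced
// separately via SyncClusterObject.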
func (o *operator) Update(ctx context.Context) error {
resources, err := o.resources(ctx)
if err != nil {
return err
}
return o.applyDeployment(ctx, resources)
}
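
// applyDeployment ensures each resource in order. After applying a CRD it
// waits for the CRD to become established and refreshes the dynamichelper's
// discovery information; after applying the Cluster it sets the Cluster as
// the controller owner of the operator's configuration secret.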
func (o *operator) applyDeployment(ctx context.Context, resources []kruntime.Object) error {
err := dynamichelper.Prepare(resources)
if err != nil {
return err
}
for _, resource := range resources {
err = o.dh.Ensure(ctx, resource)
if err != nil {
return err
}
gvks, _, err := scheme.Scheme.ObjectKinds(resource)
if err != nil {
return err
}
switch gvks[0].GroupKind().String() {
case "CustomResourceDefinition.apiextensions.k8s.io":
acc, err := meta.Accessor(resource)
if err != nil {
return err
}
err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
crd, err := o.extensionscli.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, acc.GetName(), metav1.GetOptions{})
if err != nil {
return false, err
}
return isCRDEstablished(crd), nil
})
if err != nil {
return err
}
err = o.dh.Refresh()
if err != nil {
return err
}
case "Cluster.aro.openshift.io":
			// Add an owner reference onto our configuration secret. This
			// can only be done once we've got the cluster UID, and is
			// needed to ensure that secret updates trigger updates of the
			// appropriate controllers.
err = retry.OnError(wait.Backoff{
Steps: 60,
Duration: time.Second,
}, func(err error) bool {
// IsForbidden here is intended to catch the following transient
// error: secrets "cluster" is forbidden: cannot set
// blockOwnerDeletion in this case because cannot find
// RESTMapping for APIVersion aro.openshift.io/v1alpha1 Kind
// Cluster: no matches for kind "Cluster" in version
// "aro.openshift.io/v1alpha1"
return kerrors.IsForbidden(err) || kerrors.IsConflict(err)
}, func() error {
cluster, err := o.arocli.AroV1alpha1().Clusters().Get(ctx, arov1alpha1.SingletonClusterName, metav1.GetOptions{})
if err != nil {
return err
}
s, err := o.kubernetescli.CoreV1().Secrets(pkgoperator.Namespace).Get(ctx, pkgoperator.SecretName, metav1.GetOptions{})
if err != nil {
return err
}
err = controllerutil.SetControllerReference(cluster, s, scheme.Scheme)
if err != nil {
return err
}
_, err = o.kubernetescli.CoreV1().Secrets(pkgoperator.Namespace).Update(ctx, s, metav1.UpdateOptions{})
return err
})
if err != nil {
return err
}
}
}
return nil
}
// CreateOrUpdateCredentialsRequest creates or updates only the ARO
// operator's CredentialsRequest, rather than applying all of the operator's
// associated Kubernetes resources.
func (o *operator) CreateOrUpdateCredentialsRequest(ctx context.Context) error {
templ, err := template.ParseFS(embeddedFiles, "staticresources/credentialsrequest.yaml")
if err != nil {
return err
}
buff := &bytes.Buffer{}
err = templ.Execute(buff, nil)
if err != nil {
return err
}
crUnstructured, err := dynamichelper.DecodeUnstructured(buff.Bytes())
if err != nil {
return err
}
return o.dh.Ensure(ctx, crUnstructured)
}
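
// EnsureUpgradeAnnotation propagates UpgradeableTo from the cluster
// document to the cloudcredential operator via the
// cloudcredential.openshift.io/upgradeable-to annotation. It is a no-op
// unless the cluster uses workload identity and UpgradeableTo is set.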
func (o *operator) EnsureUpgradeAnnotation(ctx context.Context) error {
if !o.oc.UsesWorkloadIdentity() {
return nil
}
if o.oc.Properties.PlatformWorkloadIdentityProfile.UpgradeableTo == nil {
return nil
}
upgradeableTo := string(*o.oc.Properties.PlatformWorkloadIdentityProfile.UpgradeableTo)
upgradeableAnnotation := "cloudcredential.openshift.io/upgradeable-to"
patch := &metav1.PartialObjectMetadata{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
upgradeableAnnotation: upgradeableTo,
},
},
}
patchBytes, err := json.Marshal(patch)
if err != nil {
return err
}
_, err = o.operatorcli.OperatorV1().CloudCredentials().Patch(ctx, "cluster", types.MergePatchType, patchBytes, metav1.PatchOptions{})
if err != nil {
return err
}
return nil
}
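
// IsReady reports whether the aro-operator-master and aro-operator-worker
// deployments are ready, logging deployment, replicaset and pod status for
// any deployment that is not.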
func (o *operator) IsReady(ctx context.Context) (bool, error) {
deploymentOk := true
var deploymentErr error
deployments := o.kubernetescli.AppsV1().Deployments(pkgoperator.Namespace)
replicasets := o.kubernetescli.AppsV1().ReplicaSets(pkgoperator.Namespace)
pods := o.kubernetescli.CoreV1().Pods(pkgoperator.Namespace)
for _, deployment := range []string{"aro-operator-master", "aro-operator-worker"} {
ok, err := ready.CheckDeploymentIsReady(ctx, deployments, deployment)()
o.log.Infof("deployment %q ok status is: %v, err is: %v", deployment, ok, err)
deploymentOk = deploymentOk && ok
if deploymentErr == nil && err != nil {
deploymentErr = err
}
if ok {
continue
}
d, err := deployments.Get(ctx, deployment, metav1.GetOptions{})
if err != nil {
o.log.Errorf("failed to get deployment %q: %s", deployment, err)
continue
}
j, err := json.Marshal(d.Status)
if err != nil {
o.log.Errorf("failed to serialize deployment %q: %s", deployment, err)
continue
}
o.log.Infof("deployment %q status: %s", deployment, string(j))
// Gather and print status of this deployment's replicasets
rs, err := replicasets.List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", deployment)})
if err != nil {
o.log.Errorf("failed to list replicasets: %s", err)
continue
}
for _, replicaset := range rs.Items {
r, err := replicasets.Get(ctx, replicaset.Name, metav1.GetOptions{})
if err != nil {
o.log.Errorf("failed to get replicaset %s: %s", replicaset.Name, err)
continue
}
j, err := json.Marshal(r.Status)
if err != nil {
o.log.Errorf("failed to serialize replicaset status %q: %s", replicaset.Name, err)
continue
}
o.log.Infof("replicaset %q status: %s", replicaset.Name, string(j))
}
// Gather and print status of this deployment's pods
ps, err := pods.List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("app=%s", deployment)})
if err != nil {
o.log.Errorf("failed to list pods: %s", err)
continue
}
for _, pod := range ps.Items {
p, err := pods.Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
o.log.Errorf("failed to get pod %s: %s", pod.Name, err)
continue
}
j, err := json.Marshal(p.Status)
if err != nil {
o.log.Errorf("failed to serialize pod status %q: %s", pod.Name, err)
continue
}
o.log.Infof("pod %q status: %s", pod.Name, string(j))
}
}
return deploymentOk, deploymentErr
}
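
// Restart restarts the named deployments in the operator namespace,
// accumulating any per-deployment errors into a multierror.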
func (o *operator) Restart(ctx context.Context, deploymentNames []string) error {
var result error
for _, dn := range deploymentNames {
err := utilkubernetes.Restart(ctx, o.kubernetescli.AppsV1().Deployments(pkgoperator.Namespace), pkgoperator.Namespace, dn)
if err != nil {
result = multierror.Append(result, err)
}
}
return result
}
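
// checkOperatorDeploymentVersion reports whether the named deployment
// carries the desired "version" label; a missing deployment is treated as
// not ready rather than as an error.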
func checkOperatorDeploymentVersion(ctx context.Context, cli appsv1client.DeploymentInterface, name string, desiredVersion string) (bool, error) {
d, err := cli.Get(ctx, name, metav1.GetOptions{})
switch {
case kerrors.IsNotFound(err):
return false, nil
case err != nil:
return false, err
}
if d.Labels["version"] != desiredVersion {
return false, nil
}
return true, nil
}
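
// checkPodImageVersion reports whether the pods for the given role are
// running an image whose tag matches the desired version.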
func checkPodImageVersion(ctx context.Context, cli corev1client.PodInterface, role string, desiredVersion string) (bool, error) {
podList, err := cli.List(ctx, metav1.ListOptions{LabelSelector: "app=" + role})
switch {
case kerrors.IsNotFound(err):
return false, nil
case err != nil:
return false, err
}
imageTag := "latest"
for _, pod := range podList.Items {
if strings.Contains(pod.Spec.Containers[0].Image, ":") {
str := strings.Split(pod.Spec.Containers[0].Image, ":")
imageTag = str[len(str)-1]
}
}
if imageTag != desiredVersion {
return false, nil
}
return true, nil
}
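
// IsRunningDesiredVersion checks that both operator deployments and their
// pods are running the version the RP expects.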
func (o *operator) IsRunningDesiredVersion(ctx context.Context) (bool, error) {
	// Get the desired version
image := o.env.AROOperatorImage()
desiredVersion := "latest"
if strings.Contains(image, ":") {
str := strings.Split(image, ":")
desiredVersion = str[len(str)-1]
}
if o.oc.Properties.OperatorVersion != "" {
desiredVersion = o.oc.Properties.OperatorVersion
}
// Check if aro-operator-master is running desired version
ok, err := checkOperatorDeploymentVersion(ctx, o.kubernetescli.AppsV1().Deployments(pkgoperator.Namespace), "aro-operator-master", desiredVersion)
if !ok || err != nil {
return ok, err
}
ok, err = checkPodImageVersion(ctx, o.kubernetescli.CoreV1().Pods(pkgoperator.Namespace), "aro-operator-master", desiredVersion)
if !ok || err != nil {
return ok, err
}
// Check if aro-operator-worker is running desired version
ok, err = checkOperatorDeploymentVersion(ctx, o.kubernetescli.AppsV1().Deployments(pkgoperator.Namespace), "aro-operator-worker", desiredVersion)
if !ok || err != nil {
return ok, err
}
ok, err = checkPodImageVersion(ctx, o.kubernetescli.CoreV1().Pods(pkgoperator.Namespace), "aro-operator-worker", desiredVersion)
if !ok || err != nil {
return ok, err
}
return true, nil
}
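
// checkIngressIP returns the IP of the "default" ingress profile when
// present, falling back to the first profile.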
func checkIngressIP(ingressProfiles []api.IngressProfile) (string, error) {
if len(ingressProfiles) < 1 {
return "", errors.New("no Ingress Profiles found")
}
ingressIP := ingressProfiles[0].IP
if len(ingressProfiles) > 1 {
for _, p := range ingressProfiles {
if p.Name == "default" {
return p.IP, nil
}
}
}
return ingressIP, nil
}
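
// isCRDEstablished reports whether a CRD's Established and NamesAccepted
// conditions are both true.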
func isCRDEstablished(crd *extensionsv1.CustomResourceDefinition) bool {
m := make(map[extensionsv1.CustomResourceDefinitionConditionType]extensionsv1.ConditionStatus, len(crd.Status.Conditions))
for _, cond := range crd.Status.Conditions {
m[cond.Type] = cond.Status
}
return m[extensionsv1.Established] == extensionsv1.ConditionTrue &&
m[extensionsv1.NamesAccepted] == extensionsv1.ConditionTrue
}