fleet-argocd-plugin/fleetclient/fleetclient.go (299 lines of code) (raw):

// Copyright 2024 Google LLC // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package fleetclient import ( "bytes" "context" "fmt" "strings" "text/template" "time" fleet "google.golang.org/api/gkehub/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" ) const ( // Fleet API service poll interval. reconcileInterval = 10 * time.Second // Template for the Kubernetes Secret name, {{.MembershipID}}.{{.Region}}.{{.ProjectNum}}. clusterSecretNameTemplate = "%s.%s.%s" // Template for the Kubernetes Secret manifest. clusterSecretTemplate = ` apiVersion: v1 kind: Secret metadata: name: {{.Name}} namespace: argocd labels: argocd.argoproj.io/secret-type: cluster annotations: fleet.gke.io/managed-by-fleet-plugin: "true" type: Opaque stringData: name: {{.Name}} server: {{.ConnectGatewayURL}} config: | { "execProviderConfig": { "command": "argocd-k8s-auth", "args": ["gcp"], "apiVersion": "client.authentication.k8s.io/v1beta1" }, "tlsClientConfig": { "insecure": false, "caData": "" } } ` ) // FleetSync is a client that periodically polls the GKE Fleet API and caches fleet information. type FleetSync struct { svc *fleet.Service // GCP project number of fleet host project. ProjectNum string // A cached map from Membership full resource name to a list of Scope IDs. MembershipTenancyMapCache map[string][]string // A cached map from Scope IDs to a list of Membership full resource names. ScopeTenancyMapCache map[string][]string } // NewFleetSync creates a new FleetSync and starts its periodical reconciliation. func NewFleetSync(ctx context.Context, projectNum string) (*FleetSync, error) { service, err := fleet.NewService(ctx) if err != nil { return nil, err } c := &FleetSync{ svc: service, ProjectNum: projectNum, } // Build the initial fleet topology before handling RPCs. if err := c.Refresh(ctx); err != nil { return nil, err } c.startReconcile(ctx) return c, nil } func (c *FleetSync) startReconcile(ctx context.Context) { go func() { for { time.Sleep(reconcileInterval) c.Refresh(ctx) } }() } // Result encapsulates the response from the fleet service. type Result struct { ServerURL string `json:"server"` Name string `json:"name"` NameShort strong `json:"nameShort"` } // PluginResults returns the results of the plugin. func (c *FleetSync) PluginResults(ctx context.Context, scopeID string) ([]Result, error) { if c.MembershipTenancyMapCache == nil || c.ScopeTenancyMapCache == nil { return nil, fmt.Errorf("fleet is empty") } var results []Result // Scope mode. Only include memberships in the specified scope. if scopeID != "" { if c.ScopeTenancyMapCache[scopeID] == nil { return nil, fmt.Errorf("unknown scope ID to the Fleet plugin: %s", scopeID) } for _, name := range c.ScopeTenancyMapCache[scopeID] { results = append(results, resultFromMembership(name, c.ProjectNum)) } return results, nil } // Include all member clusters in the Fleet. for name := range c.MembershipTenancyMapCache { results = append(results, resultFromMembership(name, c.ProjectNum)) } return results, nil } func resultFromMembership(name, projectNum string) Result { parts := strings.Split(name, "/") region, membershipID := parts[3], parts[5] return Result{ ServerURL: connectGatewayURL(projectNum, region, membershipID), Name: fmt.Sprintf(clusterSecretNameTemplate, membershipID, region, projectNum), NameShort: fmt.Sprintf(membershipID), } } func connectGatewayURL(projectNum, region, membershipID string) string { if region == "global" { return fmt.Sprintf("https://connectgateway.googleapis.com/v1/projects/%s/locations/%s/gkeMemberships/%s", projectNum, region, membershipID) } return fmt.Sprintf("https://%s-connectgateway.googleapis.com/v1/projects/%s/locations/%s/gkeMemberships/%s", region, projectNum, region, membershipID) } // Refresh polls fleet API, rebuilds the local cached fleet topology map, and updates cluster secrets. func (c *FleetSync) Refresh(ctx context.Context) error { mems, err := c.listMemberships(ctx, c.ProjectNum) if err != nil { return fmt.Errorf("failed to list memberships: %w", err) } scopes, err := c.listScopes(ctx, c.ProjectNum) if err != nil { return fmt.Errorf("failed to list scopes: %w", err) } mbs, err := c.listMembershipBindings(ctx, c.ProjectNum) if err != nil { return fmt.Errorf("failed to list membership bindings: %w", err) } // Build one map from Memberships to a list of Scopes that the membership cluster is associated with, // and one reverse indexed map from Scopes to Memberships. memTenancyMap := make(map[string][]string) for _, mem := range mems { membershipName := mem.Name memTenancyMap[membershipName] = make([]string, 0) } scopeTenancyMap := make(map[string][]string) for _, s := range scopes { scopeID := s.Name scopeTenancyMap[scopeID] = make([]string, 0) } for _, binding := range mbs { // bindingName is in the format of // `projects/{project}/locations/{location}/memberships/{membership}/bindings/{membershipbinding}` bindingName := binding.Name parts := strings.Split(bindingName, "/") if len(parts) != 8 || parts[0] != "projects" || parts[2] != "locations" || parts[4] != "memberships" || parts[6] != "bindings" { fmt.Println("Invalid binding resource name format: %s", bindingName) continue } // Add the scope to the list for this membership membership := strings.Join(parts[:6], "/") scopeParts := strings.Split(binding.Scope, "/") if len(scopeParts) == 0 { fmt.Println("Invalid scope in binding (%s): %s", bindingName, binding.Scope) continue } scope := scopeParts[len(scopeParts)-1] memTenancyMap[membership] = append(memTenancyMap[membership], scope) scopeTenancyMap[scope] = append(scopeTenancyMap[scope], membership) } // Refresh cache. c.MembershipTenancyMapCache = memTenancyMap c.ScopeTenancyMapCache = scopeTenancyMap // Update cluster Secrets. if err := c.reconcileClusterSecrets(ctx); err != nil { return fmt.Errorf("failed to reconcile cluster secrets: %w", err) } return nil } func (c *FleetSync) reconcileClusterSecrets(ctx context.Context) error { // Create a Kubernetes clientset to apply resources. config, err := rest.InClusterConfig() if err != nil { return fmt.Errorf("failed to get in cluster config: %w", err) } clientset, err := kubernetes.NewForConfig(config) if err != nil { return fmt.Errorf("failed to create Kubernetes clientset: %w", err) } // Construct a map of cluster secrets, from name to manifest. clusterSecrets := make(map[string]string) for membership := range c.MembershipTenancyMapCache { parts := strings.Split(membership, "/") secretName := fmt.Sprintf(clusterSecretNameTemplate, parts[5], parts[3], c.ProjectNum) param := struct { Name string ConnectGatewayURL string }{ Name: secretName, ConnectGatewayURL: connectGatewayURL(c.ProjectNum, parts[3], parts[5]), } tmpl, err := template.New("secret").Parse(clusterSecretTemplate) if err != nil { return fmt.Errorf("failed to parse template: %w", err) } var secretManifest bytes.Buffer err = tmpl.Execute(&secretManifest, param) if err != nil { fmt.Println("Error creating Secret manifest:", err) continue } clusterSecrets[secretName] = secretManifest.String() } fmt.Println("Reconciling Cluster Secrets: %v", clusterSecrets) // Apply the Secret to the cluster. err = applySecrets(ctx, clientset, clusterSecrets) if err != nil { return fmt.Errorf("failed to apply secret: %w", err) } // Prune cluster secrets that are no longer existing in the Fleet. return pruneSecrets(ctx, clientset, clusterSecrets) } func applySecrets(ctx context.Context, clientset *kubernetes.Clientset, clusterSecrets map[string]string) error { secretsClient := clientset.CoreV1().Secrets("argocd") for _, manifest := range clusterSecrets { secret, err := secretFromManifest(manifest) if err != nil { return fmt.Errorf("error converting manifest %q to a k8s secret: %v", manifest, err) } _, err = secretsClient.Create(ctx, secret, metav1.CreateOptions{}) if err != nil { // Check if "already exists", then update. if !errors.IsAlreadyExists(err) { return fmt.Errorf("error creating secret: %v", err) } _, err = secretsClient.Update(ctx, secret, metav1.UpdateOptions{}) if err != nil { return fmt.Errorf("error updating secret: %v", err) } } } fmt.Println("Successfully applied Secrets.") return nil } func pruneSecrets(ctx context.Context, clientset *kubernetes.Clientset, clusterSecrets map[string]string) error { secretsClient := clientset.CoreV1().Secrets("argocd") listOptions := metav1.ListOptions{ LabelSelector: "argocd.argoproj.io/secret-type=cluster", } existingSecrets, err := secretsClient.List(ctx, listOptions) if err != nil { return fmt.Errorf("failed to list secrets: %w", err) } for _, secret := range existingSecrets.Items { // Skip secrets that are not managed by the fleet plugin. if secret.Annotations["fleet.gke.io/managed-by-fleet-plugin"] != "true" { continue } if _, exists := clusterSecrets[secret.Name]; !exists { // Secret no longer corresponds to a membership, delete it. err := secretsClient.Delete(ctx, secret.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("failed to delete secret: %w", err) } } } fmt.Println("Successfully pruned Secrets.") return nil } func secretFromManifest(manifest string) (*corev1.Secret, error) { // Universal deserializer can handle various Kubernetes object formats scheme := runtime.NewScheme() corev1.AddToScheme(scheme) decode := serializer.NewCodecFactory(scheme).UniversalDeserializer().Decode obj, _, err := decode([]byte(manifest), nil, nil) if err != nil { return nil, fmt.Errorf("error decoding manifest %q: %v", manifest, err) } // Type assertion to ensure it's a corev1.Secret secret, ok := obj.(*corev1.Secret) if !ok { return nil, fmt.Errorf("decoded object is not of type Secret") } return secret, nil } // listMemberships fetches the memberships under a given parent. func (c *FleetSync) listMemberships(ctx context.Context, project string) ([]*fleet.Membership, error) { var ret []*fleet.Membership parent := fmt.Sprintf("projects/%s/locations/-", project) call := c.svc.Projects.Locations.Memberships.List(parent) err := call.Pages(ctx, func(resp *fleet.ListMembershipsResponse) error { ret = append(ret, resp.Resources...) return nil }) if err != nil { return nil, err } return ret, nil } // listScopes fetches the scopes under a given parent. func (c *FleetSync) listScopes(ctx context.Context, project string) ([]*fleet.Scope, error) { var ret []*fleet.Scope parent := fmt.Sprintf("projects/%s/locations/global", project) call := c.svc.Projects.Locations.Scopes.List(parent) err := call.Pages(ctx, func(resp *fleet.ListScopesResponse) error { ret = append(ret, resp.Scopes...) return nil }) if err != nil { return nil, err } return ret, nil } // listMembershipBindings fetches the membership bindings under a given parent. func (c *FleetSync) listMembershipBindings(ctx context.Context, project string) ([]*fleet.MembershipBinding, error) { var ret []*fleet.MembershipBinding parent := fmt.Sprintf("projects/%s/locations/-/memberships/-", project) call := c.svc.Projects.Locations.Memberships.Bindings.List(parent) err := call.Pages(ctx, func(resp *fleet.ListMembershipBindingsResponse) error { ret = append(ret, resp.MembershipBindings...) return nil }) if err != nil { return nil, err } return ret, nil }