func()

in gke-deploy/deployer/deployer.go [308:561]


// Apply deploys the Kubernetes objects described by config to a cluster and,
// unless d.ServerDryRun is set, waits for them to become ready.
//
// The steps are, in order:
//  1. Validate that clusterName and clusterLocation are either both set or
//     both empty. If clusterProject is empty and d.UseGcloud is set, the
//     project is looked up through gcloud.
//  2. If a cluster was named, authorize access to it. On failure a hint with
//     the IAM binding command that may be needed is printed.
//  3. If config starts with "gs://", download it into a temporary directory
//     that is removed when Apply returns, and use that directory as config.
//  4. Parse the configs (descending into directories when recursive is set),
//     warn on stderr about duplicate objects, and apply every Namespace
//     object that does not already exist on the cluster.
//  5. Apply each remaining object individually. If namespace is empty, the
//     namespace defined in each config is used.
//  6. Poll the deployed objects every 5 seconds until all are ready or
//     waitTimeout elapses, then print a summary and, when clusterProject is
//     known, GKE links.
//
// Apply returns an error if any step fails, if ctx is done while waiting
// between polls, or if waitTimeout elapses before every object is ready. In
// the timeout case the summary is still printed before the error is returned.
func (d *Deployer) Apply(ctx context.Context, clusterName, clusterLocation, clusterProject, config, namespace string, waitTimeout time.Duration, recursive bool) error {
	if d.ServerDryRun {
		fmt.Printf("Applying deployment in server dry run mode.\n")
	} else {
		fmt.Printf("Applying deployment.....\n")
	}

	// clusterName and clusterLocation are only meaningful as a pair.
	if (clusterName == "") != (clusterLocation == "") {
		return fmt.Errorf("clusterName and clusterLocation either must both be provided, or neither should be provided")
	}
	if clusterProject == "" && d.UseGcloud {
		currentProject, err := gcp.GetProject(ctx, d.Clients.Gcloud)
		if err != nil {
			return fmt.Errorf("failed to get GCP project: %v", err)
		}
		clusterProject = currentProject
	}

	if clusterName != "" && clusterLocation != "" {
		fmt.Printf("Getting access to cluster %q in %q.\n", clusterName, clusterLocation)
		if err := cluster.AuthorizeAccess(ctx, clusterName, clusterLocation, clusterProject, d.UseGcloud, d.Clients.Gcloud); err != nil {
			// Best effort: try to print a ready-to-run command for the active
			// account. Fall back to a generic template when the account
			// cannot be determined.
			hinted := false
			if d.UseGcloud {
				account, accountErr := gcp.GetAccount(ctx, d.Clients.Gcloud)
				if accountErr != nil {
					fmt.Printf("Failed to get GCP account. Swallowing error: %v\n", accountErr)
				} else {
					// TODO(joonlim): Find a better way to figure out if accountType is "user", "serviceAccount", or "group".
					accountType := "user"
					if strings.Contains(account, "gserviceaccount.com") {
						accountType = "serviceAccount"
					}

					fmt.Printf("> You may need to grant permission to access to the cluster:\n\n")
					fmt.Printf("   gcloud projects add-iam-policy-binding %s --member=%s:%s --role=roles/container.developer\n\n", clusterProject, accountType, account)
					hinted = true
				}
			}
			if !hinted {
				fmt.Printf("> You may need to grant permission to access to the cluster:\n\n")
				fmt.Printf("   gcloud projects add-iam-policy-binding %s --member=<account-type>:<account> --role=roles/container.developer\n\n", clusterProject)
			}
			return fmt.Errorf("failed to get access to cluster: %v", err)
		}
	}

	if strings.HasPrefix(config, "gs://") {
		// Stage the remote configs locally so they can be parsed like any
		// other path.
		tmpDir, err := d.Clients.OS.TempDir(ctx, "", k8sConfigStagingDir)
		if err != nil {
			return fmt.Errorf("failed to create tmp directory: %v", err)
		}
		defer d.Clients.OS.RemoveAll(ctx, tmpDir)
		ss := &gcs.GCS{
			GcsService: d.Clients.GCS,
		}
		if err := ss.Download(ctx, config, tmpDir, recursive); err != nil {
			return fmt.Errorf("failed to download configuration files from GCS %q: %v", config, err)
		}
		config = tmpDir
	}

	objs, err := resource.ParseConfigs(ctx, config, d.Clients.OS, recursive)
	if err != nil {
		return fmt.Errorf("failed to parse configuration files: %v", err)
	}
	if len(objs) == 0 {
		return fmt.Errorf("no objects found")
	}
	fmt.Printf("Configuration files to be used: %v\n", objs)

	// Objects are considered duplicates when their %v string forms collide.
	seen := make(map[string]bool, len(objs))
	var dups []string
	for _, obj := range objs {
		key := fmt.Sprintf("%v", obj)
		if seen[key] {
			dups = append(dups, key)
		}
		seen[key] = true
	}
	if len(dups) > 0 {
		fmt.Fprintf(os.Stderr, "\nWARNING: Deploying multiple objects share the same kind and name. Duplicate objects will be overridden:\n")
		for _, dup := range dups {
			fmt.Fprintf(os.Stderr, "%v\n", dup)
		}
		fmt.Fprintln(os.Stderr)
	}

	fmt.Printf("Applying configuration files to cluster.\n")

	// Apply all namespace objects first, if they exist. Namespace objects are
	// left out of filteredObjs so that they are neither applied again below
	// nor shown in the deployment summary.
	filteredObjs := make(resource.Objects, 0, len(objs))
	for _, obj := range objs {
		if resource.ObjectKind(obj) != "Namespace" {
			filteredObjs = append(filteredObjs, obj)
			continue
		}

		nsName, err := resource.ObjectName(obj)
		if err != nil {
			return fmt.Errorf("failed to get name of object: %v", err)
		}
		nsExists, err := cluster.DeployedObjectExists(ctx, "Namespace", nsName, "", d.Clients.Kubectl)
		if err != nil {
			return fmt.Errorf("failed to check if deployed object with kind \"Namespace\" and name %q exists: %v", nsName, err)
		}
		if nsExists {
			continue
		}

		fmt.Fprintf(os.Stderr, "\nWARNING: It is recommended that namespaces be created by an administrator. Creating namespace %q because it does not exist.\n\n", nsName)
		objString, err := resource.EncodeToYAMLString(obj)
		if err != nil {
			return fmt.Errorf("failed to encode obj to string: %v", err)
		}
		if err := cluster.ApplyConfigFromString(ctx, objString, "", d.Clients.Kubectl); err != nil {
			return fmt.Errorf("failed to apply Namespace configuration file with name %q to cluster: %v", nsName, err)
		}
	}
	objs = filteredObjs

	// Apply each config file individually vs applying the directory to avoid applying namespaces.
	// Namespace objects are removed from objs at this point.
	ensuredInstallApplicationCRD := false // Only need to do this once, in the case where the user provides more than one Application CR
	for _, obj := range objs {
		objName, err := resource.ObjectName(obj)
		if err != nil {
			return fmt.Errorf("failed to get name of object: %v", err)
		}

		if !ensuredInstallApplicationCRD && resource.ObjectKind(obj) == "Application" {
			if err := crd.EnsureInstallApplicationCRD(ctx, d.Clients.Kubectl); err != nil {
				return fmt.Errorf("failed to ensure installation of Application CRD on target cluster: %v", err)
			}
			ensuredInstallApplicationCRD = true
		}

		objString, err := resource.EncodeToYAMLString(obj)
		if err != nil {
			return fmt.Errorf("failed to encode obj to string: %v", err)
		}
		// If namespace == "", uses the namespace defined in each config.
		if err := cluster.ApplyConfigFromString(ctx, objString, namespace, d.Clients.Kubectl); err != nil {
			return fmt.Errorf("failed to apply %s configuration file with name %q to cluster: %v", resource.ObjectKind(obj), objName, err)
		}
	}

	if d.ServerDryRun {
		fmt.Printf("Server-side dry run deployment succeeded.\n\n")
		return nil
	}

	// Latest observed state of every deployed object, keyed by kind then name.
	deployedObjs := map[string]map[string]resource.Object{}
	summaryObjs := make(resource.Objects, 0, len(objs))
	timedOut := false

	fmt.Printf("\nWaiting for deployed objects to be ready with timeout of %v\n", waitTimeout)
	start := time.Now()
	end := start.Add(waitTimeout)
	periodicMsgInterval := 30 * time.Second
	nextPeriodicMsg := time.Now().Add(periodicMsgInterval)
	ticker := time.NewTicker(5 * time.Second)
	// Release the ticker on every return path, including the error returns
	// inside the polling loop.
	defer ticker.Stop()

	// Each pass re-checks only the objects that were not ready on the
	// previous pass.
	for len(objs) > 0 {
		pending := make(resource.Objects, 0, len(objs))

		for _, obj := range objs {
			kind := resource.ObjectKind(obj)
			name, err := resource.ObjectName(obj)
			if err != nil {
				return fmt.Errorf("failed to get name of object: %v", err)
			}
			// An explicit namespace argument overrides the one in the config.
			objNamespace := namespace
			if objNamespace == "" {
				ns, err := resource.ObjectNamespace(obj)
				if err != nil {
					return fmt.Errorf("failed to get namespace of object: %v", err)
				}
				objNamespace = ns
			}
			deployedObj, err := cluster.GetDeployedObject(ctx, kind, name, objNamespace, d.Clients.Kubectl)
			if err != nil {
				return fmt.Errorf("failed to get configuration of deployed object with kind %q and name %q: %v", kind, name, err)
			}
			if deployedObjs[kind] == nil {
				deployedObjs[kind] = map[string]resource.Object{}
			}
			deployedObjs[kind][name] = *deployedObj
			ready, err := resource.IsReady(ctx, deployedObj)
			if err != nil {
				return fmt.Errorf("failed to check if deployed object with kind %q and name %q is ready: %v", kind, name, err)
			}
			if !ready {
				pending = append(pending, obj)
				continue
			}
			dur := time.Since(start).Round(time.Second / 10) // Round to nearest 0.1 seconds
			fmt.Printf("Deployed object with kind %q and name %q is ready after %v\n", kind, name, dur)
		}

		objs = pending

		if len(objs) == 0 {
			// Break out here to avoid waiting for ticker.
			break
		}
		if time.Now().After(end) {
			timedOut = true
			break
		}
		if time.Now().After(nextPeriodicMsg) {
			fmt.Printf("Still waiting on %d object(s) to be ready: %v\n", len(objs), objs)
			nextPeriodicMsg = nextPeriodicMsg.Add(periodicMsgInterval)
		}

		// Wait for the next poll, but stop promptly if the caller gives up.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return fmt.Errorf("interrupted while waiting for deployed objects to be ready: %v", ctx.Err())
		}
	}

	fmt.Printf("Finished applying deployment.\n\n")

	for _, byName := range deployedObjs {
		for _, o := range byName {
			o := o // Per-iteration copy so each appended pointer is distinct.
			summaryObjs = append(summaryObjs, &o)
		}
	}
	summary, err := resource.DeploySummary(ctx, summaryObjs)
	if err != nil {
		return fmt.Errorf("failed to get summary of deployed objects: %v", err)
	}

	fmt.Printf("################################################################################\n")
	fmt.Printf("> Deployed Objects\n\n")
	fmt.Printf("%s\n", summary)

	fmt.Printf("################################################################################\n")

	if clusterProject != "" {
		links, err := d.gkeLinks(clusterProject)
		if err != nil {
			return fmt.Errorf("failed to get GKE links: %v", err)
		}

		fmt.Printf("> GKE\n\n")
		fmt.Printf("%s\n", links)
	}

	// Report the timeout only after the summary so the user still sees the
	// state of whatever was deployed.
	if timedOut {
		return fmt.Errorf("timed out after %v while waiting for deployed objects to be ready", waitTimeout)
	}

	return nil
}