func main()

in images/controller/cmd/image_puller/image_puller.go [39:193]


// main is the entry point of the image puller. It reads its configuration
// from the environment, configures docker credentials through gcloud, and
// then runs one or more reconcile passes. Each pass:
//
//  1. obtains a service account token from the metadata server,
//  2. lists the image tags for the namespace, the images on the node and the
//     existing "app=image-pull" jobs,
//  3. creates an image pull job for every gcr.io image that is neither on the
//     node nor already covered by a job for this node, and
//  4. deletes this node's jobs that have succeeded.
//
// Environment:
//
//	NAMESPACE       required
//	NODE_NAME       required
//	TEMPLATE_PATH   optional, defaults to the bundled job template path
//	IMAGE_PULL_LOOP optional, "true" repeats the pass every loopInterval seconds
func main() {
	log.Printf("Starting image puller")

	// Set from downward API.
	namespace := os.Getenv("NAMESPACE")
	if len(namespace) == 0 {
		log.Fatal("Missing NAMESPACE env.")
	}

	// Set from downward API.
	nodeName := os.Getenv("NODE_NAME")
	if len(nodeName) == 0 {
		log.Fatal("Missing NODE_NAME env.")
	}

	// Fall back to the default template location when none is configured.
	templatePath := os.Getenv("TEMPLATE_PATH")
	if len(templatePath) == 0 {
		templatePath = "/run/image-puller/template/image-pull-job.yaml.tmpl"
	}

	// Optional polling mode.
	loop := os.Getenv("IMAGE_PULL_LOOP") == "true"

	// Configure docker with gcloud credentials.
	cmd := exec.Command("gcloud", "auth", "configure-docker", "-q")
	stdoutStderr, err := cmd.CombinedOutput()
	if err != nil {
		log.Fatalf("failed to configure docker with gcloud: %s, %v", string(stdoutStderr), err)
	}

	// Obtain SA email.
	saEmail, err := broker.GetServiceAccountFromMetadataServer()
	if err != nil {
		log.Fatalf("failed to get service account email: %v", err)
	}

	var wg sync.WaitGroup

	for {
		token, err := broker.GetServiceAccountTokenFromMetadataServer(saEmail)
		if err != nil {
			// In one-shot mode there is nothing to retry, so fail hard. In
			// polling mode log the failure and try again on the next tick
			// instead of continuing the pass without a token.
			if !loop {
				log.Fatalf("failed to get service account token: %v", err)
			}
			log.Printf("failed to get service account token: %v", err)
			time.Sleep(loopInterval * time.Second)
			continue
		}

		images, err := findImageTags(namespace)
		if err != nil {
			log.Fatal(err)
		}

		nodeImages, err := broker.GetImagesOnNode()
		if err != nil {
			log.Fatal(err)
		}

		currJobs, err := broker.GetJobs(namespace, "app=image-pull")
		if err != nil {
			log.Fatal(err)
		}

		// Process images in parallel.
		wg.Add(len(images))
		for _, image := range images {
			go func(image, token string) {
				// Deferred so every return path below releases the WaitGroup.
				defer wg.Done()

				// Only images of the form "gcr.io.*:tag" are handled.
				// HasPrefix is safe for names shorter than the prefix, where
				// slicing the string directly would panic.
				if !strings.HasPrefix(image, "gcr.io") {
					log.Printf("skipping pull of non-gcr image: %s", image)
					return
				}

				imageWithDigest, err := getImageDigest(image, token)
				if err != nil {
					log.Printf("error fetching image digest: %s, %v", image, err)
					return
				}

				// Nothing to do if the image is already on the node.
				for _, nodeImage := range nodeImages {
					if fmt.Sprintf("%s@%s", nodeImage.Repository, nodeImage.Digest) == imageWithDigest {
						return
					}
				}

				// Check to see if a job for this node and digest already exists.
				wantAnnotation := fmt.Sprintf("%s,%s", nodeName, imageWithDigest)
				jobFound := false
				for _, job := range currJobs {
					// Checked assertions: a missing key or an unexpected value
					// type is logged instead of panicking the process.
					annotations, ok := job.Metadata["annotations"].(map[string]interface{})
					if !ok {
						log.Printf("failed to get job annotations")
						continue
					}
					imagePullAnnotation, ok := annotations["pod.broker/image-pull"].(string)
					if !ok {
						log.Printf("missing pod.broker/image-pull annotation on job")
						continue
					}
					if imagePullAnnotation == wantAnnotation {
						// Found a job with matching image digest.
						jobFound = true
					}
				}
				if jobFound {
					return
				}

				tag := getTagFromImage(image)
				if err := makeImagePullJob(imageWithDigest, tag, nodeName, namespace, templatePath); err != nil {
					log.Printf("failed to make job: %v", err)
				}
			}(image, token)
		}

		wg.Wait()

		// Delete completed jobs that belong to this node.
		for _, job := range currJobs {
			annotations, ok := job.Metadata["annotations"].(map[string]interface{})
			if !ok {
				continue
			}
			imagePullAnnotation, ok := annotations["pod.broker/image-pull"].(string)
			if !ok {
				log.Printf("missing pod.broker/image-pull annotation on job")
				continue
			}

			// The annotation has the form "<node>,<image@digest>".
			if strings.Split(imagePullAnnotation, ",")[0] != nodeName {
				continue
			}
			if !(job.Status.Succeeded > 0) {
				continue
			}

			jobName, ok := job.Metadata["name"].(string)
			if !ok {
				log.Printf("failed to get job name")
				continue
			}

			log.Printf("deleting completed job: %s", jobName)
			// Run kubectl directly with an argument vector so the namespace
			// and job name are never interpreted by a shell.
			cmd := exec.Command("kubectl", "delete", "job", "-n", namespace, jobName)
			stdoutStderr, err := cmd.CombinedOutput()
			if err != nil {
				log.Printf("error calling kubectl to delete job: %v\n%s", err, string(stdoutStderr))
			}
		}

		if !loop {
			break
		}
		time.Sleep(loopInterval * time.Second)
	}
}