in images/controller/cmd/image_puller/image_puller.go [39:193]
func main() {
log.Printf("Starting image puller")
// Set from downward API.
namespace := os.Getenv("NAMESPACE")
if len(namespace) == 0 {
log.Fatal("Missing NAMESPACE env.")
}
// Set from downward API.
nodeName := os.Getenv("NODE_NAME")
if len(nodeName) == 0 {
log.Fatal("Missing NODE_NAME env.")
}
templatePath := os.Getenv("TEMPLATE_PATH")
if len(templatePath) == 0 {
templatePath = "/run/image-puller/template/image-pull-job.yaml.tmpl"
}
// optional polling mode.
loop := false
if os.Getenv("IMAGE_PULL_LOOP") == "true" {
loop = true
}
// configure docker with gcloud credentials
cmd := exec.Command("gcloud", "auth", "configure-docker", "-q")
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Fatalf("failed to configure docker with gcloud: %s, %v", string(stdoutStderr), err)
}
// Obtain SA email
saEmail, err := broker.GetServiceAccountFromMetadataServer()
if err != nil {
log.Fatalf("failed to get service account email: %v", err)
}
var wg sync.WaitGroup
for {
token, err := broker.GetServiceAccountTokenFromMetadataServer(saEmail)
if err != nil {
log.Fatalf("failed to get service account token: %v", err)
if loop {
time.Sleep(loopInterval * time.Second)
} else {
break
}
}
images, err := findImageTags(namespace)
if err != nil {
log.Fatal(err)
}
nodeImages, err := broker.GetImagesOnNode()
if err != nil {
log.Fatal(err)
}
currJobs, err := broker.GetJobs(namespace, "app=image-pull")
if err != nil {
log.Fatal(err)
}
// Process images in parallel
wg.Add(len(images))
for _, image := range images {
go func(image, token string) {
// Fetch image details for images in the form of: "gcr.io.*:tag"
if image[:6] == "gcr.io" {
imageWithDigest, err := getImageDigest(image, token)
if err != nil {
log.Printf("error fetching image digest: %s, %v", image, err)
} else {
// Check if image is already on node.
imageOnNode := false
for _, nodeImage := range nodeImages {
if fmt.Sprintf("%s@%s", nodeImage.Repository, nodeImage.Digest) == imageWithDigest {
imageOnNode = true
}
}
if !imageOnNode {
// Check to see if job is active.
jobFound := false
for _, job := range currJobs {
if metaValue, ok := job.Metadata["annotations"]; ok {
annotations := metaValue.(map[string]interface{})
if imagePullAnnotation, ok := annotations["pod.broker/image-pull"]; ok {
if imagePullAnnotation.(string) == fmt.Sprintf("%s,%s", nodeName, imageWithDigest) {
// Found a job with matching image digest.
jobFound = true
}
} else {
log.Printf("missing pod.broker/image-pull annotation on job")
}
} else {
log.Printf("failed to get job annotations")
}
}
if !jobFound {
tag := getTagFromImage(image)
if err := makeImagePullJob(imageWithDigest, tag, nodeName, namespace, templatePath); err != nil {
log.Printf("failed to make job: %v", err)
}
}
}
}
} else {
// Non-gcr image
log.Printf("skipping pull of non-gcr image: %s", image)
}
wg.Done()
}(image, token)
}
wg.Wait()
// Delete completed jobs.
for _, job := range currJobs {
if metaValue, ok := job.Metadata["annotations"]; ok {
annotations := metaValue.(map[string]interface{})
if imagePullAnnotation, ok := annotations["pod.broker/image-pull"]; ok {
if strings.Split(imagePullAnnotation.(string), ",")[0] == nodeName {
// Found job for node.
jobName := job.Metadata["name"].(string)
if job.Status.Succeeded > 0 {
log.Printf("deleting completed job: %s", jobName)
cmd := exec.Command("sh", "-c", fmt.Sprintf("kubectl delete job -n %s %s 1>&2", namespace, jobName))
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error calling kubectl to delete job: %v\n%s", err, string(stdoutStderr))
}
}
}
} else {
log.Printf("missing pod.broker/image-pull annotation on job")
}
}
}
if loop {
time.Sleep(loopInterval * time.Second)
} else {
break
}
}
}