images/controller/cmd/image_puller/image_puller.go (254 lines of code) (raw):
/*
Copyright 2019 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"crypto/sha1"
"fmt"
"io"
"log"
"os"
"os/exec"
"path"
"regexp"
"strings"
"sync"
"text/template"
"time"
"github.com/Masterminds/sprig"
broker "selkies.io/controller/pkg"
)
const loopInterval = 2
func main() {
log.Printf("Starting image puller")
// Set from downward API.
namespace := os.Getenv("NAMESPACE")
if len(namespace) == 0 {
log.Fatal("Missing NAMESPACE env.")
}
// Set from downward API.
nodeName := os.Getenv("NODE_NAME")
if len(nodeName) == 0 {
log.Fatal("Missing NODE_NAME env.")
}
templatePath := os.Getenv("TEMPLATE_PATH")
if len(templatePath) == 0 {
templatePath = "/run/image-puller/template/image-pull-job.yaml.tmpl"
}
// optional polling mode.
loop := false
if os.Getenv("IMAGE_PULL_LOOP") == "true" {
loop = true
}
// configure docker with gcloud credentials
cmd := exec.Command("gcloud", "auth", "configure-docker", "-q")
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Fatalf("failed to configure docker with gcloud: %s, %v", string(stdoutStderr), err)
}
// Obtain SA email
saEmail, err := broker.GetServiceAccountFromMetadataServer()
if err != nil {
log.Fatalf("failed to get service account email: %v", err)
}
var wg sync.WaitGroup
for {
token, err := broker.GetServiceAccountTokenFromMetadataServer(saEmail)
if err != nil {
log.Fatalf("failed to get service account token: %v", err)
if loop {
time.Sleep(loopInterval * time.Second)
} else {
break
}
}
images, err := findImageTags(namespace)
if err != nil {
log.Fatal(err)
}
nodeImages, err := broker.GetImagesOnNode()
if err != nil {
log.Fatal(err)
}
currJobs, err := broker.GetJobs(namespace, "app=image-pull")
if err != nil {
log.Fatal(err)
}
// Process images in parallel
wg.Add(len(images))
for _, image := range images {
go func(image, token string) {
// Fetch image details for images in the form of: "gcr.io.*:tag"
if image[:6] == "gcr.io" {
imageWithDigest, err := getImageDigest(image, token)
if err != nil {
log.Printf("error fetching image digest: %s, %v", image, err)
} else {
// Check if image is already on node.
imageOnNode := false
for _, nodeImage := range nodeImages {
if fmt.Sprintf("%s@%s", nodeImage.Repository, nodeImage.Digest) == imageWithDigest {
imageOnNode = true
}
}
if !imageOnNode {
// Check to see if job is active.
jobFound := false
for _, job := range currJobs {
if metaValue, ok := job.Metadata["annotations"]; ok {
annotations := metaValue.(map[string]interface{})
if imagePullAnnotation, ok := annotations["pod.broker/image-pull"]; ok {
if imagePullAnnotation.(string) == fmt.Sprintf("%s,%s", nodeName, imageWithDigest) {
// Found a job with matching image digest.
jobFound = true
}
} else {
log.Printf("missing pod.broker/image-pull annotation on job")
}
} else {
log.Printf("failed to get job annotations")
}
}
if !jobFound {
tag := getTagFromImage(image)
if err := makeImagePullJob(imageWithDigest, tag, nodeName, namespace, templatePath); err != nil {
log.Printf("failed to make job: %v", err)
}
}
}
}
} else {
// Non-gcr image
log.Printf("skipping pull of non-gcr image: %s", image)
}
wg.Done()
}(image, token)
}
wg.Wait()
// Delete completed jobs.
for _, job := range currJobs {
if metaValue, ok := job.Metadata["annotations"]; ok {
annotations := metaValue.(map[string]interface{})
if imagePullAnnotation, ok := annotations["pod.broker/image-pull"]; ok {
if strings.Split(imagePullAnnotation.(string), ",")[0] == nodeName {
// Found job for node.
jobName := job.Metadata["name"].(string)
if job.Status.Succeeded > 0 {
log.Printf("deleting completed job: %s", jobName)
cmd := exec.Command("sh", "-c", fmt.Sprintf("kubectl delete job -n %s %s 1>&2", namespace, jobName))
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error calling kubectl to delete job: %v\n%s", err, string(stdoutStderr))
}
}
}
} else {
log.Printf("missing pod.broker/image-pull annotation on job")
}
}
}
if loop {
time.Sleep(loopInterval * time.Second)
} else {
break
}
}
}
// Returns de-duplicated list of image tags from:
// 1. broker apps spec.defaultRepo:defaultTag
// 2. broker apps spec.images[].newRepo:newTag structure
// 3. user config spec.ImageRepo:imageTag
func findImageTags(namespace string) ([]string, error) {
uniqueImages := make(map[string]bool, 0)
// Fetch all broker apps
appConfigs, err := broker.FetchBrokerAppConfigs(namespace)
if err != nil {
log.Printf("failed to fetch broker app configs: %v", err)
}
for _, appConfig := range appConfigs {
uniqueImages[makeImageName(appConfig.Spec.DefaultRepo, appConfig.Spec.DefaultTag)] = true
for _, imageSpec := range appConfig.Spec.Images {
uniqueImages[makeImageName(imageSpec.NewRepo, imageSpec.NewTag)] = true
}
}
// Fetch all user app configs
userConfigs, err := broker.FetchAppUserConfigs()
if err != nil {
log.Printf("failed to fetch user app configs: %v", err)
} else {
for _, userConfig := range userConfigs {
uniqueImages[makeImageName(userConfig.Spec.ImageRepo, userConfig.Spec.ImageTag)] = true
}
}
images := make([]string, 0)
for image := range uniqueImages {
images = append(images, image)
}
return images, nil
}
func makeImageName(repo, tag string) string {
return fmt.Sprintf("%s:%s", repo, tag)
}
// Find and verify image digest
func getImageDigest(image, accessToken string) (string, error) {
respImage := ""
imageTags, err := broker.ListGCRImageTags(image, accessToken)
if err != nil {
return respImage, err
}
if len(regexp.MustCompile(broker.GCRImageWithTagPattern).FindAllString(image, -1)) > 0 {
// Find image digest from tag.
imageToks := strings.Split(strings.ReplaceAll(image, "gcr.io/", ""), ":")
imageRepo := imageToks[0]
imageTag := imageToks[1]
for digest, meta := range imageTags.Manifest {
for _, tag := range meta.Tag {
if tag == imageTag {
respImage = fmt.Sprintf("gcr.io/%s@%s", imageRepo, digest)
break
}
}
if len(respImage) > 0 {
break
}
}
}
if len(regexp.MustCompile(broker.GCRImageWithDigestPattern).FindAllString(image, -1)) > 0 {
// Verify image digest is in list response.
imageDigest := strings.Split(strings.ReplaceAll(image, "gcr.io/", ""), "@")[1]
if _, ok := imageTags.Manifest[imageDigest]; ok {
respImage = image
}
}
if len(respImage) == 0 {
return respImage, fmt.Errorf("failed to find digest for image: %s", image)
}
return respImage, nil
}
// Extract tag from image repo:tag format, else return empty string.
func getTagFromImage(image string) string {
if len(regexp.MustCompile(broker.GCRImageWithTagPattern).FindAllString(image, -1)) > 0 {
return strings.Split(strings.ReplaceAll(image, "gcr.io/", ""), ":")[1]
} else {
return ""
}
}
// Check if job is currently running.
// If running, return (non-fatal) error.
// If not running, apply job to given namespace.
func makeImagePullJob(image, tag, nodeName, namespace, templatePath string) error {
imageToks := strings.Split(strings.ReplaceAll(image, "gcr.io/", ""), "@sha256:")
imageBase := path.Base(imageToks[0])
digestHash := imageToks[1]
h := sha1.New()
io.WriteString(h, fmt.Sprintf("%s", nodeName))
nodeNameHash := fmt.Sprintf("%x", h.Sum(nil))
nameSuffix := fmt.Sprintf("%s-%s-%s", imageBase, digestHash[:5], nodeNameHash[:5])
log.Printf("creating image pull job: %s, %s, %s", nodeName, image, nameSuffix)
type templateData struct {
NameSuffix string
NodeName string
Image string
Tag string
}
data := templateData{
NameSuffix: nameSuffix,
NodeName: nodeName,
Image: image,
Tag: tag,
}
destDir := path.Join("/run/image-puller", nameSuffix)
if err := os.MkdirAll(destDir, os.ModePerm); err != nil {
return fmt.Errorf("failed to make destDir %s: %v", destDir, err)
}
base := path.Base(templatePath)
t, err := template.New(base).Funcs(sprig.TxtFuncMap()).ParseFiles(templatePath)
if err != nil {
return fmt.Errorf("failed to initialize template: %v", err)
}
dest, _ := os.Create(strings.ReplaceAll(path.Join(destDir, base), ".tmpl", ""))
if err != nil {
return fmt.Errorf("failed to create dest template file: %v", err)
}
if err = t.Execute(dest, &data); err != nil {
return fmt.Errorf("failed to execute template: %v", err)
}
// Apply the job to the cluster
cmd := exec.Command("sh", "-c", fmt.Sprintf("kubectl apply -f %s 1>&2", destDir))
cmd.Dir = path.Dir(destDir)
stdoutStderr, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("error calling kubectl to apply job: %v\n%s", err, string(stdoutStderr))
}
return nil
}