pkg/containerd/store.go (269 lines of code) (raw):
// Initial Copyright (c) 2023 Xenit AB and 2024 The Spegel Authors.
// Portions Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
package containerd
import (
"context"
"encoding/json"
"fmt"
"io"
"net/url"
"sort"
"strings"
"github.com/containerd/containerd"
eventtypes "github.com/containerd/containerd/api/events"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/platforms"
"github.com/containerd/typeurl/v2"
"github.com/distribution/distribution/manifest"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)
const (
// DefaultSock is the default containerd socket path.
DefaultSock = "/run/containerd/containerd.sock"
// DefaultNamespace is the default containerd namespace for this client.
DefaultNamespace = "k8s.io"
)
// Store is the interface for all containerd content store artifacts.
type Store interface {
// Subscribe returns a channel of artifacts and a channel of errors.
// Artifacts are sent on the channel as they are discovered.
Subscribe(ctx context.Context) (<-chan Reference, <-chan error)
// List returns a list of artifacts.
List(ctx context.Context) ([]Reference, error)
// Resolve returns the digest for an existing artifact.
Resolve(ctx context.Context, ref string) (digest.Digest, error)
// Size returns the size of the artifact.
Size(ctx context.Context, dgst digest.Digest) (int64, error)
// Bytes returns the artifact bytes.
Bytes(ctx context.Context, dgst digest.Digest) ([]byte, string, error)
// Write writes the artifact bytes to the writer.
Write(ctx context.Context, dst io.Writer, dgst digest.Digest) error
// Verify will verify that the client status is healthy.
Verify(ctx context.Context) error
// All returns a list of digests of all resources referenced in ref.
All(ctx context.Context, ref Reference) ([]string, error)
}
// store provides an interface to the containerd content store.
type store struct {
client *containerd.Client
platform platforms.MatchComparer
// Filters for list and event subscriptions.
// The syntax of these filters is defined here: https://github.com/containerd/containerd/blob/main/filters/filter.go
listFilter string
eventFilter string
}
var _ Store = &store{}
// NewDefaultStore creates a new Store with default values for containerd socket, namespace and hosts configuration path.
func NewDefaultStore(hosts []string) (Store, error) {
return NewStore(DefaultSock, DefaultNamespace, hosts)
}
// NewStore creates a new Store.
func NewStore(sock, ns string, hosts []string) (Store, error) {
if sock == "" {
return nil, fmt.Errorf("containerd socket path cannot be empty")
}
if ns == "" {
return nil, fmt.Errorf("containerd namespace cannot be empty")
}
client, err := containerd.New(sock, containerd.WithDefaultNamespace(ns))
if err != nil {
return nil, fmt.Errorf("could not create containerd client: %w", err)
}
return newStore(hosts, client)
}
func newStore(hosts []string, client *containerd.Client) (*store, error) {
for _, host := range hosts {
_, err := url.Parse(host)
if err != nil {
return nil, err
}
}
return &store{
client: client,
platform: platforms.Default(),
listFilter: getListFilter(hosts),
eventFilter: getEventFilter(hosts),
}, nil
}
// Verify will verify that the containerd service is serving at the configured socket.
func (c *store) Verify(ctx context.Context) error {
ok, err := c.client.IsServing(ctx)
if err != nil {
return err
} else if !ok {
return fmt.Errorf("could not reach containerd service")
}
return nil
}
// Subscribe provides a subscription to containerd events on the configured hosts artifacts.
// It also returns a channel of errors.
func (c *store) Subscribe(ctx context.Context) (<-chan Reference, <-chan error) {
refChan := make(chan Reference)
errChan := make(chan error)
eventsChan, eventsErrChan := c.client.EventService().Subscribe(ctx, c.eventFilter)
go func() {
for event := range eventsChan {
name, err := getEventImageName(event.Event)
if err != nil {
errChan <- err
continue
}
image, err := c.client.GetImage(ctx, name)
if err != nil {
errChan <- err
continue
}
ref, err := ParseReference(image.Name(), image.Target().Digest)
if err != nil {
errChan <- err
} else {
refChan <- ref
}
}
}()
go func() {
for err := range eventsErrChan {
errChan <- err
}
}()
return refChan, errChan
}
// List returns the list of locally found images.
func (c *store) List(ctx context.Context) ([]Reference, error) {
imgs, err := c.client.ListImages(ctx, c.listFilter)
if err != nil {
return nil, err
}
refs := []Reference{}
for _, img := range imgs {
ref, err := ParseReference(img.Name(), img.Target().Digest)
if err != nil {
return nil, err
}
refs = append(refs, ref)
}
return refs, nil
}
// All returns a list of digests of all resources referenced in ref.
func (c *store) All(ctx context.Context, ref Reference) ([]string, error) {
img, err := c.client.ImageService().Get(ctx, ref.Name())
if err != nil {
return nil, err
}
keys := []string{}
err = images.Walk(ctx, images.HandlerFunc(func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
keys = append(keys, desc.Digest.String())
switch desc.MediaType {
case images.MediaTypeDockerSchema2ManifestList, ocispec.MediaTypeImageIndex:
var idx ocispec.Index
b, err := content.ReadBlob(ctx, c.client.ContentStore(), desc)
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &idx); err != nil {
return nil, err
}
var descs []ocispec.Descriptor
for _, m := range idx.Manifests {
if !c.platform.Match(*m.Platform) {
continue
}
descs = append(descs, m)
}
if len(descs) == 0 {
return nil, fmt.Errorf("could not find platform architecture in manifest: %v", desc.Digest)
}
// Platform matching is a bit weird in that multiple platforms can match.
// There is however a "best" match that should be used.
// This logic is used by Containerd to determine which layer to pull so we should use the same logic.
sort.SliceStable(descs, func(i, j int) bool {
if descs[i].Platform == nil {
return false
}
if descs[j].Platform == nil {
return true
}
return c.platform.Less(*descs[i].Platform, *descs[j].Platform)
})
return []ocispec.Descriptor{descs[0]}, nil
case images.MediaTypeDockerSchema2Manifest, ocispec.MediaTypeImageManifest:
var manifest ocispec.Manifest
b, err := content.ReadBlob(ctx, c.client.ContentStore(), desc)
if err != nil {
return nil, err
}
if err := json.Unmarshal(b, &manifest); err != nil {
return nil, err
}
keys = append(keys, manifest.Config.Digest.String())
for _, layer := range manifest.Layers {
keys = append(keys, layer.Digest.String())
}
return nil, nil
default:
return nil, fmt.Errorf("unexpected media type %v for digest: %v", desc.MediaType, desc.Digest)
}
}), img.Target)
if err != nil {
return nil, fmt.Errorf("failed to walk image manifests: %w", err)
}
if len(keys) == 0 {
return nil, fmt.Errorf("no image digests found")
}
return keys, nil
}
// Resolve returns the digest for an existing artifact.
func (c *store) Resolve(ctx context.Context, ref string) (digest.Digest, error) {
cImg, err := c.client.GetImage(ctx, ref)
if err != nil {
return "", err
}
return cImg.Target().Digest, nil
}
// Size returns the size of the artifact.
func (c *store) Size(ctx context.Context, dgst digest.Digest) (int64, error) {
info, err := c.client.ContentStore().Info(ctx, dgst)
if err != nil {
return 0, err
}
return info.Size, nil
}
// Bytes returns the artifact bytes. This method should only be used for manifests.
func (c *store) Bytes(ctx context.Context, dgst digest.Digest) ([]byte, string, error) {
b, err := content.ReadBlob(ctx, c.client.ContentStore(), ocispec.Descriptor{Digest: dgst})
if err != nil {
return nil, "", err
}
var m manifest.Versioned
if err := json.Unmarshal(b, &m); err != nil {
return nil, "", err
}
return b, m.MediaType, nil
}
// Write writes the blob bytes to the writer.
func (c *store) Write(ctx context.Context, dst io.Writer, dgst digest.Digest) error {
ra, err := c.client.ContentStore().ReaderAt(ctx, ocispec.Descriptor{Digest: dgst})
if err != nil {
return err
}
defer func() {
if closeErr := ra.Close(); closeErr != nil {
if err != nil {
err = fmt.Errorf("multiple errors: %v; %v", err, closeErr)
} else {
err = closeErr
}
}
}()
_, err = io.Copy(dst, content.NewReader(ra))
if err != nil {
return err
}
return nil
}
// getEventImageName will get the image name from an event.
func getEventImageName(e typeurl.Any) (string, error) {
evt, err := typeurl.UnmarshalAny(e)
if err != nil {
return "", fmt.Errorf("failed to unmarshal any: %w", err)
}
switch e := evt.(type) {
case *eventtypes.ImageCreate:
return e.Name, nil
case *eventtypes.ImageUpdate:
return e.Name, nil
default:
return "", fmt.Errorf("unsupported event: %v", e)
}
}
func getListFilter(hosts []string) string {
return fmt.Sprintf(`name~="%s"`, strings.Join(getHostNames(hosts), "|"))
}
func getEventFilter(hosts []string) string {
return fmt.Sprintf(`topic~="/images/create|/images/update",event.name~="%s"`, strings.Join(getHostNames(hosts), "|"))
}
func getHostNames(hosts []string) []string {
names := []string{}
for _, host := range hosts {
if !strings.HasPrefix(host, "http://") && !strings.HasPrefix(host, "https://") {
host = "http://" + host
}
u, err := url.Parse(host)
if err != nil {
// Use the host as is.
names = append(names, host)
} else {
names = append(names, u.Host)
}
}
return names
}