ftl/common/cache.py (168 lines of code) (raw):
# Copyright 2017 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""This package defines the interface for caching objects."""
import abc
import datetime
import json
import logging
import os
from ftl.common import constants
from containerregistry.client import docker_name
from containerregistry.client import docker_creds
from containerregistry.client.v2_2 import docker_image
from containerregistry.client.v2_2 import docker_session
from containerregistry.client.v2_2 import docker_http
from ftl.common import ftl_util
class Base(object):
    """Abstract interface for a layer cache.

    Builders talk to implementations of this class to look up layers that
    were built before and to store layers they have just built.

    Note: Base does not use abc.ABCMeta as its metaclass, so the
    @abc.abstractmethod markers below are documentary only; they do not
    prevent instantiation or force subclasses to override the methods.
    """

    # The context-manager hooks let a cache be used in a `with` statement.
    @abc.abstractmethod
    def __enter__(self):
        """Set up the context; subclasses supply the real behavior."""
        return None

    def __exit__(self, unused_type, unused_value, unused_traceback):
        """Tear down the context; the default implementation does nothing."""
        return None

    @abc.abstractmethod
    def Get(self, cache_key):
        """Look up a cached image.

        Args:
          cache_key: the cache_key of the package descriptor atop our base.

        Returns:
          the docker_image.Image of the cache hit, or None on a miss.
        """
        return None

    @abc.abstractmethod
    def Set(self, cache_key, value):
        """Store an entry in the cache.

        Args:
          cache_key: the cache_key of the package descriptor atop our base.
          value: the docker_image.Image to place in the cache.
        """
        return None
class Registry(Base):
    """Registry is a cache implementation that stores layers in a registry.

    It stores layers under a 'namespace', with a tag derived from the layer
    cache_key. For example: gcr.io/$repo/$namespace:$cache_key

    When use_global is set, lookups try the global cache registry
    (constants.GLOBAL_CACHE_REGISTRY) before the project repository.
    """

    def __init__(
            self,
            repo,
            namespace,
            creds,
            transport,
            ttl,
            threads=1,
            should_cache=True,
            should_upload=True,
            mount=None,
            use_global=False,
            export_stats=False,
            export_location=None,
    ):
        """Configure the registry-backed cache.

        Args:
          repo: project repository holding local cache entries (rendered with
            str() when building tags).
          namespace: path component placed between the repository and the tag.
          creds: credentials for the project repository, used for lookups and
            pushes.
          transport: HTTP transport passed to every registry call.
          ttl: maximum age, in hours, of an entry that is still usable.
          threads: number of upload threads handed to docker_session.Push.
          should_cache: when False, Get skips the cache entirely.
          should_upload: when False, Set does not push anything.
          mount: value passed as `mount` to docker_session.Push; defaults to
            an empty list.
          use_global: when True, Get consults the global cache first.
          export_stats: when True, lookup results are written out as JSON.
          export_location: directory that receives the stats file.
        """
        super(Registry, self).__init__()
        self._repo = repo
        self._namespace = namespace
        self._creds = creds
        # TODO(nkubala): default this to true to point builds to global cache
        self._use_global = use_global
        self._global_creds = None
        if use_global:
            # Credentials for the global cache come from the default keychain
            # rather than from the caller-supplied creds.
            global_reg_name = '{base}/{namespace}'.format(
                base=constants.GLOBAL_CACHE_REGISTRY,
                namespace=self._namespace)
            global_reg = docker_name.Registry(global_reg_name)
            self._global_creds = docker_creds.DefaultKeychain.Resolve(
                global_reg)
        self._export_stats = export_stats
        self._export_location = export_location
        self._transport = transport
        self._threads = threads
        self._mount = mount or []
        self._should_cache = should_cache
        self._should_upload = should_upload
        self._ttl = ttl

    def __enter__(self):
        """Return this cache so `with Registry(...) as cache:` binds it.

        Base declares __enter__ (and supplies a no-op __exit__); without this
        override the `as` target would be bound to None.
        """
        return self

    def _tag(self, cache_key, repo=None):
        """Build the docker_name.Tag `<repo>/<namespace>:<cache_key>`.

        repo defaults to the project repository given at construction.
        """
        return docker_name.Tag('{repo}/{namespace}:{tag}'.format(
            repo=repo or str(self._repo),
            namespace=self._namespace,
            tag=cache_key))

    def Get(self, cache_key):
        """Attempt to retrieve a fresh cached image for cache_key.

        Args:
          cache_key: the cache_key of the package descriptor atop our base.

        Returns:
          the docker_image.Image of the cache hit, or None when caching is
          disabled, no entry exists, or the entry is older than the TTL.
        """
        if not self._should_cache:
            logging.info("--no-cache flag set, cache won't be checked")
            return None
        logging.debug('Checking cache for cache_key %s', cache_key)
        # _getEntry only returns entries that already passed the TTL check in
        # _validateEntry; checking again here would just refetch the image
        # config from the registry a second time.
        hit = self._getEntry(cache_key)
        if hit:
            logging.info('Found cached dependency layer for %s', cache_key)
            return hit
        logging.info('No cached dependency layer for %s', cache_key)
        return None

    def _getEntry(self, cache_key):
        """Look cache_key up in the global cache, then the project cache.

        The outcome of each level consulted is recorded and, when stats export
        is enabled, written out whether the lookup ends in a hit or a miss.

        Returns:
          the first image that exists and is within the TTL, or None.
        """
        cache_results = []
        # Check the global cache first.
        # NOTE(review): when use_global is False the "global" level is still
        # recorded, as "MISS", even though it was not actually consulted.
        img, cache_status = self._validateEntry(
            self._getGlobalEntry(cache_key), cache_key)
        cache_results.append(
            Registry.buildCacheResult("global", cache_key, cache_status))
        if img:
            logging.info(
                'Found dependency layer for %s in global cache', cache_key)
            self._maybeExportCacheResult(cache_results)
            return img
        # On a global miss, fall back to the project (local) cache.
        img, cache_status = self._validateEntry(
            self._getLocalEntry(cache_key), cache_key)
        cache_results.append(
            Registry.buildCacheResult("project", cache_key, cache_status))
        if img:
            logging.info(
                'Found dependency layer for %s in local cache', cache_key)
        # Export on a full miss as well as a hit so stats consumers see misses.
        self._maybeExportCacheResult(cache_results)
        return img

    def _getGlobalEntry(self, cache_key):
        """Return the global-cache image for cache_key, or None.

        Returns None without any registry call when use_global is off.
        """
        if not self._use_global:
            return None
        key = self._tag(cache_key, constants.GLOBAL_CACHE_REGISTRY)
        entry = Registry.getEntryFromCreds(key, self._global_creds,
                                           self._transport)
        if not entry:
            # TODO(nkubala): standardize this log message so we can
            # crawl cloudbuild logs for cache misses
            logging.info('Cache miss on global cache for %s', key)
        return entry

    def _getLocalEntry(self, cache_key):
        """Return the project-cache image for cache_key, or None."""
        key = self._tag(cache_key)
        entry = Registry.getEntryFromCreds(key, self._creds, self._transport)
        if not entry:
            logging.info('Cache miss on local cache for %s', key)
        return entry

    def _validateEntry(self, entry, cache_key):
        """Classify a looked-up entry by presence and TTL.

        Returns:
          (entry, "HIT") for an entry within the TTL; (None, "HIT_TOO_OLD")
          for an entry past the TTL; (None, "MISS") when there is no entry or
          its creation time could not be fetched.
        """
        if not entry:
            return None, "MISS"
        try:
            fresh = Registry.checkTTL(entry, self._ttl)
        except docker_http.V2DiagnosticException:
            logging.info(
                'Fetching cached dep layer for %s failed', cache_key)
            return None, "MISS"
        if fresh:
            return entry, "HIT"
        logging.info('TTL expired for cached image %s', cache_key)
        return None, "HIT_TOO_OLD"

    def _maybeExportCacheResult(self, results):
        """Write {"cacheStats": results} as JSON when stats export is enabled.

        The file is constants.BUILDER_OUTPUT_FILE inside export_location. It
        is opened with "w", so each call replaces what an earlier call wrote.
        """
        if not self._export_stats:
            return
        path = os.path.join(self._export_location,
                            constants.BUILDER_OUTPUT_FILE)
        with open(path, "w") as f:
            f.write(json.dumps({"cacheStats": results}))

    def Set(self, cache_key, value):
        """Push `value` to the project repository under cache_key's tag.

        Does nothing when uploads are disabled (should_upload=False).
        """
        if not self._should_upload:
            logging.info("--no-upload flag set, images won't be pushed")
            return
        entry = self._tag(cache_key)
        with docker_session.Push(
                entry,
                self._creds,
                self._transport,
                threads=self._threads,
                mount=self._mount) as session:
            session.upload(value)

    @staticmethod
    def buildCacheResult(cache_level, cache_key, cache_status):
        """Build one cache-stats record for a single lookup level.

        Args:
          cache_level: which cache was checked ("global" or "project").
          cache_key: the key that was looked up.
          cache_status: "HIT", "HIT_TOO_OLD" or "MISS".
        """
        return {
            "type": "docker_layer_cache",
            "level": cache_level,
            "hash": cache_key,
            "status": cache_status
        }

    @staticmethod
    def getEntryFromCreds(entry, creds, transport):
        """Check whether `entry` exists in the registry reachable with creds.

        Args:
          entry: the docker_name tag to look up.
          creds: credentials authenticated to that cache registry.
          transport: HTTP transport for the registry call.

        Returns:
          the docker_image.FromRegistry image if the entry exists, else None.
        """
        with docker_image.FromRegistry(entry, creds, transport) as img:
            if img.exists():
                logging.info('Found cached base image: %s.', entry)
                return img
        logging.info('No cached base image found for entry: %s.', entry)
        return None

    @staticmethod
    def checkTTL(entry, ttl):
        """Check whether a cache entry is still within its TTL.

        Args:
          entry: the cached image; its creation time is read via
            ftl_util.creation_time.
          ttl: the allowed age, in hours.

        Returns:
          True if the entry was created less than `ttl` hours ago (still
          usable), False if it has expired. Callers catch
          docker_http.V2DiagnosticException raised while reading the entry.
        """
        last_created = ftl_util.timestamp_to_time(
            ftl_util.creation_time(entry))
        # NOTE(review): `now` is naive local time. If timestamp_to_time yields
        # UTC times (image `created` fields are usually UTC), the TTL is skewed
        # by the host's UTC offset -- confirm against ftl_util.
        now = datetime.datetime.now()
        return last_created > now - datetime.timedelta(hours=ttl)