processors/docker.py (183 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from .base import Processor, NotConfiguredException import google.auth from googleapiclient import discovery import time from _vendor.python_docker.registry import Registry class DockerProcessor(Processor): """ Perform actions on Docker registries. Args: hostname (str): Docker registry hostname. username (str, optional): Username for Docker registry. Defaults to SA authentication for Artifact Registry. password (str, optional): Password for Docker registry. image (str): Image to pull/push tag (str, optional): Tag to pull/push, defaults to latest. mode (str): image.copy, image.delete, image.deleteversion, images.list destination_hostname (str, optional): Docker registry hostname. (For copy) destination_username (str, optional): Username for Docker registry. Defaults to SA authentication for Artifact Registry. (For copy) destination_password (str, optional): Password for Docker registry. (For copy) destination_image (str): Image to pull/push destination_tag (str, optional): Tag to pull/push, defaults to latest. tls_verify (bool, optional): Set false to disable TLS verify at source registry. destination_tls_verify (bool, optional): Set false to disable TLS verify at destination registry. """ source_tls_verify = True destination_tls_verify = True def get_default_config_key(): return 'docker' def wait_for_operation_done(self, ar_service, operation_name): end_time = start_time = time.monotonic() while True and (end_time - start_time) < 60: op_request = ar_service.projects().locations().operations().get( name=operation_name).execute() if 'done' in op_request and op_request['done']: if 'error' in op_request: self.logger.error( 'Error while waiting for long running operation %s to complete.' % (operation_name), extra={'error': op_request['error']}) return op_request['error'] response = op_request['response'] del response['@type'] return response time.sleep(2) end_time = time.monotonic() def process(self, output_var='docker'): if 'mode' not in self.config: raise NotConfiguredException('No Docker operation specified.') credentials, credentials_project_id = google.auth.default() mode = self._jinja_expand_string(self.config['mode'], 'mode') image = None tag = None if 'image' not in self.config and mode != 'images.list': raise NotConfiguredException('No Docker image specified.') elif mode != 'images.list': image = self._jinja_expand_string(self.config['image'], 'image') tag = self._jinja_expand_string( self.config['tag'], 'tag') if 'tag' in self.config else 'latest' hostname = self._jinja_expand_string(self.config['hostname'], 'hostname') username = None password = None if '.pkg.dev' not in hostname and 'gcr.io' not in hostname: if 'username' not in self.config or 'password' not in self.config: raise NotConfiguredException( 'No Docker username or password specified.') username = self._jinja_expand_string(self.config['username'], 'username') password = self._jinja_expand_string(self.config['password'], 'password') else: username = "_dcgcr_2_0_0_token" auth_req = google.auth.transport.requests.Request() credentials.refresh(auth_req) password = credentials.token if 'tls_verify' in self.config: self.source_tls_verify = self._jinja_expand_bool_str( self.config['tls_verify'], 'tls_verify') if 'destination_tls_verify' in self.config: self.destination_tls_verify = self._jinja_expand_bool_str( self.config['destination_tls_verify'], 'destination_tls_verify') source_registry = Registry(hostname=hostname, username=username, password=password, verify=self.source_tls_verify) destination_registry = None destination_hostname = None if 'destination_hostname' not in self.config: destination_registry = source_registry destination_hostname = hostname else: destination_hostname = self._jinja_expand_string( self.config['destination_hostname'], 'hostname') destination_username = username destination_password = password if '.pkg.dev' not in destination_hostname and 'gcr.io' not in destination_hostname: if 'destination_username' not in self.config or 'destination_password' not in self.config: raise NotConfiguredException( 'No Docker username or password specified.') if 'destination_username' in self.config: destination_username = self._jinja_expand_string( self.config['destination_username'], 'username') if 'destination_password' in self.config: destination_password = self._jinja_expand_string( self.config['destination_password'], 'password') else: destination_username = "_dcgcr_2_0_0_token" auth_req = google.auth.transport.requests.Request() credentials.refresh(auth_req) destination_password = credentials.token destination_registry = Registry(hostname=destination_hostname, username=destination_username, password=destination_password, verify=self.destination_tls_verify) if mode == 'image.copy': destination_image = self._jinja_expand_string( self.config['destination_image'], 'image') if 'destination_image' in self.config else image destination_tag = self._jinja_expand_string( self.config['destination_tag'], 'tag') if 'destination_tag' in self.config else tag self.logger.info('Pulling image from: %s/%s:%s' % (hostname, image, tag), extra={ 'registry': hostname, 'image': image, 'tag': tag }) source_image = source_registry.pull_image(image, tag, lazy=True) self.logger.info( 'Pushing image to: %s/%s:%s' % (destination_hostname, destination_image, destination_tag), extra={ 'registry': destination_hostname, 'image': destination_image, 'tag': destination_tag }) source_image.name = destination_image source_image.tag = destination_tag destination_registry.push_image(source_image) self.logger.info( 'Pushed image to: %s/%s:%s' % (destination_hostname, destination_image, destination_tag), extra={ 'registry': destination_hostname, 'image': destination_image, 'tag': destination_tag }) return { output_var: { 'registry': destination_hostname, 'image': destination_image, 'tag': destination_tag } } if mode == 'image.delete' or mode == 'image.deleteversion': self.logger.info('Deleting image from: %s/%s:%s' % (hostname, image, tag), extra={ 'registry': hostname, 'image': image, 'tag': tag }) if '.pkg.dev' in hostname: ar_service = discovery.build( 'artifactregistry', 'v1', http=self._get_branded_http(credentials)) location = hostname.replace('https://', '').replace('-docker.pkg.dev', '') image_parts = image.split('/') project = image_parts[0] repository = image_parts[1] image = '/'.join(image_parts[2:]) name = 'projects/%s/locations/%s/repositories/%s/packages/%s' % ( project, location, repository, image) ar_request = ar_service.projects().locations().repositories( ).packages().delete(name=name) if mode == 'image.deleteversion': name = 'projects/%s/locations/%s/repositories/%s/packages/%s/tags/%s' % ( project, location, repository, image, tag) ar_request = ar_service.projects().locations().repositories( ).packages().tags().delete(name=name) ar_response = ar_request.execute() if 'name' in ar_response: self.wait_for_operation_done(ar_service, ar_response['name']) else: source_registry.delete_image(image, tag) if mode == 'images.list': self.logger.info('Listing images from: %s/%s:%s' % (hostname, image, tag), extra={ 'registry': hostname, 'image': image, 'tag': tag }) return {output_var: source_registry.list_images()} return { output_var: None, }