dnf/artifact-registry.py (65 lines of code) (raw):
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import dnf
from subprocess import CalledProcessError, DEVNULL, PIPE, run
from urllib.parse import urlparse
# Helper executable run by ArtifactRegistry._call_helper; its stdout is decoded
# as UTF-8 and used as the bearer token.
token_cmd = '/usr/libexec/ar-token'
class ArtifactRegistry(dnf.Plugin):
    """DNF plugin for authenticated access to Google Artifact Registry.

    For every enabled repository that either sets the boolean repo option
    ``artifact_registry_oauth`` or has an ``https`` base URL whose host ends
    in ``-yum.pkg.dev``, an ``Authorization: Bearer <token>`` header is
    prepended to the repository's HTTP headers. The token is produced by
    running the ``token_cmd`` helper and is cached on the plugin instance.
    """

    name = 'artifact-registry'

    def __init__(self, base, cli):
        super().__init__(base, cli)
        self.base = base
        # Cached token; filled in lazily by _get_token().
        self.token = None
        # Set once the token helper has failed, so it is not run again.
        self.error = False

    def config(self):
        """Add the Authorization header to each enabled Artifact Registry repo."""
        for repo in self.base.repos.iter_enabled():
            # Once the helper has failed there is no token to hand out, so
            # stop instead of re-running it (and re-printing the error) for
            # every remaining repository.
            if self.error:
                return
            if self._needs_auth(repo):
                # One call per repo: never more than one Authorization header.
                self._add_headers(repo)

    @staticmethod
    def _needs_auth(repo):
        """Return True if ``repo`` should receive an Authorization header.

        A repo qualifies when its config enables ``artifact_registry_oauth``
        or when any of its base URLs is an ``https`` URL whose network
        location ends in ``-yum.pkg.dev``.
        """
        cfg = repo.cfg
        if (cfg.has_option(repo.id, 'artifact_registry_oauth')
                and cfg.getboolean(repo.id, 'artifact_registry_oauth')):
            return True
        # Repos without a baseurl attribute (or with an empty/None one) have
        # nothing to inspect.
        for baseurl in getattr(repo, 'baseurl', None) or ():
            parsed_url = urlparse(baseurl)
            if (parsed_url.scheme == 'https'
                    and parsed_url.netloc.endswith('-yum.pkg.dev')):
                return True
        return False

    def _add_headers(self, repo):
        """Prepend the bearer-token header to ``repo``'s HTTP headers.

        Does nothing when no token could be obtained.
        """
        token = self._get_token()
        if not token:
            return
        headers = repo.get_http_headers()
        # tuple() keeps the concatenation valid whatever sequence type the
        # repo hands back.
        repo.set_http_headers(
            ('Authorization: Bearer %s' % token,) + tuple(headers))

    def _get_token(self):
        """Return the cached token, fetching it via the helper on first use.

        Reads ``service_account_json``, ``service_account_email`` and
        ``debug`` from the ``main`` section of the plugin config and passes
        them to the helper. Returns None if the helper failed, including on
        any earlier call.
        """
        if self.token:
            return self.token
        if self.error:
            # The helper already failed once; do not run it again.
            return None
        config = self.read_config(self.base.conf)
        opts = {}
        if config.has_section('main'):
            # JSON has priority over email.
            if config.has_option('main', 'service_account_json'):
                opts['service_account_json'] = config.get(
                    'main', 'service_account_json')
            elif config.has_option('main', 'service_account_email'):
                opts['service_account_email'] = config.get(
                    'main', 'service_account_email')
            if config.has_option('main', 'debug'):
                opts['debug'] = config.getboolean('main', 'debug')
        self.token = self._call_helper(**opts)
        return self.token

    def _call_helper(self, service_account_json=None,
                     service_account_email=None, debug=False):
        """Run the token helper and return the token it prints, or None.

        Args:
            service_account_json: value for ``--service_account_json``; takes
                priority over ``service_account_email``.
            service_account_email: value for ``--service_account_email``.
            debug: when true the helper inherits stderr so its debug output
                is visible; otherwise its stderr is discarded.

        Returns:
            The helper's stdout decoded as UTF-8 with surrounding whitespace
            removed. On failure returns None, sets ``self.error`` and prints
            an error message.
        """
        args = []
        # JSON has priority over email.
        if service_account_json:
            args.append('--service_account_json=' + service_account_json)
        elif service_account_email:
            args.append('--service_account_email=' + service_account_email)
        # Inherit stderr (None) to see debug statements, else silence it.
        stderr = None if debug else DEVNULL
        try:
            cmd_result = run([token_cmd] + args,
                             check=True, stdout=PIPE, stderr=stderr)
        except (CalledProcessError, OSError) as e:
            # OSError covers a missing or non-executable helper, which must
            # not abort the whole dnf run any more than a non-zero exit does.
            self.error = True
            print('Error trying to obtain Google credentials:', e)
            return None
        # Strip so a trailing newline from the helper cannot end up inside
        # the HTTP header value.
        return cmd_result.stdout.decode('utf-8').strip()