in src/buildstream_plugins/sources/docker.py [0:0]
def _request(self, subpath, extra_headers=None, stream=False, _reauthorized=False):
if not extra_headers:
extra_headers = {}
headers = {"content-type": "application/json"}
headers.update(extra_headers)
if self.token:
headers["Authorization"] = "Bearer {}".format(self.token)
url = urljoin(self.endpoint, "v2", subpath)
response = requests.get(url, headers=headers, stream=stream, timeout=self.api_timeout)
if response.status_code == requests.codes["unauthorized"] and not _reauthorized:
# This request requires (re)authorization. See:
# https://docs.docker.com/registry/spec/auth/token/
auth_challenge = response.headers["Www-Authenticate"]
auth_vars = parse_bearer_authorization_challenge(auth_challenge)
self._auth(auth_vars["realm"], auth_vars["service"], auth_vars["scope"])
return self._request(subpath, extra_headers=extra_headers, _reauthorized=True)
else:
response.raise_for_status()
return response