in src/buildstream_plugins/sources/docker.py [0:0]
def _request(self, subpath, extra_headers=None, stream=False, _reauthorized=False):
    """Send a GET request to ``<endpoint>/v2/<subpath>`` and return the response.

    If the server answers 401 Unauthorized with a ``Www-Authenticate``
    challenge, the token is refreshed through ``self.auth.refresh_token()``
    and the request is retried exactly once with the same arguments.

    Args:
        subpath: Path joined below ``self.endpoint`` and ``"v2"``.
        extra_headers: Optional mapping of headers merged over the default
            ``content-type: application/json`` header.
        stream: Forwarded to ``requests.get``; also preserved on the
            reauthorization retry.
        _reauthorized: Internal flag set on the retry so that a second 401
            is raised instead of triggering another refresh.

    Returns:
        The ``requests`` response object of the successful request.

    Raises:
        requests.exceptions.HTTPError: Raised by ``raise_for_status()`` when
            the final response carries an error status, including a 401
            that persists after reauthorization or that arrives without a
            ``Www-Authenticate`` header.
    """
    headers = {"content-type": "application/json"}
    if extra_headers:
        headers.update(extra_headers)

    url = urljoin(self.endpoint, "v2", subpath)
    response = requests.get(url, headers=headers, stream=stream, timeout=self.api_timeout, auth=self.auth)

    if response.status_code == requests.codes["unauthorized"] and not _reauthorized:
        # This request requires (re)authorization. See:
        # https://docs.docker.com/registry/spec/auth/token/
        auth_challenge = response.headers.get("Www-Authenticate")
        if auth_challenge is not None:
            # The 401 response is discarded; close it so a streamed
            # connection is released before the retry is issued.
            response.close()
            self.auth.refresh_token(auth_challenge)
            # Forward ``stream`` so the retry behaves like the original call.
            return self._request(subpath, extra_headers=extra_headers, stream=stream, _reauthorized=True)
        # Without a challenge there is nothing to refresh against; fall
        # through so the 401 surfaces as an HTTP error.

    response.raise_for_status()
    return response