in blueprints/cloud-operations/compute-quota-monitoring/src/main.py [0:0]
def fetch(request, delete=False):
  '''Minimal HTTP client interface for API calls.

  Sends the request through the `HTTP` client object defined outside this
  function and returns the JSON-decoded response body.

  Args:
    request: object exposing `url`, `headers` and `data` attributes. A falsy
      `data` results in a GET; any other value is JSON-encoded and POSTed.
    delete: when true, issue a DELETE on `request.url`; `request.data` is
      not sent in that case.

  Returns:
    The JSON-decoded body of a response with status code 200.

  Raises:
    NotFound: the response status code is 404.
    SystemExit: on credentials refresh failure, read timeout, a response
      body that cannot be decoded as JSON, or any status code other than
      200 and 404.
  '''
  # Work out the verb once so that the debug log and the actual call agree
  # (a DELETE used to be logged as GET or POST).
  if delete:
    method = 'DELETE'
  elif request.data:
    method = 'POST'
  else:
    method = 'GET'
  logging.debug(f'fetch {method} {request.url}')
  logging.debug(request.data)
  try:
    if method == 'DELETE':
      response = HTTP.delete(request.url, headers=request.headers)
    elif method == 'GET':
      response = HTTP.get(request.url, headers=request.headers)
    else:
      response = HTTP.post(request.url, headers=request.headers,
                           data=json.dumps(request.data))
  except (google.auth.exceptions.RefreshError,
          requests.exceptions.ReadTimeout) as e:
    # Exceptions may carry no args; fall back to their repr instead of
    # failing with an IndexError while reporting the real problem.
    raise SystemExit(e.args[0] if e.args else repr(e)) from e
  # The body is decoded before looking at the status code because error
  # details are read from the JSON payload.
  try:
    rdata = json.loads(response.content)
  except (json.JSONDecodeError, UnicodeDecodeError) as e:
    logging.critical(e)
    raise SystemExit(f'Error decoding response: {response.content}') from e
  if response.status_code == 200:
    # Reuse the payload decoded above rather than parsing the body again.
    return rdata
  # Error payloads are not guaranteed to be JSON objects: guard the lookups
  # so that an unexpected shape still produces the intended exception.
  error = rdata.get('error') if isinstance(rdata, dict) else None
  if response.status_code == 404:
    raise NotFound(
        f'Resource not found. Error: {error} URL: {request.url}')
  logging.critical(rdata)
  if not isinstance(error, dict):
    error = {}
  raise SystemExit('API error: {} (HTTP {})'.format(
      error.get('message', 'error message cannot be decoded'),
      error.get('code', 'no code found')))