in deploy/ansible/action_plugins/public_api.py [0:0]
def _request(self, method, path, payload=None):
    """Send one HTTP request to the API and return the decoded response.

    The request goes to ``self._address + path`` through
    ``self._client.open`` with a 60 second timeout.  A truthy ``payload`` is
    serialized to JSON and sent with a ``Content-Type: application/json``
    header; ``self._headers`` itself is never modified.

    As a side effect the response body is written to ``self.logLocation``
    (overwriting the file) with any top-level ``access_token`` removed.  On
    failures where no body was decoded, ``{}`` is written.

    Args:
        method: HTTP method passed through to the client.
        path: Path appended verbatim to ``self._address``.
        payload: Optional JSON-serializable request body.

    Returns:
        A ``(status, headers, data)`` tuple: the status code, the response
        headers as a dict, and the parsed JSON body (``{}`` when the body is
        empty).

    Raises:
        AnsibleConnectionFailure: If the client raises ``HTTPError``.  The
            message carries the status, headers and, when it decodes as
            UTF-8, the response body.
    """
    headers = self._headers.copy()
    data = None
    if payload:
        data = json.dumps(payload)
        headers["Content-Type"] = "application/json"
    url = self._address + path

    # Default so the log step in ``finally`` always has something to write,
    # even when the request fails before a body was decoded.
    r_data = {}
    completed = False
    try:
        r = self._client.open(method, url, data=data, headers=headers, timeout=60)
        r_status = r.getcode()
        r_headers = dict(r.headers)
        body = r.read().decode("utf-8")
        r_data = json.loads(body) if body else {}
        completed = True
    except HTTPError as e:
        r_status = e.code
        r_headers = dict(e.headers)
        try:
            r_data = e.read().decode("utf-8")
        except UnicodeDecodeError:
            raise AnsibleConnectionFailure(
                f"HTTPError {r_status}: {r_headers}"
            ) from e
        raise AnsibleConnectionFailure(
            f"HTTPError {r_status}: {r_headers} Response {r_data}"
        ) from e
    finally:
        # An exception escaping ``finally`` replaces the one already in
        # flight, so nothing here may raise while a request error propagates.
        log_data = r_data
        if isinstance(log_data, str):
            # Error bodies arrive as text; log them as JSON when they parse,
            # otherwise keep the raw text (e.g. an HTML error page).
            try:
                log_data = json.loads(log_data)
            except ValueError:
                pass
        if isinstance(log_data, dict):
            # Redact on a copy: the caller still needs the token in r_data.
            log_data = dict(log_data)
            log_data.pop("access_token", None)
        try:
            with open(self.logLocation, "w", encoding="utf-8") as f:
                json.dump(log_data, f, sort_keys=True, indent=4)
        except OSError:
            # On success keep surfacing an unwritable log; on failure the
            # request error is the one the caller needs to see.
            if completed:
                raise
    return r_status, r_headers, r_data