in elasticapm/transport/http.py [0:0]
def send(self, data, forced_flush=False, custom_url=None, custom_headers=None):
    """POST ``data`` to the APM Server and return the ``Location`` response header.

    :param data: request body; passed unchanged to ``urlopen`` and its length
        is reported in the debug log
    :param forced_flush: when true, ``?flushed=true`` is appended to the
        target URL
    :param custom_url: URL to use instead of ``self._url``; falls back to
        ``self._url`` when empty or ``None``
    :param custom_headers: headers to apply on top of the transport and auth
        headers; when given, the default ndjson/gzip content headers are
        NOT added
    :return: value of the ``Location`` response header, or ``None`` if the
        header is absent
    :raises TransportException: if the request fails or times out, or if the
        server answers with an HTTP status >= 400
    """
    response = None
    headers = self._headers.copy() if self._headers else {}
    headers.update(self.auth_headers)
    if custom_headers:
        headers.update(custom_headers)
    else:
        headers.update(
            {
                b"Content-Type": b"application/x-ndjson",
                b"Content-Encoding": b"gzip",
            }
        )
    url = custom_url or self._url
    if forced_flush:
        url = f"{url}?flushed=true"
    try:
        try:
            response = self.http.urlopen(
                "POST", url, body=data, headers=headers, timeout=self._timeout, preload_content=False
            )
            logger.debug("Sent request, url=%s size=%.2fkb status=%s", url, len(data) / 1024.0, response.status)
        except Exception as e:
            # Report the URL that was actually requested (it may be a custom
            # URL and/or carry the flush suffix), not the configured default.
            if isinstance(e, MaxRetryError) and isinstance(e.reason, TimeoutError):
                message = "Connection to APM Server timed out " "(url: %s, timeout: %s seconds)" % (
                    url,
                    self._timeout,
                )
            else:
                message = "Unable to reach APM Server: %s (url: %s)" % (e, url)
            # Chain explicitly so the underlying error is recorded as the cause.
            raise TransportException(message, data) from e
        body = response.read()
        if response.status >= 400:
            if response.status == 429:  # rate-limited
                message = "Temporarily rate limited: "
            else:
                message = "HTTP %s: " % response.status
            # Cap the echoed server response so the error message stays bounded.
            message += body.decode("utf8", errors="replace")[:10000]
            raise TransportException(message, data)
        return response.headers.get("Location")
    finally:
        # Compare against None rather than relying on the response object's
        # truthiness, so an obtained response is always closed.
        if response is not None:
            response.close()