elasticapm/transport/http_base.py (40 lines of code) (raw):
# -*- coding: utf-8 -*-
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from elasticapm.conf import constants
from elasticapm.transport.base import Transport
class HTTPTransportBase(Transport):
def __init__(
self,
url,
client,
verify_server_cert=True,
compress_level=5,
headers=None,
timeout=None,
server_cert=None,
server_ca_cert_file=None,
**kwargs
) -> None:
self._url = url
self._verify_server_cert = verify_server_cert
self._server_cert = server_cert
self._server_ca_cert_file = server_ca_cert_file
self._timeout = timeout
self._headers = {
k.encode("ascii") if isinstance(k, str) else k: v.encode("ascii") if isinstance(v, str) else v
for k, v in (headers if headers is not None else {}).items()
}
base, sep, tail = self._url.rpartition(constants.EVENTS_API_PATH)
self._config_url = "".join((base, constants.AGENT_CONFIG_PATH, tail))
self._server_info_url = "".join((base, constants.SERVER_INFO_PATH, tail))
super(HTTPTransportBase, self).__init__(client, compress_level=compress_level, **kwargs)
def send(self, data, forced_flush=False):
"""
Sends a request to a remote APM Server using HTTP POST.
Returns the shortcut URL of the recorded error on Elastic APM
"""
raise NotImplementedError()
def get_config(self, current_version=None, keys=None):
"""
Gets configuration from a remote APM Server
:param current_version: version of the current configuration
:param keys: a JSON-serializable dict to identify this instance, e.g.
{
"service": {
"name": "foo",
"environment": "bar"
}
}
:return: a three-tuple of new version, config dictionary and validity in seconds.
Any element of the tuple can be None.
"""
raise NotImplementedError()
@property
def auth_headers(self):
if self.client.config.api_key:
return {"Authorization": "ApiKey " + self.client.config.api_key}
elif self.client.config.secret_token:
return {"Authorization": "Bearer " + self.client.config.secret_token}
return {}
# left for backwards compatibility
AsyncHTTPTransportBase = HTTPTransportBase