resdb_driver/connection.py (79 lines of code) (raw):
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from collections import namedtuple
from datetime import datetime, timedelta, timezone

from requests import Session
from requests.exceptions import ConnectionError

from .exceptions import HTTP_EXCEPTIONS, TransportError
BACKOFF_DELAY = 0.5  # seconds

# Largest exponent applied to BACKOFF_DELAY. Bounding it keeps
# ``2 ** retries`` inside float/timedelta range during a long outage;
# 0.5 s * 2**32 is roughly 68 years, so the bound never shortens a delay
# that anyone would realistically wait for.
_MAX_BACKOFF_EXPONENT = 32

HttpResponse = namedtuple("HttpResponse", ("status_code", "headers", "data"))


def _utcnow() -> datetime:
    """! Returns the current UTC time as a naive `datetime`.

    Produces the same value as the deprecated `datetime.utcnow()`. The
    timezone is stripped on purpose so that `Connection.backoff_time`
    stays a naive timestamp, exactly as it was before.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


class Connection:
    """! A Connection object to make HTTP requests to a particular node."""

    def __init__(self, *, node_url: str, headers: dict = None):
        """! Initializes a :class:`~resdb_driver.connection.Connection`
        instance.

        @param node_url (str): Url of the node to connect to.
        @param headers (dict): Optional headers to send with each request.

        @return An instance of the Connection class
        """
        self.node_url = node_url
        self.session = Session()
        if headers:
            self.session.headers.update(headers)

        # Number of consecutive connection failures seen so far.
        self._retries = 0
        # Naive UTC timestamp before which no request is sent, or None
        # when the connection is not backing off.
        self.backoff_time = None

    def request(
        self,
        method: str,
        *,
        path: str = None,
        json: dict = None,
        params: dict = None,
        headers: dict = None,
        timeout: int = None,
        backoff_cap: int = None,
        **kwargs
    ) -> HttpResponse:
        """! Performs an HTTP request with the given parameters. Implements exponential backoff.

        If `ConnectionError` occurs, a timestamp equal to now +
        the default delay (`BACKOFF_DELAY`) is assigned to the object.
        The timestamp is in UTC. Next time the function is called, it either
        waits till the timestamp is passed or raises `TimeoutError`.

        If `ConnectionError` occurs two or more times in a row,
        the retry count is incremented and the new timestamp is calculated
        as now + the default delay multiplied by two to the power of the
        number of retries.

        If a request completes without `ConnectionError` (this includes
        the case where the node answers with an HTTP error status), the
        backoff timestamp is removed and the retry count is back to zero.

        @param method (str): HTTP method (e.g.: ``'GET'``).
        @param path (str): API endpoint path (e.g.: ``'/transactions'``).
        @param json (dict): JSON data to send along with the request.
        @param params (dict): Dictionary of URL (query) parameters.
        @param headers (dict): Optional headers to pass to the request.
        @param timeout (int): Optional timeout in seconds. The time spent
            waiting for the backoff to expire is deducted from it.
        @param backoff_cap (int): The maximal allowed backoff delay in seconds to be assigned to a node.
        @param kwargs: Optional keyword arguments.

        @return Response of the HTTP request.

        @exception TimeoutError If `timeout` is shorter than the remaining
            backoff delay. The backoff state is left untouched.
        @exception ConnectionError If the node cannot be reached; re-raised
            after the backoff state has been updated.
        """
        # A backoff deadline that already passed yields a negative
        # remainder. Clamp it so it neither triggers a sleep nor *adds*
        # time to the caller's timeout in the subtraction below.
        backoff_delay = max(0, self.get_backoff_timedelta())

        if timeout is not None and timeout < backoff_delay:
            raise TimeoutError

        if backoff_delay > 0:
            time.sleep(backoff_delay)
        if timeout is not None:
            timeout -= backoff_delay

        connection_failed = False
        try:
            return self._request(
                method=method,
                timeout=timeout,
                url=self.node_url + path if path else self.node_url,
                json=json,
                params=params,
                headers=headers,
                **kwargs,
            )
        except ConnectionError:
            connection_failed = True
            raise
        finally:
            # Runs on every exit path: only ConnectionError counts as a
            # failure, anything else resets the backoff.
            self.update_backoff_time(
                success=not connection_failed, backoff_cap=backoff_cap
            )

    def get_backoff_timedelta(self) -> float:
        """! Returns the number of seconds left until the backoff expires.

        @return 0 when no backoff is set; otherwise the difference between
            `backoff_time` and the current UTC time in seconds, which is
            negative once `backoff_time` lies in the past.
        """
        if self.backoff_time is None:
            return 0

        return (self.backoff_time - _utcnow()).total_seconds()

    def update_backoff_time(self, success, backoff_cap=None):
        """! Updates the backoff state after a request attempt.

        @param success (bool): True resets the retry count and clears the
            backoff timestamp. False schedules the next allowed attempt at
            now + `BACKOFF_DELAY` * 2 ** retries and increments the retry
            count.
        @param backoff_cap (int): Optional upper bound, in seconds, for
            the newly assigned delay.
        """
        if success:
            self._retries = 0
            self.backoff_time = None
            return

        # Bound the exponent before multiplying: an unbounded 2 ** retries
        # eventually overflows float/timedelta, and that error would be
        # raised here even when backoff_cap asks for a tiny delay.
        exponent = min(self._retries, _MAX_BACKOFF_EXPONENT)
        backoff_delta = BACKOFF_DELAY * 2**exponent
        if backoff_cap is not None:
            backoff_delta = min(backoff_delta, backoff_cap)

        self.backoff_time = _utcnow() + timedelta(seconds=backoff_delta)
        self._retries += 1

    def _request(self, **kwargs) -> HttpResponse:
        """! Sends the request through the session and wraps the result.

        @param kwargs: Keyword arguments forwarded unchanged to
            `self.session.request`; `url` is also used in error reports.

        @return `HttpResponse` holding the status code, the headers and
            the decoded JSON body, or the raw text when the body is not
            valid JSON.

        @exception TransportError (or the class registered for the status
            code in `HTTP_EXCEPTIONS`) when the status is outside 2xx.
        """
        response = self.session.request(**kwargs)
        text = response.text
        try:
            payload = response.json()
        except ValueError:
            # Body is not JSON; fall back to the raw text below.
            payload = None

        if not (200 <= response.status_code < 300):
            exc_cls = HTTP_EXCEPTIONS.get(response.status_code, TransportError)
            raise exc_cls(response.status_code, text, payload, kwargs["url"])

        data = payload if payload is not None else text
        return HttpResponse(response.status_code, response.headers, data)