aws_advanced_python_wrapper/host_availability.py (51 lines of code) (raw):
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
from enum import Enum, auto
from aws_advanced_python_wrapper.utils.messages import Messages
from aws_advanced_python_wrapper.utils.properties import (Properties,
WrapperProperties)
class HostAvailability(Enum):
AVAILABLE = auto()
UNAVAILABLE = auto()
class HostAvailabilityStrategy:
def get_host_availability(self, raw_host_availability) -> HostAvailability:
return raw_host_availability
def set_host_availability(self, host_availability: HostAvailability):
pass # do nothing
class ExponentialBackoffHostAvailabilityStrategy(HostAvailabilityStrategy):
name = "exponential_backoff"
def __init__(self, properties: Properties):
max_retries = WrapperProperties.HOST_AVAILABILITY_STRATEGY_MAX_RETRIES.get_int(properties)
if max_retries < 1:
raise ValueError(Messages.get_formatted("HostAvailabilityStrategy.InvalidMaxRetries", max_retries))
self._max_retries = max_retries
backoff_time = WrapperProperties.HOST_AVAILABILITY_STRATEGY_INITIAL_BACKOFF_TIME.get_int(properties)
if backoff_time < 1:
raise ValueError(Messages.get_formatted("HostAvailabilityStrategy.InvalidInitialBackoffTime", backoff_time))
self._initial_backoff_time_secs = backoff_time
self._not_available_count = 0
self._last_changed = datetime.now()
def get_host_availability(self, raw_host_availability) -> HostAvailability:
if raw_host_availability == HostAvailability.AVAILABLE:
return HostAvailability.AVAILABLE
if self._not_available_count >= self._max_retries:
return HostAvailability.UNAVAILABLE
retry_delay_ms = 2 ** self._not_available_count * self._initial_backoff_time_secs * 1000
earliest_retry = self._last_changed + timedelta(milliseconds=retry_delay_ms)
if earliest_retry < datetime.now():
return HostAvailability.AVAILABLE
return raw_host_availability
def set_host_availability(self, host_availability: HostAvailability):
self._last_changed = datetime.now()
if host_availability == HostAvailability.AVAILABLE:
self._not_available_count = 0
else:
self._not_available_count += 1
def create_host_availability_strategy(properties: Properties):
if properties is None:
return HostAvailabilityStrategy()
default_strategy = WrapperProperties.DEFAULT_HOST_AVAILABILITY_STRATEGY.get(properties)
if default_strategy is None or default_strategy == "":
return HostAvailabilityStrategy()
if default_strategy == ExponentialBackoffHostAvailabilityStrategy.name:
return ExponentialBackoffHostAvailabilityStrategy(properties)
return HostAvailabilityStrategy()