python/rocketmq/exponential_backoff_retry_policy.py (38 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
from datetime import timedelta

from google.protobuf.duration_pb2 import Duration


class ExponentialBackoffRetryPolicy:
    """A class implementing exponential backoff retry policy."""

    def __init__(self, max_attempts, initial_backoff, max_backoff,
                 backoff_multiplier):
        """Initialize an ExponentialBackoffRetryPolicy instance.

        :param max_attempts: Maximum number of retry attempts.
        :param initial_backoff: Initial delay before the first retry, as a
            ``datetime.timedelta`` (``total_seconds()`` is called on it).
        :param max_backoff: Upper bound for the delay between retries, as a
            ``datetime.timedelta``.
        :param backoff_multiplier: Factor by which the delay grows from one
            attempt to the next.
        """
        self._max_attempts = max_attempts
        self.initial_backoff = initial_backoff
        self.max_backoff = max_backoff
        self.backoff_multiplier = backoff_multiplier

    def get_max_attempts(self):
        """Get maximum number of retry attempts.

        :return: Maximum number of retry attempts.
        """
        return self._max_attempts

    def inherit_backoff(self, retry_policy):
        """Build a new policy using the backoff settings of ``retry_policy``.

        The maximum number of attempts is taken from this instance; only the
        backoff parameters come from ``retry_policy``.

        :param retry_policy: The retry policy to inherit from.
        :return: A new ExponentialBackoffRetryPolicy with inherited backoff
            parameters.
        :raise ValueError: If the strategy of the retry policy is not
            ExponentialBackoff.
        """
        # NOTE(review): this relies on ``retry_policy`` exposing a
        # ``strategy_case`` attribute equal to "ExponentialBackoff". If callers
        # pass a raw protobuf message, confirm that attribute exists (Python
        # protobuf normally exposes oneofs through ``WhichOneof``).
        if retry_policy.strategy_case != "ExponentialBackoff":
            raise ValueError("Strategy must be exponential backoff")
        return self._inherit_backoff(retry_policy.exponential_backoff)

    def _inherit_backoff(self, retry_policy):
        """Build a new policy from an exponential-backoff settings object.

        :param retry_policy: Object exposing ``initial`` and ``max`` (each
            with a ``ToTimedelta()`` method) and ``multiplier``.
        :return: A new ExponentialBackoffRetryPolicy that keeps this
            instance's maximum attempts and uses the given backoff parameters.
        """
        return ExponentialBackoffRetryPolicy(
            self._max_attempts,
            retry_policy.initial.ToTimedelta(),
            retry_policy.max.ToTimedelta(),
            retry_policy.multiplier,
        )

    def get_next_attempt_delay(self, attempt):
        """Calculate the delay before the next retry attempt.

        The delay is ``initial_backoff * backoff_multiplier ** (attempt - 1)``,
        capped at ``max_backoff`` and never negative.

        :param attempt: The number of the current attempt.
        :return: The delay before the next attempt as a ``timedelta``.
        """
        initial_seconds = self.initial_backoff.total_seconds()
        max_seconds = self.max_backoff.total_seconds()
        try:
            growth = math.pow(self.backoff_multiplier, 1.0 * (attempt - 1))
            delay_seconds = min(initial_seconds * growth, max_seconds)
        except OverflowError:
            # math.pow raises instead of returning infinity once the growth
            # factor leaves the float range (large attempt numbers). The
            # uncapped delay would be far beyond max_backoff, so clamp to it;
            # a zero initial backoff still yields no delay.
            delay_seconds = max_seconds if initial_seconds > 0 else 0.0
        if delay_seconds >= 0:
            return timedelta(seconds=delay_seconds)
        return timedelta(seconds=0)

    @staticmethod
    def immediately_retry_policy(max_attempts):
        """Create a retry policy that makes immediate retries.

        :param max_attempts: Maximum number of retry attempts.
        :return: An ExponentialBackoffRetryPolicy with zero initial and
            maximum backoff and a multiplier of 1.
        """
        return ExponentialBackoffRetryPolicy(
            max_attempts, timedelta(seconds=0), timedelta(seconds=0), 1)

    @staticmethod
    def _to_duration(delta):
        """Convert a ``timedelta`` into a protobuf ``Duration`` message.

        ``Duration.FromTimedelta`` is an instance method that fills the
        message in place and returns ``None``, so a message has to be created
        first and then populated.

        :param delta: The ``datetime.timedelta`` to convert.
        :return: A ``Duration`` holding the same span of time.
        """
        duration = Duration()
        duration.FromTimedelta(delta)
        return duration

    def to_protobuf(self):
        """Convert the ExponentialBackoffRetryPolicy instance to protobuf form.

        :return: A dict with the keys ``'MaxAttempts'`` and
            ``'ExponentialBackoff'``; the latter maps ``'Multiplier'`` to the
            backoff multiplier and ``'Max'`` / ``'Initial'`` to protobuf
            ``Duration`` messages.
        """
        exponential_backoff = {
            'Multiplier': self.backoff_multiplier,
            'Max': self._to_duration(self.max_backoff),
            'Initial': self._to_duration(self.initial_backoff),
        }
        return {
            'MaxAttempts': self._max_attempts,
            'ExponentialBackoff': exponential_backoff,
        }