decisionai_plugin/common/util/kafka_util.py (38 lines of code) (raw):
from kafka.partitioner.default import DefaultPartitioner
import random
import threading
class RoundRobinPartitioner(DefaultPartitioner):
__topic_counter_map = dict()
__DEFAULT_TOPIC = "default_topic"
@classmethod
def __call__(cls, key, all_partitions, available):
"""
Get the partition corresponding to key
:param key: partitioning key
:param all_partitions: list of all partitions sorted by partition ID
:param available: list of available partitions in no particular order
:return: one of the values from all_partitions or available
"""
if key is None:
next_value = cls.__next_value()
if available:
return next_value % len(available)
return next_value % len(all_partitions)
return super().__call__(key, all_partitions, available)
@classmethod
def __next_value(cls, topic=None):
if topic is None:
topic = cls.__DEFAULT_TOPIC
atomic_counter = cls.__topic_counter_map.get(topic, None)
if atomic_counter is None:
cls.__topic_counter_map[topic] = FastReadCounter()
atomic_counter = cls.__topic_counter_map.get(topic)
return atomic_counter.get_and_increment()
def __str__(self):
return type(self).__name__ + str(self.__dict__)
class FastReadCounter:
def __init__(self):
self.value = int(random.random() * 2147483647)
self._lock = threading.Lock()
def increment(self):
with self._lock:
self.value += 1
self.value &= 2147483647
def get_and_increment(self):
with self._lock:
self.value += 1
self.value &= 2147483647
return self.value