in datafusion_ray/core.py [0:0]
def __init__(self, min_processors: int, max_processors: int):
    """Set up the pool's bookkeeping and create the initial processors.

    Args:
        min_processors: number of processors created immediately, one
            ``_new_processor()`` call each.
        max_processors: stored on the instance; not otherwise consulted
            during construction.
    """
    self.min_processors, self.max_processors = min_processors, max_processors

    # Lookup tables keyed by processor_key (a random identifier):
    #   pool  -> stage actor reference
    #   addrs -> listening address
    self.pool = dict()
    self.addrs = dict()

    # Startup tracking. processors_started holds the object references
    # returned by each processor's start_up method; a ref is removed from
    # the set once it is ready, and all processors are known to be
    # listening once every ref has been waited on. processors_ready is the
    # event that is set when all processors are ready to serve.
    self.processors_started = set()
    self.processors_ready = asyncio.Event()

    # Intermediate stages between "started" and "serving":
    #   need_address -> started, but the address still has to be fetched
    #   need_serving -> address known, but not serving yet
    self.need_address = set()
    self.need_serving = set()

    # Processors currently in use vs. processors free to hand out.
    self.acquired = set()
    self.available = set()

    # NOTE(review): _new_processor is not visible here; presumably it
    # relies on the containers above, which is why they are created
    # first — confirm before reordering.
    for _unused in range(min_processors):
        self._new_processor()

    summary = f"created ray processor pool (min_processors: {min_processors}, max_processors: {max_processors})"
    log.info(summary)