in project/nanoeval/nanoeval/_aiomonitor.py [0:0]
def _lock_and_yield_port() -> Generator[int, None, None]:
    """Reserve a free localhost TCP port and yield its number.

    A random port in the range 10000-20000 is picked and claimed by
    exclusively creating the lock file ``/tmp/nanoeval/ports/<port>``. The
    port is then probed by binding a throwaway socket to it; the probe socket
    is closed before yielding so that the caller can bind the port itself.
    If the lock file already exists or the probe bind fails, another random
    port is tried. There is no upper bound on the number of attempts.

    The lock file is held for as long as the generator stays suspended at the
    ``yield`` and is removed when the generator is resumed, closed, or has an
    exception thrown into it. Only a lock file created by this call is ever
    removed.

    NOTE(review): this generator is presumably wrapped with
    ``contextlib.contextmanager`` at its definition site -- the decorator is
    not visible here; confirm.

    Yields:
        The reserved port number.

    Raises:
        OSError: If the lock file cannot be created for a reason other than
            it already existing (e.g. a permission error). Exceptions raised
            by the caller while the port is held propagate unchanged.
    """
    ports_dir = Path("/tmp/nanoeval/ports")
    ports_dir.mkdir(parents=True, exist_ok=True)
    while True:
        port = random.randint(10000, 20000)
        port_file = ports_dir / str(port)
        try:
            # Mode "x" fails if the file already exists, which makes the
            # creation double as an inter-process lock on the port.
            lock = open(port_file, "x")
        except FileExistsError:
            # Someone else holds this port; leave their lock file alone.
            continue
        try:
            with lock:
                try:
                    # Probe only: the socket is closed again right away so
                    # the caller is able to bind the port.
                    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
                        s.bind(("localhost", port))
                except OSError:
                    # Port is in use outside the lock directory; try another.
                    continue
                # Deliberately outside any ``except`` clause: an error raised
                # by the caller must propagate instead of triggering a retry.
                yield port
                return
        finally:
            # Reached only when this call created the lock file.
            try:
                port_file.unlink()
            except FileNotFoundError:
                # Already removed externally; nothing left to clean up.
                pass