in run_tests/piston_client.py [0:0]
def __init__(self, base_endpoint: str | list[str] = "http://ip-10-53-80-65:3223/api/v2", session=None, max_requests_per_endpoint=1):
self.max_requests_per_endpoint = max_requests_per_endpoint
self.base_endpoints = [base_endpoint] if isinstance(base_endpoint, str) else base_endpoint
if len(self.base_endpoints) == 0:
raise ValueError("No Piston endpoints provided. Please check your PISTON_ENDPOINTS environment variable.")
self.endpoint_ids = {endpoint: i for i, endpoint in enumerate(self.base_endpoints)}
self._session = session
self.endpoint_tokens = asyncio.Queue(maxsize=max_requests_per_endpoint * len(self.base_endpoints))
for _ in range(max_requests_per_endpoint):
for base_endpoint in self.base_endpoints:
self.endpoint_tokens.put_nowait(base_endpoint)
self._endpoint_failures = Counter()
self._unhealthy_endpoints = set()
self._endpoint_failures_lock = asyncio.Lock()