in pytest_rally/elasticsearch.py [0:0]
def wait_until_port_is_free(self, timeout=120):
    """Block until nothing accepts connections on the local HTTP port.

    Repeatedly attempts a TCP connection to ``127.0.0.1:self.http_port``.
    The port is considered free as soon as a connection attempt is refused
    (``errno.ECONNREFUSED``); any other result, including a successful
    connection, is treated as "still occupied" and the probe is retried
    after half a second.

    Args:
        timeout: Maximum number of seconds to keep polling.

    Raises:
        TimeoutError: If no connection attempt was refused before
            ``timeout`` seconds elapsed.
    """
    deadline = time.perf_counter() + timeout
    while time.perf_counter() < deadline:
        # The context manager closes the probe socket on every path,
        # including when connect_ex itself raises (e.g. an invalid port),
        # so no file descriptor is leaked per polling iteration.
        with socket.socket() as probe:
            connect_result = probe.connect_ex(("127.0.0.1", self.http_port))
        if connect_result == errno.ECONNREFUSED:
            # Nobody is listening any more: the port has been released.
            return
        time.sleep(0.5)
    raise TimeoutError(f"Port [{self.http_port}] is occupied after [{timeout}] seconds")