nyc_taxis/track.py (20 lines of code) (raw):
import asyncio
import time
def wait_for_ml_lookback(es, params):
while True:
response = es.ml.get_datafeed_stats(datafeed_id=params["datafeed-id"])
if response["datafeeds"][0]["state"] == "stopped":
break
time.sleep(5)
async def wait_for_ml_lookback_async(es, params):
while True:
response = await es.ml.get_datafeed_stats(datafeed_id=params["datafeed-id"])
if response["datafeeds"][0]["state"] == "stopped":
break
await asyncio.sleep(5)
def register(registry):
async_runner = registry.meta_data.get("async_runner", False)
if async_runner:
registry.register_runner("wait-for-ml-lookback", wait_for_ml_lookback_async, async_runner=True)
else:
registry.register_runner("wait-for-ml-lookback", wait_for_ml_lookback)