services/ui_backend_service/ui_server.py (118 lines of code) (raw):
import asyncio
import os
import signal
import concurrent
from aiohttp import web
from pyee import AsyncIOEventEmitter
from services.utils import DBConfiguration, logging, ORIGIN_TO_ALLOW_CORS_FROM
from services.metadata_service.server import app as metadata_service_app
# service processes and routes
from .api import (AdminApi, ArtificatsApi, AutoCompleteApi, ConfigApi,
DagApi, FeaturesApi, FlowApi, ListenNotify, LogApi,
MetadataApi, RunApi, RunHeartbeatMonitor, SearchApi, StepApi, TagApi,
TaskApi, TaskHeartbeatMonitor, Websocket, PluginsApi, CardsApi)
from .api.utils import allow_get_requests_only
from .data.cache import CacheStore
from .data.db import AsyncPostgresDB
from .doc import swagger_definitions, swagger_description
from .features import (FEATURE_DB_LISTEN_ENABLE, FEATURE_HEARTBEAT_ENABLE,
FEATURE_WS_ENABLE)
from .frontend import Frontend
from .plugins import init_plugins
PATH_PREFIX = os.environ.get("PATH_PREFIX", "")
DEFAULT_SERVICE_HOST = str(os.environ.get('MF_UI_METADATA_HOST', '0.0.0.0'))
DEFAULT_SERVICE_PORT = os.environ.get('MF_UI_METADATA_PORT', 8083)
DEFAULT_METADATA_SERVICE_URL = "http://{}:{}{}/metadata".format(
DEFAULT_SERVICE_HOST,
DEFAULT_SERVICE_PORT,
PATH_PREFIX)
# Provide defaults for Metaflow Client
os.environ['METAFLOW_SERVICE_URL'] = os.environ.get('METAFLOW_SERVICE_URL', DEFAULT_METADATA_SERVICE_URL)
os.environ['USERNAME'] = os.environ.get('USERNAME', 'none')
os.environ['METAFLOW_CLIENT_CACHE_MAX_SIZE'] = os.environ.get('METAFLOW_CLIENT_CACHE_MAX_SIZE', '0')
os.environ['METAFLOW_DEFAULT_METADATA'] = os.environ.get('METAFLOW_DEFAULT_METADATA', 'service')
# Create database triggers automatically, enabled by default
# Disable with env variable `DB_TRIGGER_CREATE=0`
DB_TRIGGER_CREATE = os.environ.get("DB_TRIGGER_CREATE", "1") == "1"
def app(loop=None, db_conf: DBConfiguration = None):
loop = loop or asyncio.get_event_loop()
_app = web.Application(loop=loop)
app = web.Application(loop=loop) if len(PATH_PREFIX) > 0 else _app
async_db = AsyncPostgresDB('ui')
loop.run_until_complete(async_db._init(db_conf=db_conf, create_triggers=DB_TRIGGER_CREATE))
event_emitter = AsyncIOEventEmitter()
async_db_cache = AsyncPostgresDB('ui:cache')
loop.run_until_complete(async_db_cache._init(db_conf))
cache_store = CacheStore(app=app, db=async_db_cache, event_emitter=event_emitter)
if FEATURE_DB_LISTEN_ENABLE:
async_db_notify = AsyncPostgresDB('ui:notify')
loop.run_until_complete(async_db_notify._init(db_conf))
ListenNotify(app, db=async_db_notify, event_emitter=event_emitter)
if FEATURE_HEARTBEAT_ENABLE:
async_db_heartbeat = AsyncPostgresDB('ui:heartbeat')
loop.run_until_complete(async_db_heartbeat._init(db_conf))
RunHeartbeatMonitor(event_emitter, db=async_db_heartbeat)
TaskHeartbeatMonitor(event_emitter, db=async_db_heartbeat, cache=cache_store)
if FEATURE_WS_ENABLE:
async_db_ws = AsyncPostgresDB('ui:websocket')
loop.run_until_complete(async_db_ws._init(db_conf))
Websocket(app, db=async_db_ws, event_emitter=event_emitter, cache=cache_store)
AutoCompleteApi(app, async_db)
FlowApi(app, async_db)
RunApi(app, async_db, cache_store)
StepApi(app, async_db)
TaskApi(app, async_db, cache_store)
MetadataApi(app, async_db)
ArtificatsApi(app, async_db, cache_store)
TagApi(app, async_db)
SearchApi(app, async_db, cache_store)
DagApi(app, async_db, cache_store)
FeaturesApi(app)
ConfigApi(app)
PluginsApi(app)
CardsApi(app, async_db, cache_store)
LogApi(app, async_db, cache_store)
AdminApi(app, cache_store)
# Add Metadata Service as a sub application so that Metaflow Client
# can use it as a service backend in case none provided via METAFLOW_SERVICE_URL
#
# Metadata service exposed through UI service is intended for read-only use only.
# 'allow_get_requests_only' middleware will only accept GET requests.
app.add_subapp("/metadata", metadata_service_app(
loop=loop, db_conf=db_conf, middlewares=[allow_get_requests_only]))
if os.environ.get("UI_ENABLED", "1") == "1":
# Serve UI bundle only if enabled
# This has to be placed last due to catch-all route
Frontend(app)
if len(PATH_PREFIX) > 0:
_app.add_subapp(PATH_PREFIX, app)
if ORIGIN_TO_ALLOW_CORS_FROM:
import aiohttp_cors
logging.info("We will allows CORS from the origin %s" % ORIGIN_TO_ALLOW_CORS_FROM)
cors = aiohttp_cors.setup(_app, defaults={
ORIGIN_TO_ALLOW_CORS_FROM: aiohttp_cors.ResourceOptions(
allow_credentials=True,
expose_headers="*",
allow_headers="*",
allow_methods="*",
)
})
# Configure CORS on all routes.
for route in list(_app.router.routes()):
cors.add(route)
logging.info("Metadata service available at {}".format(DEFAULT_METADATA_SERVICE_URL))
async def _init_plugins():
with concurrent.futures.ThreadPoolExecutor() as pool:
await loop.run_in_executor(pool, init_plugins)
asyncio.run_coroutine_threadsafe(_init_plugins(), loop)
return _app
def main():
loop = asyncio.get_event_loop()
# Set exception and signal handlers for async loop. Mainly for logging purposes.
loop.set_exception_handler(async_loop_error_handler)
for sig in (signal.SIGTERM, signal.SIGHUP, signal.SIGINT):
loop.add_signal_handler(sig, lambda sig=sig: async_loop_signal_handler(sig))
the_app = app(loop, DBConfiguration())
handler = web.AppRunner(the_app)
loop.run_until_complete(handler.setup())
f = loop.create_server(handler.server, DEFAULT_SERVICE_HOST, DEFAULT_SERVICE_PORT)
srv = loop.run_until_complete(f)
print("serving on", srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
def async_loop_error_handler(loop, context):
msg = context.get("exception", context["message"])
logging.error("Encountered an exception: {}".format(msg))
def async_loop_signal_handler(signal):
logging.info("Received signal: {}".format(signal))
if __name__ == "__main__":
main()