services/metadata_service/api/admin.py (71 lines of code) (raw):

import boto3
import json
import logging
import os
import psycopg2
import psycopg2.extras
from multidict import MultiDict
from aiohttp import web
from botocore.client import Config
from services.data.postgres_async_db import AsyncPostgresDB
from services.utils import (
    get_traceback_str
)
from services.metadata_service.api.utils import METADATA_SERVICE_VERSION, \
    METADATA_SERVICE_HEADER, web_response

logger = logging.getLogger(__name__)


class AuthApi(object):
    """Admin and sandbox-auth HTTP handlers.

    Instantiating the class registers four GET routes on the router of the
    application passed in: ``/auth/token``, ``/ping``, ``/version`` and
    ``/healthcheck``.

    The handler docstrings below are swagger/YAML fragments and deliberately
    start at the YAML document marker; explanatory notes therefore live in
    ``#`` comments rather than in those docstrings.
    """

    def __init__(self, app):
        app.router.add_route("GET", "/auth/token",
                             self.get_authorization_token)
        app.router.add_route("GET", "/ping", self.ping)
        app.router.add_route("GET", "/version", self.version)
        app.router.add_route("GET", "/healthcheck", self.healthcheck)

    async def version(self, request):
        """
        ---
        description: Returns the version of the metadata service
        tags:
        - Admin
        produces:
        - 'text/plain'
        responses:
            "200":
                description: successful operation. Return the version number
            "405":
                description: invalid HTTP Method
        """
        return web.Response(text=str(METADATA_SERVICE_VERSION))

    async def ping(self, request):
        """
        ---
        description: This end-point allow to test that service is up.
        tags:
        - Admin
        produces:
        - 'text/plain'
        responses:
            "200":
                description: successful operation. Return "pong" text
            "405":
                description: invalid HTTP Method
        """
        # The service version is echoed back in a response header.
        return web.Response(text="pong", headers=MultiDict(
            {METADATA_SERVICE_HEADER: METADATA_SERVICE_VERSION}))

    async def healthcheck(self, request):
        """
        ---
        description: This end-point allow to test that service is up and connected to the db
        tags:
        - Admin
        produces:
        - 'application/json'
        responses:
            "200":
                description: successful operation.
            "405":
                description: invalid HTTP Method
            "500":
                description: unable to connect to DB, this node is not considered healthy and shouldn't receive traffic
        """
        status = {}
        status_code = 200
        try:
            with (
                await AsyncPostgresDB.get_instance().pool.cursor(
                    cursor_factory=psycopg2.extras.DictCursor
                )
            ) as cur:
                # Cheapest possible round-trip: any returned row means the
                # database answered.
                await cur.execute("SELECT 1")
                records = await cur.fetchall()
                if records:
                    status["status"] = "UP"
                else:
                    status["status"] = "DOWN"
                    status_code = 500
                # Explicit close retained from the original implementation.
                cur.close()
        except Exception:
            # A failure to obtain a cursor or run the probe query is exactly
            # the "unable to connect to DB" case: report it as the documented
            # 500/DOWN JSON body instead of letting the exception escape the
            # handler, and keep the details in the server log only.
            logger.exception("Healthcheck could not query the database")
            status = {"status": "DOWN"}
            status_code = 500
        return web_response(status=status_code, body=json.dumps(status))

    async def get_authorization_token(self, request):
        """
        ---
        description: this is used exclusively for sandbox auth
        tags:
        - Auth
        produces:
        - text/plain
        responses:
            "200":
                description: successfully returned certs
            "403":
                description: no token for you
            "405":
                description: invalid HTTP Method
            "500":
                description: internal server error
        """
        try:
            # MF_USER_IAM_ROLE has no default; if it is unset, role_arn is
            # None and the failure surfaces through the except branch below.
            role_arn = os.environ.get("MF_USER_IAM_ROLE")
            region_name = os.environ.get("MF_REGION", "us-west-2")
            endpoint_url = os.environ.get(
                "MF_STS_ENDPOINT", "https://sts.us-west-2.amazonaws.com"
            )
            # One-second connect/read timeouts on the STS client.
            config = Config(connect_timeout=1, read_timeout=1)
            # NOTE(review): the client creation and assume_role call are
            # synchronous and are not awaited, so they run on the event loop
            # of this coroutine — confirm this is acceptable for sandbox use.
            sts_connection = boto3.client(
                "sts", config=config, region_name=region_name,
                endpoint_url=endpoint_url
            )
            assumed_role = sts_connection.assume_role(
                RoleArn=role_arn, RoleSessionName="acct_role"
            )
            role_credentials = assumed_role["Credentials"]
            credentials = {
                "aws_access_key_id": role_credentials["AccessKeyId"],
                "aws_secret_access_key": role_credentials["SecretAccessKey"],
                "aws_session_token": role_credentials["SessionToken"],
            }
            return web.Response(status=200, body=json.dumps(credentials))
        except Exception as ex:
            # NOTE(review): the error message and traceback are returned to
            # the caller; kept as-is because clients may rely on this body.
            body = {"err_msg": str(ex), "traceback": get_traceback_str()}
            return web.Response(status=500, body=json.dumps(body))