src/code-launcher/code_launcher.py (343 lines of code) (raw):
import argparse
import asyncio
import base64
import os
import subprocess
import sys
import uuid
import logging
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from podman.errors.exceptions import PodmanError, APIError
import uvicorn
from opentelemetry import _logs
from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
OTLPLogExporter,
)
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.logging import LoggingInstrumentor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from src.cmd_executors.executors import ACRCmdExecutor
from src.utilities import utilities, podman_utilities
from cleanroomspec.models.python.model import (
Application,
ApplicationStartType,
ConsentCheckScope,
)
app = FastAPI()
cmd_arguments: argparse.Namespace
application: Application
telemetry_path = os.environ.get("TELEMETRY_MOUNT_PATH", "/mnt/telemetry")
application_name = os.environ.get("APPLICATION_NAME", "no-name")
logger_name = "-".join([application_name, str(uuid.uuid4())[:8]])
# Initialize tracing.
tracer_provider = TracerProvider(
resource=Resource.create(
{
"service.name": f"{logger_name}-code-launcher",
}
),
)
tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
# Sets the global default tracer provider.
trace.set_tracer_provider(tracer_provider)
# Creates a tracer from the global tracer provider.
tracer = trace.get_tracer("code-launcher")
# Initialize logging
logger_provider = LoggerProvider(
resource=Resource.create(
{
"service.name": f"{logger_name}-code-launcher",
}
),
)
logger_provider.add_log_record_processor(
BatchLogRecordProcessor(OTLPLogExporter(insecure=True))
)
_logs.set_logger_provider(logger_provider)
# Create a logger from the global logger provider.
logging.basicConfig(level=logging.INFO)
handler = LoggingHandler(level=logging.NOTSET, logger_provider=logger_provider)
logger = logging.getLogger("code-launcher")
logger.addHandler(handler)
# Create a meter provider
exporter = OTLPMetricExporter()
reader = PeriodicExportingMetricReader(exporter, export_interval_millis=10000)
meter_provider = MeterProvider(
metric_readers=[reader],
resource=Resource.create(
{
"service.name": f"{logger_name}-code-launcher",
}
),
)
metrics.set_meter_provider(meter_provider)
# Add all the external instrumentors that are required.
RequestsInstrumentor().instrument(
tracer_provider=tracer_provider, meter_provider=meter_provider
)
FastAPIInstrumentor.instrument_app(
app, tracer_provider=tracer_provider, meter_provider=meter_provider
)
LoggingInstrumentor().instrument(
set_logging_format=True,
tracer_provider=tracer_provider,
meter_provider=meter_provider,
)
def log_args(logger: logging.Logger, args: argparse.Namespace):
logger.info("Arguments:")
for arg in vars(args):
logger.info(f"{arg}: {getattr(args, arg)}")
def parse_args(cmd_args):
arg_parser = argparse.ArgumentParser(
description="Launch container in non-root namespace"
)
# Telemetry params
arg_parser.add_argument(
"--application-name",
type=str,
default="app",
help="Application Name for telemetry export",
)
arg_parser.add_argument(
"--identity-port",
type=int,
default=8290,
help="The port for the identity sidecar. Defaults to 8290.",
)
# governance params
arg_parser.add_argument(
"--governance-port",
type=int,
default=8300,
help="The port for the governance sidecar. Defaults to 8300.",
)
# secrets params
arg_parser.add_argument(
"--secrets_port",
type=int,
default=9300,
help="The port for the secrets sidecar. Defaults to 9300.",
)
# otelcollector params
arg_parser.add_argument(
"--otelcollector_port",
type=int,
default=4317,
help="The port for the otelcollector sidecar. Defaults to 4317.",
)
# Code launcher port
arg_parser.add_argument(
"--codelauncher_port",
type=int,
default=8200,
help="The port for the otelcollector sidecar. Defaults to 4317.",
)
# TODO (HPrabh): Take this as an array and support multiple applications.
arg_parser.add_argument(
"--application-base-64",
type=str,
default=os.environ.get("APPLICATION_SETTINGS_BASE64"),
help="The application settings in a base64 encoded format.",
)
parsed_args = arg_parser.parse_args(cmd_args)
validate_args(parsed_args, arg_parser)
return parsed_args
def validate_args(cmd_args, arg_parser):
if cmd_args.application_base_64 is None:
arg_parser.error("application details not provided")
application = base64.urlsafe_b64decode(cmd_args.application_base_64)
Application.model_validate_json(application)
@app.exception_handler(PodmanError)
async def podman_error_handler(request: Request, exc: PodmanError):
logger.error(f"An error occurred: {repr(exc)}")
# TODO (HPrabh): Add more specific error handling based on the error type.
return JSONResponse(
status_code=500,
content={
"message": f"An error occurred while processing {request.url.path}",
"error": f"{repr(exc)}",
"details": f"{exc}",
},
)
@app.exception_handler(APIError)
async def podman_api_error_handler(request: Request, exc: APIError):
logger.error(f"An error occurred: {repr(exc)}")
status_code = exc.status_code if exc.status_code is not None else 500
return JSONResponse(
status_code=status_code,
content={
"message": f"An error occurred while processing {request.url.path}",
"error": f"{repr(exc)}",
"details": f"{exc}",
},
)
from src.exceptions.custom_exceptions import ConsentCheckFailure
@app.exception_handler(ConsentCheckFailure)
async def consent_check_failure_handler(request: Request, exc: ConsentCheckFailure):
return JSONResponse(
status_code=403,
content={"code": "ConsentCheckFailed", "message": str(exc)},
)
@app.post("/gov/{application_name}/start")
async def start(application_name):
from src.connectors.httpconnectors import GovernanceHttpConnector
await GovernanceHttpConnector.check_consent(ConsentCheckScope.Execution.value)
if application.name != application_name:
raise HTTPException(status_code=404, detail="Application not found")
try:
await podman_utilities.start_application_container(application, telemetry_path)
return JSONResponse(
status_code=200, content={"message": "Application started successfully."}
)
except Exception as e:
logger.error(
f"Starting application container for {application.name} failed with error {repr(e)}.",
exc_info=True,
)
await GovernanceHttpConnector.put_event(
f"Starting application container for {application.name} failed with error {repr(e)}."
)
raise
@app.get("/gov/{application_name}/status")
async def getStatus(application_name):
from src.exceptions.custom_exceptions import PodmanContainerNotFound
from src.connectors.httpconnectors import GovernanceHttpConnector
try:
return await podman_utilities.get_application_status(application_name)
except PodmanContainerNotFound as e:
await GovernanceHttpConnector.put_event(
f"Get Status for {application.name} failed with error {repr(e)}."
)
raise HTTPException(status_code=404, detail="Application not found")
except Exception as e:
logger.error(
f"Get Status for {application.name} failed with error {repr(e)}.",
exc_info=True,
)
await GovernanceHttpConnector.put_event(
f"Get Status for {application.name} failed with error {repr(e)}."
)
raise
@app.post("/gov/exportLogs")
async def exportLogs():
import shutil
from src.connectors.httpconnectors import (
GovernanceHttpConnector,
)
await GovernanceHttpConnector.check_consent(ConsentCheckScope.Logging.value)
application_telemetry_path = utilities.wait_for_mount_point("application-telemetry")
try:
logger.info(
f"Copying application telemetry data to {application_telemetry_path}"
)
shutil.copytree(
f"{telemetry_path}/application",
f"{application_telemetry_path}",
dirs_exist_ok=True,
)
return JSONResponse(
status_code=200,
content={"message": "Application telemetry data exported successfully."},
)
except Exception as e:
from src.exceptions.custom_exceptions import TelemetryCaptureFailure
await GovernanceHttpConnector.put_event(
f"Exporting application telemetry (logs) failed with error {e}."
)
raise TelemetryCaptureFailure("Failed to export application telemetry") from e
@app.post("/gov/exportTelemetry")
async def exportTelemetry():
import shutil
from src.connectors.httpconnectors import (
GovernanceHttpConnector,
)
await GovernanceHttpConnector.check_consent(ConsentCheckScope.Telemetry.value)
infrastructure_telemetry_path = utilities.wait_for_mount_point(
"infrastructure-telemetry"
)
try:
logger.info(
f"Copying infrastructure telemetry data to {infrastructure_telemetry_path}"
)
shutil.copytree(
f"{telemetry_path}/infrastructure",
f"{infrastructure_telemetry_path}",
dirs_exist_ok=True,
)
return JSONResponse(
status_code=200,
content={"message": "Infrastructure telemetry data exported successfully."},
)
except Exception as e:
from src.exceptions.custom_exceptions import TelemetryCaptureFailure
await GovernanceHttpConnector.put_event(
f"Exporting infrastructure telemetry failed with error {repr(e)}."
)
raise TelemetryCaptureFailure(
"Failed to export infrastructure telemetry"
) from e
async def main(cmd_args):
global cmd_arguments
global application
cmd_arguments = parse_args(cmd_args)
if utilities.wait_for_services_enabled():
# wait for critical infra services to be ready
utilities.wait_for_services_readiness(
[
cmd_arguments.otelcollector_port,
cmd_arguments.governance_port,
cmd_arguments.identity_port,
cmd_arguments.secrets_port,
]
)
try:
import psutil
process = psutil.Popen(
["podman", "system", "service", "--time", "0"],
)
logger.info(f"Subprocess podman start service exitCode: {process.returncode}")
except subprocess.SubprocessError as e:
logger.error(f"Failed to launch subprocess podman start service. Error: {e}")
raise e
await utilities.wait_for_podman_service()
decoded_application = base64.urlsafe_b64decode(
cmd_arguments.application_base_64
).decode()
application = Application.model_validate_json(decoded_application)
logger.info(f"Starting podman for application: {application.model_dump_json()}")
log_args(logger, cmd_arguments)
from src.connectors.httpconnectors import (
IdentityHttpConnector,
GovernanceHttpConnector,
)
IdentityHttpConnector.set_port(cmd_arguments.identity_port)
GovernanceHttpConnector.set_port(cmd_arguments.governance_port)
# execute handlers
for handlers in [ACRCmdExecutor()]:
await handlers.execute(application)
if application.startType == ApplicationStartType.Auto:
try:
await GovernanceHttpConnector.check_consent(
ConsentCheckScope.Execution.value
)
await podman_utilities.start_application_container(
application, telemetry_path
)
except Exception as e:
logger.error(
f"Starting application container for {application.name} failed with error {repr(e)}.",
exc_info=True,
)
await GovernanceHttpConnector.put_event(
f"Starting application container for {application.name} failed with error {repr(e)}."
)
else:
logger.info(
f"Not starting application {application.name} as the start type is not {ApplicationStartType.Auto}."
)
config = uvicorn.Config(
app=app,
host="0.0.0.0",
port=cmd_arguments.codelauncher_port,
log_level="info",
)
_server = uvicorn.Server(config)
await _server.serve()
if __name__ == "__main__":
asyncio.run(main(sys.argv[1:]))