MockServer/server_controller.py (180 lines of code) (raw):
from fastapi import FastAPI, Response, UploadFile, File, HTTPException, Request, Body
from fastapi.responses import StreamingResponse, RedirectResponse, FileResponse, JSONResponse
from starlette.responses import Response as StarletteResponse
from starlette.datastructures import MutableHeaders
from starlette.types import Message
import json
import os
import asyncio
from typing import Optional, Any
import gzip
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
def create_app():
app = FastAPI()
@app.middleware("http")
async def log_request_info(request: Request, call_next):
"""Log request and response information"""
# Get request details before processing
method = request.method
path = request.url.path
client = request.client.host if request.client else "unknown"
protocol = "HTTP/2" if request.headers.get("upgrade") == "h2c" else "HTTP/1.1"
# Process the request
response = await call_next(request)
# Log the summary
logger.info(
f"Request Summary: {protocol} | {method} {path} | "
f"Client: {client} | "
f"Status: {response.status_code}"
)
return response
@app.get("/hello")
async def root(body: Optional[Any] = Body(None)):
"""
Simple hello world endpoint using FastAPI's Body parameter for proper HTTP/2 handling
"""
return {"message": "Hello World!"}
@app.get("/cookie/set")
async def set_cookie():
"""
Set a test cookie with a fixed value
"""
response = JSONResponse({"status": "cookie_set"})
response.set_cookie(
key="test_cookie",
value="cookie_value_123",
max_age=3600,
path="/",
domain=None,
secure=False,
httponly=True
)
return response
@app.get("/cookie/verify")
async def verify_cookie(request: Request):
"""
Verify if the test cookie exists and has the correct value
"""
cookie = request.cookies.get("test_cookie")
if cookie == "cookie_value_123":
return {"status": "valid_cookie"}
return {"status": "invalid_cookie"}
@app.api_route("/echo", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "HEAD", "OPTIONS"])
async def echo(request: Request):
"""
Echo back request details including headers, method, and body
For binary data, only return the content length
For HEAD requests, return same headers as GET but no body
"""
# Get headers (excluding connection headers that FastAPI handles)
headers = dict(request.headers)
excluded_headers = ['connection', 'content-length', 'transfer-encoding']
headers = {k: v for k, v in headers.items() if k.lower() not in excluded_headers}
# Get request body
body = await request.body()
content_type = request.headers.get('content-type', '').lower()
# Handle body based on content type
if body:
if any(t in content_type for t in ['text', 'json', 'xml', 'form-data', 'x-www-form-urlencoded']):
body_content = body.decode('utf-8', errors='replace')
else:
body_content = f"<binary data of length {len(body)} bytes>"
else:
body_content = None
response_data = {
"method": request.method,
"url": str(request.url),
"headers": headers,
"query_params": dict(request.query_params),
"body": body_content
}
response_headers = {
"X-Echo-Server": "FastAPI"
}
if request.method == "OPTIONS":
response_headers.update({
"Access-Control-Allow-Origin": headers.get("origin", "*"),
"Access-Control-Allow-Methods": headers.get("access-control-request-method", "GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS"),
"Access-Control-Allow-Headers": headers.get("access-control-request-headers", "*"),
"Access-Control-Max-Age": "86400"
})
return JSONResponse(
content=response_data,
headers=response_headers
)
@app.post("/upload/post/slow")
async def upload_file_slow(file: UploadFile = File(...)):
"""Handle file upload with simulated slow processing"""
chunk_size = 200 * 1024
total_size = 0
while chunk := await file.read(chunk_size):
total_size += len(chunk)
await asyncio.sleep(1) # Simulate slow processing
return {
"filename": file.filename,
"content_type": file.content_type,
"size": total_size
}
@app.put("/upload/put/slow")
async def upload_file_slow(request: Request):
"""Handle file upload with simulated slow processing"""
chunk_size = 200 * 1024
total_size = 0
content_type = request.headers.get("Content-Type", "application/octet-stream")
# Read the body data in chunks
async for chunk in request.stream():
total_size += len(chunk)
await asyncio.sleep(1) # Simulate slow processing
return {
"content_type": content_type,
"size": total_size
}
@app.get("/download/1MB_data_at_200KBps_speed")
async def download_slow(body: Optional[Any] = Body(None)):
"""
Generate and serve a file with controlled download speed
"""
async def generate_slow_content():
chunk_size = 200 * 1024 # bytes per second
remaining_size = 1 * 1024 * 1024 # total bytes
while remaining_size > 0:
# Calculate the actual chunk size for this iteration
current_chunk_size = min(chunk_size, remaining_size)
yield os.urandom(current_chunk_size)
remaining_size -= current_chunk_size
# Wait 1 second before sending next chunk
await asyncio.sleep(1)
return StreamingResponse(
generate_slow_content(),
media_type="application/octet-stream",
headers={
"Content-Disposition": "attachment; filename=slow_1mb_200kbps.bin",
"X-Download-Size": "1MB",
"X-Download-Speed": "200kbps"
}
)
@app.get("/stream")
async def stream(body: Optional[Any] = Body(None)):
"""Stream a response in chunks with delays"""
async def generate_stream():
for i in range(10):
await asyncio.sleep(1) # Simulate processing delay
yield f"chunk {i}\n".encode()
return StreamingResponse(
generate_stream(),
media_type="text/plain"
)
@app.get("/redirect")
async def redirect(body: Optional[Any] = Body(None)):
"""
Redirect to /echo endpoint with 302 status code
"""
return RedirectResponse(
url="/echo",
status_code=302,
headers={"X-Original-Path": "/redirect"}
)
@app.get("/redirect_to")
async def redirect_to(from_url: str = None):
"""
Redirect to the specified URL provided in the 'from' parameter
If no URL is provided, returns a 400 error
"""
if not from_url:
raise HTTPException(status_code=400, detail="Missing 'from' parameter")
return RedirectResponse(url=from_url, status_code=302)
@app.get("/redirect_chain")
async def redirect_chain(body: Optional[Any] = Body(None)):
"""
Create a redirect chain: /redirect_chain -> /redirect -> /echo
"""
return RedirectResponse(
url="/redirect",
status_code=302,
headers={
"X-Original-Path": "/redirect_chain",
"Connection": "keep-alive",
"Keep-Alive": "timeout=5, max=1000"
}
)
@app.get("/get/gzip_response")
async def gzip_response(body: Optional[Any] = Body(None)):
"""Return a gzipped JSON response"""
content = {"message": "This is a gzipped response"}
buf = gzip.compress(json.dumps(content).encode())
return Response(
content=buf,
media_type="application/json",
headers={"Content-Encoding": "gzip"}
)
@app.get("/timeout/request")
async def timeout_request():
# Sleep for 2 seconds to simulate a slow response
await asyncio.sleep(2)
return {"message": "Response after delay"}
return app
app = create_app()