Elastiflix/python-favorite-otel-manual/main.py (144 lines of code) (raw):
from flask import Flask, request
import logging
import redis
import os
import ecs_logging
import datetime
import random
import time
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
# Using grpc exporter since per the instructions in OTel docs this is needed for any endpoint receiving OTLP.
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.flask import FlaskInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
# from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware
from opentelemetry.sdk.resources import Resource
delay_time = os.environ.get("TOGGLE_SERVICE_DELAY")
if delay_time == "" or delay_time is None:
delay_time = 0
delay_time = int(delay_time)
redis_host = os.environ.get("REDIS_HOST") or "localhost"
redis_port = os.environ.get("REDIS_PORT") or 6379
otel_traces_exporter = os.environ.get("OTEL_TRACES_EXPORTER") or "otlp"
otel_metrics_exporter = os.environ.get("OTEL_TRACES_EXPORTER") or "otlp"
environment = os.environ.get("ENVIRONMENT") or "dev"
otel_service_version = os.environ.get("OTEL_SERVICE_VERSION") or "1.0.0"
resource_attributes = (
os.environ.get("OTEL_RESOURCE_ATTRIBUTES") or "service.version=1.0,deployment.environment=production"
)
otel_exporter_otlp_headers = os.environ.get("OTEL_EXPORTER_OTLP_HEADERS")
# fail if secret token not set
if otel_exporter_otlp_headers is None:
raise Exception("OTEL_EXPORTER_OTLP_HEADERS environment variable not set")
# else:
# otel_exporter_otlp_fheaders= f"Authorization=Bearer%20{secret_token}"
otel_exporter_otlp_endpoint = os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
# fail if server url not set
if otel_exporter_otlp_endpoint is None:
raise Exception("OTEL_EXPORTER_OTLP_ENDPOINT environment variable not set")
else:
exporter = OTLPSpanExporter(endpoint=otel_exporter_otlp_endpoint, headers=otel_exporter_otlp_headers)
key_value_pairs = resource_attributes.split(",")
result_dict = {}
for pair in key_value_pairs:
key, value = pair.split("=")
result_dict[key] = value
resourceAttributes = {
"service.name": result_dict["service.name"],
"service.version": result_dict["service.version"],
"deployment.environment": result_dict["deployment.environment"],
# # Add more attributes as needed
}
resource = Resource.create(resourceAttributes)
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
# Sets the global default tracer provider
trace.set_tracer_provider(provider)
# Creates a tracer from the global tracer provider
tracer = trace.get_tracer("favorite")
application_port = os.environ.get("APPLICATION_PORT") or 5000
app = Flask(__name__)
FlaskInstrumentor().instrument_app(app)
# OpenTelemetryMiddleware().instrument()
RequestsInstrumentor().instrument()
RedisInstrumentor().instrument()
# app.wsgi_app = OpenTelemetryMiddleware(app.wsgi_app)
# Get the Logger
logger = logging.getLogger("app")
logger.setLevel(logging.DEBUG)
# Add an ECS formatter to the Handler
handler = logging.StreamHandler()
handler.setFormatter(ecs_logging.StdlibFormatter())
logger.addHandler(handler)
logging.getLogger("werkzeug").setLevel(logging.ERROR)
logging.getLogger("werkzeug").addHandler(handler)
r = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
@app.after_request
def after_request(response):
print(response)
# timestamp in iso8601
timestamp = datetime.datetime.utcnow().isoformat()
logger.info(
"%s %s %s %s %s %s",
timestamp,
request.remote_addr,
request.method,
request.scheme,
request.full_path,
response.status,
extra={
"event.dataset": "favorite.log",
"http.request.method": request.method,
"http.request.path": request.full_path,
"source.ip": request.remote_addr,
"http.response.status_code": response.status,
},
)
return response
@app.route("/")
def hello():
logger.info("Main request successfull")
return "Hello World!"
@app.route("/favorites", methods=["GET"])
def get_favorite_movies():
# add artificial delay if enabled
if delay_time > 0:
time.sleep(max(0, random.gauss(delay_time / 1000, delay_time / 1000 / 10)))
with tracer.start_as_current_span("get_favorite_movies"):
user_id = str(request.args.get("user_id"))
logger.info(
"Getting favorites for user " + user_id,
extra={"event.dataset": "favorite.log", "user.id": request.args.get("user_id")},
)
favorites = r.smembers(user_id)
# convert to list
favorites = list(favorites)
logger.info(
"User " + user_id + " has favorites: " + str(favorites),
extra={"event.dataset": "favorite.log", "user.id": user_id},
)
return {"favorites": favorites}
@app.route("/favorites", methods=["POST"])
def add_favorite_movie():
# add artificial delay if enabled
if delay_time > 0:
time.sleep(max(0, random.gauss(delay_time / 1000, delay_time / 1000 / 10)))
with tracer.start_as_current_span("add_favorite_movies", set_status_on_exception=True) as span:
user_id = str(request.args.get("user_id"))
movie_id = request.json["id"]
logger.info(
"Adding or removing favorites for user " + user_id,
extra={"event.dataset": "favorite.log", "user.id": user_id},
)
# add movie to the user's favorite list. If it already exists, remove it from the list
redisRespone = r.srem(user_id, int(movie_id))
if redisRespone == 0:
r.sadd(user_id, movie_id)
favorites = r.smembers(user_id)
# convert to list
favorites = list(favorites)
logger.info(
"User " + user_id + " has favorites: " + str(favorites),
extra={"event.dataset": "favorite.log", "user.id": user_id},
)
# if enabled, in 50% of the cases, sleep for 2 seconds
sleep_time = os.getenv("TOGGLE_CANARY_DELAY")
if sleep_time is None or sleep_time == "":
sleep_time = 0
sleep_time = int(sleep_time)
if sleep_time > 0 and random.random() < 0.5:
time.sleep(max(0, random.gauss(sleep_time / 1000, sleep_time / 1000 / 10)))
# add label to transaction
logger.info("Canary enabled")
# add label to span using opentelemetry
span.set_attribute("canary", "test-new-feature")
span.set_attribute("quiz_solution", "correlations")
if random.random() < float(os.getenv("TOGGLE_CANARY_FAILURE", 0)):
# throw an exception in 50% of the cases
logger.error("Something went wrong")
# set the span status to error
raise Exception("Something went wrong")
return {"favorites": favorites}
logger.info("App startup")
app.run(host="0.0.0.0", port=application_port)
logger.info("App Stopped")