in src/Backend/src/api/tracing.py [0:0]
def setup(self, app: FastAPI) -> None:
    """
    Configure tracing for the given FastAPI application.

    Creates a ``TracerProvider`` from the merged resources, attaches the
    span processor returned by ``get_trace_processor()`` and registers the
    provider through ``trace.set_tracer_provider``. It then sets the global
    text-map propagator, registers ``force_flush`` for the app's
    ``"shutdown"`` event, and instruments the app together with the
    aiohttp-client, requests, asyncpg, SQLAlchemy and gRPC client
    instrumentors.

    Args:
        app: Application to instrument; the shutdown handler is also
            registered on it.
    """
    # Collect the inputs in the same order as before: initial resource,
    # context resources, then the span processor.
    base_resource = get_initial_resource(
        project_id=self._project_id,
        environment=self._environment,
    )
    context_resource = self.get_resources()
    span_processor = self.get_trace_processor()

    # Tracing: build the provider and register it before any instrumentor
    # is activated below.
    merged_resource = context_resource.merge(base_resource)
    provider = TracerProvider(resource=merged_resource)
    provider.add_span_processor(span_processor)
    trace.set_tracer_provider(provider)

    # Distributed tracing: Cloud Trace format plus W3C baggage.
    propagator = CompositePropagator(
        [CloudTraceFormatPropagator(), W3CBaggagePropagator()]
    )
    propagate.set_global_textmap(propagator)

    # Call force_flush on the provider when the app emits "shutdown".
    app.add_event_handler("shutdown", provider.force_flush)

    # Server-side instrumentation of the FastAPI app itself.
    FastAPIInstrumentor.instrument_app(
        app,
        client_response_hook=self.client_response_hook,
    )

    # Library instrumentors, activated one by one in a fixed order.
    AioHttpClientInstrumentor().instrument()
    RequestsInstrumentor().instrument()
    AsyncPGInstrumentor().instrument()  # type: ignore[no-untyped-call]
    SQLAlchemyInstrumentor().instrument()
    GrpcInstrumentorClient().instrument()  # type: ignore[no-untyped-call]
    GrpcAioInstrumentorClient().instrument()  # type: ignore[no-untyped-call]