in skywalking/plugins/sw_grpc.py [0:0]
def install_sync() -> None:
import grpc
from grpc import _channel
def install_server() -> None:
_grpc_server = grpc.server
def _make_invoke_intercept_method(
is_response_streaming: bool,
next_handler_method: Callable[..., Any],
handler_call_details: grpc.HandlerCallDetails,
) -> Callable[[Any, grpc.ServicerContext], Any]:
def invoke_intercept_method(request_or_iterator: Any, context: grpc.ServicerContext) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
return next_handler_method(request_or_iterator, context)
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
def invoke_intercept_method_response_streaming(
request_or_iterator: Any, context: grpc.ServicerContext
) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
yield from next_handler_method(request_or_iterator, context)
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
return invoke_intercept_method_response_streaming if is_response_streaming else invoke_intercept_method
class ServerInterceptor(grpc.ServerInterceptor):
def intercept_service(
self,
continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler],
handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
next_handler = continuation(handler_call_details)
handler_factory, next_handler_method = _get_factory_and_method(next_handler)
return handler_factory(
_make_invoke_intercept_method(
bool(next_handler.response_streaming), next_handler_method, handler_call_details
),
request_deserializer=next_handler.request_deserializer,
response_serializer=next_handler.response_serializer,
)
def _sw_grpc_server(
thread_pool: Optional[Executor] = None,
handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
interceptors: Optional[Sequence[Any]] = None,
options: Optional[Any] = None,
maximum_concurrent_rpcs: Optional[int] = None,
compression: Optional[grpc.Compression] = None,
xds: bool = False,
):
_sw_interceptors = [ServerInterceptor()]
if interceptors is not None:
_sw_interceptors.extend(interceptors)
return _grpc_server(
thread_pool,
handlers,
_sw_interceptors,
options,
maximum_concurrent_rpcs,
compression,
xds,
)
grpc.server = _sw_grpc_server
def install_client() -> None:
_grpc_channel = _channel.Channel
class _ClientCallDetails(NamedTuple):
method: str
timeout: Optional[float]
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]]
credentials: Optional[grpc.CallCredentials]
wait_for_ready: Optional[bool]
compression: Any
class _SWClientCallDetails(_ClientCallDetails, grpc.ClientCallDetails):
pass
class _ClientInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self, target: str):
self.target = target
def _intercept(
self,
continuation: Callable[..., Any],
client_call_details: grpc.ClientCallDetails,
request: Any,
):
method_name = get_method_name(client_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_exit_span(op=method_name, peer=self.target, component=Component.Grpc)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.tag(TagGrpcUrl(self.target))
carrier = span.inject()
metadata = list(client_call_details.metadata or [])
for item in carrier:
metadata.append((item.key, item.val))
new_client_call_details = _SWClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return continuation(new_client_call_details, request)
def intercept_unary_unary(self, continuation, client_call_details, request):
return self._intercept(continuation, client_call_details, request)
def intercept_unary_stream(self, continuation, client_call_details, request):
return self._intercept(continuation, client_call_details, request)
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
return self._intercept(continuation, client_call_details, request_iterator)
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
return self._intercept(continuation, client_call_details, request_iterator)
def _sw_grpc_channel_factory(target: str, *args: Any, **kwargs: Any):
c = _grpc_channel(target, *args, **kwargs)
if target == config.agent_collector_backend_services:
return c
return grpc.intercept_channel(c, _ClientInterceptor(target))
_channel.Channel = _sw_grpc_channel_factory
install_client()
install_server()