def install_sync()

in skywalking/plugins/sw_grpc.py [0:0]


def install_sync() -> None:
    import grpc
    from grpc import _channel

    def install_server() -> None:
        _grpc_server = grpc.server

        def _make_invoke_intercept_method(
            is_response_streaming: bool,
            next_handler_method: Callable[..., Any],
            handler_call_details: grpc.HandlerCallDetails,
        ) -> Callable[[Any, grpc.ServicerContext], Any]:
            """Build the traced replacement for ``next_handler_method``.

            The returned callable takes ``(request_or_iterator, context)`` and
            runs the wrapped handler inside a span: a ``NoopSpan`` when
            ``config.ignore_grpc_method_check`` accepts the method name,
            otherwise an entry span created from the carrier restored out of
            ``handler_call_details``.

            :param is_response_streaming: when true, the generator variant is
                returned, which re-yields every item of the wrapped handler
                while the span is still open.
            :param next_handler_method: the handler behaviour being wrapped.
            :param handler_call_details: call details used to restore the
                carrier and to derive the method name.
            :return: the plain or the generator wrapper, per
                ``is_response_streaming``.
            :raises grpc.RpcError: re-raised unchanged after the status code
                has been tagged on the span.
            """

            def _open_span() -> Any:
                # Returns (span context manager, method name). The span is NOT
                # entered here; callers enter it with ``with`` so that it is
                # closed on every exit path.
                carrier = _restore_carrier(handler_call_details)
                method_name = get_method_name(handler_call_details)
                if config.ignore_grpc_method_check(method_name):
                    pending = NoopSpan(NoopContext())
                else:
                    pending = get_context().new_entry_span(op=method_name, carrier=carrier)
                return pending, method_name

            def _describe_span(span: Any, method_name: str, context: grpc.ServicerContext) -> None:
                # Attributes shared by both handler variants.
                span.layer = Layer.RPCFramework
                span.tag(TagGrpcMethod(method_name))
                span.component = Component.Grpc
                span.peer = context.peer()

            def _record_rpc_error(span: Any, error: grpc.RpcError) -> None:
                # Not every RpcError exposes ``code()``; fall back to UNKNOWN.
                if hasattr(error, 'code'):
                    span.tag(TagGrpcStatusCode(error.code()))
                else:
                    span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
                span.raised()

            def invoke_intercept_method(request_or_iterator: Any, context: grpc.ServicerContext) -> Any:
                pending, method_name = _open_span()
                with pending as span:
                    _describe_span(span, method_name, context)
                    try:
                        return next_handler_method(request_or_iterator, context)
                    except grpc.RpcError as e:
                        _record_rpc_error(span, e)
                        # Bare raise re-raises the active exception as-is.
                        raise

            def invoke_intercept_method_response_streaming(
                request_or_iterator: Any, context: grpc.ServicerContext
            ) -> Any:
                # Generator: nothing below runs until the first item is
                # requested, and the span stays open until the stream ends.
                pending, method_name = _open_span()
                with pending as span:
                    _describe_span(span, method_name, context)
                    try:
                        yield from next_handler_method(request_or_iterator, context)
                    except grpc.RpcError as e:
                        _record_rpc_error(span, e)
                        raise

            return invoke_intercept_method_response_streaming if is_response_streaming else invoke_intercept_method

        class ServerInterceptor(grpc.ServerInterceptor):
            """Server interceptor that wraps the resolved handler with tracing."""

            def intercept_service(
                self,
                continuation: Callable[[grpc.HandlerCallDetails], Optional[grpc.RpcMethodHandler]],
                handler_call_details: grpc.HandlerCallDetails,
            ) -> Optional[grpc.RpcMethodHandler]:
                """Resolve the next handler and return a traced copy of it.

                :param continuation: invokes the rest of the interceptor chain
                    / handler lookup for ``handler_call_details``.
                :param handler_call_details: details of the incoming call.
                :return: a handler of the same kind as the resolved one, whose
                    behaviour is replaced by the wrapper from
                    ``_make_invoke_intercept_method`` and whose
                    (de)serializers are carried over; ``None`` when the
                    continuation resolved no handler.
                """
                next_handler = continuation(handler_call_details)
                if next_handler is None:
                    # The continuation returns None when the RPC is not
                    # serviced (e.g. no handler for the method). There is
                    # nothing to wrap, and dereferencing None below would
                    # raise AttributeError, so pass the None through.
                    return None

                handler_factory, next_handler_method = _get_factory_and_method(next_handler)

                return handler_factory(
                    _make_invoke_intercept_method(
                        bool(next_handler.response_streaming), next_handler_method, handler_call_details
                    ),
                    request_deserializer=next_handler.request_deserializer,
                    response_serializer=next_handler.response_serializer,
                )

        def _sw_grpc_server(
            thread_pool: Optional[Executor] = None,
            handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
            interceptors: Optional[Sequence[Any]] = None,
            options: Optional[Any] = None,
            maximum_concurrent_rpcs: Optional[int] = None,
            compression: Optional[grpc.Compression] = None,
            xds: bool = False,
        ):
            """Replacement for ``grpc.server`` that installs ``ServerInterceptor``.

            The tracing interceptor is placed first, followed by any
            caller-supplied ``interceptors`` in their original order. All other
            arguments are forwarded positionally, unchanged, to the original
            ``grpc.server`` captured in ``_grpc_server``.
            """
            caller_interceptors = () if interceptors is None else interceptors
            interceptor_chain = [ServerInterceptor(), *caller_interceptors]
            return _grpc_server(
                thread_pool,
                handlers,
                interceptor_chain,
                options,
                maximum_concurrent_rpcs,
                compression,
                xds,
            )

        grpc.server = _sw_grpc_server

    def install_client() -> None:
        _grpc_channel = _channel.Channel

        class _ClientCallDetails(NamedTuple):
            """Plain tuple record holding the per-call settings of a client call.

            The fields are declared in the same order in which ``_intercept``
            copies them from the incoming ``grpc.ClientCallDetails`` when it
            builds the call details passed on to ``continuation``.
            """

            method: str
            timeout: Optional[float]
            # ``_intercept`` stores a list here: the caller's metadata followed
            # by the (key, val) pairs of the span's injected carrier.
            metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]]
            credentials: Optional[grpc.CallCredentials]
            wait_for_ready: Optional[bool]
            compression: Any

        class _SWClientCallDetails(_ClientCallDetails, grpc.ClientCallDetails):
            """``_ClientCallDetails`` record that also derives from ``grpc.ClientCallDetails``.

            ``_intercept`` instantiates this type to hand the rewritten call
            details (with trace metadata added) to ``continuation``.
            """

            pass

        class _ClientInterceptor(
            grpc.UnaryUnaryClientInterceptor,
            grpc.UnaryStreamClientInterceptor,
            grpc.StreamUnaryClientInterceptor,
            grpc.StreamStreamClientInterceptor,
        ):
            """Client-side interceptor that runs each outgoing call inside a span.

            All four call arities delegate to :meth:`_intercept`; ``target`` is
            used as the span peer and as the URL tag.
            """

            def __init__(self, target: str):
                self.target = target

            def _intercept(
                self,
                continuation: Callable[..., Any],
                client_call_details: grpc.ClientCallDetails,
                request: Any,
            ):
                """Open a span, add its carrier to the metadata, then continue the call.

                A ``NoopSpan`` is used when ``config.ignore_grpc_method_check``
                accepts the method name; otherwise an exit span is created.
                The value returned by ``continuation`` is returned as-is.
                """
                method_name = get_method_name(client_call_details)
                if config.ignore_grpc_method_check(method_name):
                    span_scope = NoopSpan(NoopContext())
                else:
                    span_scope = get_context().new_exit_span(
                        op=method_name, peer=self.target, component=Component.Grpc
                    )

                with span_scope as span:
                    span.layer = Layer.RPCFramework
                    span.tag(TagGrpcMethod(method_name))
                    span.tag(TagGrpcUrl(self.target))

                    # Caller metadata first, then one (key, val) pair per
                    # carrier item produced by the span.
                    carrier = span.inject()
                    outgoing_metadata = [
                        *(client_call_details.metadata or []),
                        *((entry.key, entry.val) for entry in carrier),
                    ]

                    traced_details = _SWClientCallDetails(
                        method=client_call_details.method,
                        timeout=client_call_details.timeout,
                        metadata=outgoing_metadata,
                        credentials=client_call_details.credentials,
                        wait_for_ready=client_call_details.wait_for_ready,
                        compression=client_call_details.compression,
                    )
                    return continuation(traced_details, request)

            def intercept_unary_unary(self, continuation, client_call_details, request):
                """Trace a unary-request / unary-response call."""
                return self._intercept(continuation, client_call_details, request)

            def intercept_unary_stream(self, continuation, client_call_details, request):
                """Trace a unary-request / streaming-response call."""
                return self._intercept(continuation, client_call_details, request)

            def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
                """Trace a streaming-request / unary-response call."""
                return self._intercept(continuation, client_call_details, request_iterator)

            def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
                """Trace a streaming-request / streaming-response call."""
                return self._intercept(continuation, client_call_details, request_iterator)

        def _sw_grpc_channel_factory(target: str, *args: Any, **kwargs: Any):
            """Create a channel via the original ``Channel`` and add tracing to it.

            The channel is always built first. It is returned unwrapped when
            ``target`` equals ``config.agent_collector_backend_services``
            (presumably so the agent's own backend traffic is not traced -
            TODO confirm); otherwise it is wrapped with a ``_ClientInterceptor``
            bound to ``target``.
            """
            channel = _grpc_channel(target, *args, **kwargs)
            is_collector_backend = target == config.agent_collector_backend_services
            return channel if is_collector_backend else grpc.intercept_channel(channel, _ClientInterceptor(target))

        _channel.Channel = _sw_grpc_channel_factory

    install_client()
    install_server()