skywalking/plugins/sw_grpc.py (393 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from asyncio import iscoroutine
from concurrent.futures import Executor
from typing import Any, Awaitable, Callable, List, NamedTuple, Optional, Sequence, Tuple, Union
from skywalking import Component, Layer, config
from skywalking.trace.carrier import Carrier
from skywalking.trace.context import NoopContext, get_context
from skywalking.trace.span import NoopSpan
from skywalking.trace.tags import TagGrpcMethod, TagGrpcStatusCode, TagGrpcUrl
link_vector = ['https://grpc.io/docs/languages/python']
support_matrix = {'grpcio': {'>=3.8': ['1.*']}}
note = """"""
def _get_factory_and_method(rpc_handler: Any) -> Tuple[Callable[..., Any], Callable[..., Any]]:
import grpc
if rpc_handler.unary_unary:
return grpc.unary_unary_rpc_method_handler, rpc_handler.unary_unary
elif rpc_handler.unary_stream:
return grpc.unary_stream_rpc_method_handler, rpc_handler.unary_stream
elif rpc_handler.stream_unary:
return grpc.stream_unary_rpc_method_handler, rpc_handler.stream_unary
elif rpc_handler.stream_stream:
return grpc.stream_stream_rpc_method_handler, rpc_handler.stream_stream
else:
raise RuntimeError('RPC handler implementation does not exist')
def _restore_carrier(handler_call_details: Any) -> Carrier:
metadata_map = dict(handler_call_details.invocation_metadata or ())
carrier = Carrier()
for item in carrier:
val = metadata_map.get(item.key)
if val is not None:
item.val = val
return carrier
def get_method_name(details: Any) -> str:
method_name = details.method
if type(method_name) is bytes:
return method_name.decode()
if type(method_name) is str:
return method_name
return str(method_name)
def install_sync() -> None:
import grpc
from grpc import _channel
def install_server() -> None:
_grpc_server = grpc.server
def _make_invoke_intercept_method(
is_response_streaming: bool,
next_handler_method: Callable[..., Any],
handler_call_details: grpc.HandlerCallDetails,
) -> Callable[[Any, grpc.ServicerContext], Any]:
def invoke_intercept_method(request_or_iterator: Any, context: grpc.ServicerContext) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
return next_handler_method(request_or_iterator, context)
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
def invoke_intercept_method_response_streaming(
request_or_iterator: Any, context: grpc.ServicerContext
) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
yield from next_handler_method(request_or_iterator, context)
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
return invoke_intercept_method_response_streaming if is_response_streaming else invoke_intercept_method
class ServerInterceptor(grpc.ServerInterceptor):
def intercept_service(
self,
continuation: Callable[[grpc.HandlerCallDetails], grpc.RpcMethodHandler],
handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
next_handler = continuation(handler_call_details)
handler_factory, next_handler_method = _get_factory_and_method(next_handler)
return handler_factory(
_make_invoke_intercept_method(
bool(next_handler.response_streaming), next_handler_method, handler_call_details
),
request_deserializer=next_handler.request_deserializer,
response_serializer=next_handler.response_serializer,
)
def _sw_grpc_server(
thread_pool: Optional[Executor] = None,
handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
interceptors: Optional[Sequence[Any]] = None,
options: Optional[Any] = None,
maximum_concurrent_rpcs: Optional[int] = None,
compression: Optional[grpc.Compression] = None,
xds: bool = False,
):
_sw_interceptors = [ServerInterceptor()]
if interceptors is not None:
_sw_interceptors.extend(interceptors)
return _grpc_server(
thread_pool,
handlers,
_sw_interceptors,
options,
maximum_concurrent_rpcs,
compression,
xds,
)
grpc.server = _sw_grpc_server
def install_client() -> None:
_grpc_channel = _channel.Channel
class _ClientCallDetails(NamedTuple):
method: str
timeout: Optional[float]
metadata: Optional[Sequence[Tuple[str, Union[str, bytes]]]]
credentials: Optional[grpc.CallCredentials]
wait_for_ready: Optional[bool]
compression: Any
class _SWClientCallDetails(_ClientCallDetails, grpc.ClientCallDetails):
pass
class _ClientInterceptor(
grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor,
):
def __init__(self, target: str):
self.target = target
def _intercept(
self,
continuation: Callable[..., Any],
client_call_details: grpc.ClientCallDetails,
request: Any,
):
method_name = get_method_name(client_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_exit_span(op=method_name, peer=self.target, component=Component.Grpc)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.tag(TagGrpcUrl(self.target))
carrier = span.inject()
metadata = list(client_call_details.metadata or [])
for item in carrier:
metadata.append((item.key, item.val))
new_client_call_details = _SWClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression,
)
return continuation(new_client_call_details, request)
def intercept_unary_unary(self, continuation, client_call_details, request):
return self._intercept(continuation, client_call_details, request)
def intercept_unary_stream(self, continuation, client_call_details, request):
return self._intercept(continuation, client_call_details, request)
def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
return self._intercept(continuation, client_call_details, request_iterator)
def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
return self._intercept(continuation, client_call_details, request_iterator)
def _sw_grpc_channel_factory(target: str, *args: Any, **kwargs: Any):
c = _grpc_channel(target, *args, **kwargs)
if target == config.agent_collector_backend_services:
return c
return grpc.intercept_channel(c, _ClientInterceptor(target))
_channel.Channel = _sw_grpc_channel_factory
install_client()
install_server()
def install_async() -> None:
import grpc
from grpc.aio import _channel as _aio_channel
def install_async_server() -> None:
_grpc_aio_server = grpc.aio.server
def _make_invoke_intercept_method(
is_response_streaming: bool,
next_handler_method: Callable[..., Any],
handler_call_details: grpc.HandlerCallDetails,
) -> Callable[[Any, grpc.aio.ServicerContext[Any, Any]], Awaitable[Any]]:
async def invoke_intercept_method(
request_or_iterator: Any, context: grpc.aio.ServicerContext[Any, Any]
) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
return await next_handler_method(request_or_iterator, context)
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
async def invoke_intercept_method_response_streaming(
request_or_iterator: Any, context: grpc.aio.ServicerContext[Any, Any]
) -> Any:
carrier = _restore_carrier(handler_call_details)
method_name = get_method_name(handler_call_details)
with (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_entry_span(op=method_name, carrier=carrier)
) as span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.component = Component.Grpc
span.peer = context.peer()
try:
coroutine_or_asyncgen = next_handler_method(request_or_iterator, context)
async for r in (
await coroutine_or_asyncgen if iscoroutine(coroutine_or_asyncgen) else coroutine_or_asyncgen
):
yield r
except grpc.RpcError as e:
if hasattr(e, 'code'):
span.tag(TagGrpcStatusCode(e.code()))
else:
span.tag(TagGrpcStatusCode(grpc.StatusCode.UNKNOWN))
span.raised()
raise e
return invoke_intercept_method_response_streaming if is_response_streaming else invoke_intercept_method
class ServerInterceptor(grpc.aio.ServerInterceptor):
async def intercept_service(
self,
continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler:
next_handler = await continuation(handler_call_details)
handler_factory, next_handler_method = _get_factory_and_method(next_handler)
return handler_factory(
_make_invoke_intercept_method(
bool(next_handler.response_streaming), next_handler_method, handler_call_details
),
request_deserializer=next_handler.request_deserializer,
response_serializer=next_handler.response_serializer,
)
def _sw_grpc_aio_server(
migration_thread_pool: Optional[Executor] = None,
handlers: Optional[Sequence[grpc.GenericRpcHandler]] = None,
interceptors: Optional[Sequence[Any]] = None,
options: Optional[grpc.aio.ChannelArgumentType] = None,
maximum_concurrent_rpcs: Optional[int] = None,
compression: Optional[grpc.Compression] = None,
):
_sw_interceptors = [ServerInterceptor()]
if interceptors is not None:
_sw_interceptors.extend(interceptors)
return _grpc_aio_server(
migration_thread_pool,
handlers,
_sw_interceptors,
options,
maximum_concurrent_rpcs,
compression,
)
grpc.aio.server = _sw_grpc_aio_server
def install_async_client() -> None:
_aio_grpc_channel = _aio_channel.Channel
class _AioClientCallDetails(NamedTuple):
method: str
timeout: Optional[float]
metadata: Optional[grpc.aio.Metadata]
credentials: Optional[grpc.CallCredentials]
wait_for_ready: Optional[bool]
class _SWAioClientCallDetails(_AioClientCallDetails, grpc.aio.ClientCallDetails):
pass
class _AioClientInterceptor:
def __init__(self, target: str):
self.target = target
async def _intercept(
self,
continuation: Callable[..., Any],
client_call_details: grpc.aio.ClientCallDetails,
request: Any,
):
method_name = get_method_name(client_call_details)
span = (
NoopSpan(NoopContext())
if config.ignore_grpc_method_check(method_name)
else get_context().new_exit_span(op=method_name, peer=self.target, component=Component.Grpc)
)
with span:
span.layer = Layer.RPCFramework
span.tag(TagGrpcMethod(method_name))
span.tag(TagGrpcUrl(self.target))
carrier = span.inject()
metadata = client_call_details.metadata or grpc.aio.Metadata()
for item in carrier:
metadata.add(item.key, item.val)
new_client_call_details = _SWAioClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
)
return await continuation(new_client_call_details, request)
class _AioClientUnaryUnaryInterceptor(_AioClientInterceptor, grpc.aio.UnaryUnaryClientInterceptor):
async def intercept_unary_unary(
self,
continuation: Callable[
[grpc.aio.ClientCallDetails, grpc.aio._typing.RequestType], grpc.aio._call.UnaryUnaryCall
],
client_call_details: grpc.aio.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryUnaryCall:
return await self._intercept(continuation, client_call_details, request)
class _AioClientUnaryStreamInterceptor(_AioClientInterceptor, grpc.aio.UnaryStreamClientInterceptor):
async def intercept_unary_stream(
self,
continuation: Callable[
[grpc.aio.ClientCallDetails, grpc.aio._typing.RequestType], grpc.aio._call.UnaryStreamCall
],
client_call_details: grpc.aio.ClientCallDetails,
request: grpc.aio._typing.RequestType,
) -> grpc.aio._call.UnaryStreamCall:
return await self._intercept(continuation, client_call_details, request)
class _AioClientStreamUnaryInterceptor(_AioClientInterceptor, grpc.aio.StreamUnaryClientInterceptor):
async def intercept_stream_unary(
self,
continuation: Callable[
[grpc.aio.ClientCallDetails, grpc.aio._typing.RequestType], grpc.aio._call.StreamUnaryCall
],
client_call_details: grpc.aio.ClientCallDetails,
request_iterator: grpc.aio._typing.RequestIterableType,
) -> grpc.aio._call.StreamUnaryCall:
return await self._intercept(continuation, client_call_details, request_iterator)
class _AioClientStreamStreamInterceptor(_AioClientInterceptor, grpc.aio.StreamStreamClientInterceptor):
async def intercept_stream_stream(
self,
continuation: Callable[
[grpc.aio.ClientCallDetails, grpc.aio._typing.RequestType], grpc.aio._call.StreamStreamCall
],
client_call_details: grpc.aio.ClientCallDetails,
request_iterator: grpc.aio._typing.RequestIterableType,
) -> grpc.aio._call.StreamStreamCall:
return await self._intercept(continuation, client_call_details, request_iterator)
def _sw_grpc_aio_channel_factory(
target: str,
options: grpc.aio.ChannelArgumentType,
credentials: Optional[grpc.ChannelCredentials],
compression: Optional[grpc.Compression],
interceptors: Optional[Sequence[grpc.aio.ClientInterceptor]],
):
if target == config.agent_collector_backend_services:
return _aio_grpc_channel(target, options, credentials, compression, interceptors)
_sw_interceptors: List[grpc.aio.ClientInterceptor] = [
_AioClientUnaryUnaryInterceptor(target),
_AioClientUnaryStreamInterceptor(target),
_AioClientStreamUnaryInterceptor(target),
_AioClientStreamStreamInterceptor(target),
]
_sw_interceptors.extend(interceptors or [])
return _aio_grpc_channel(target, options, credentials, compression, _sw_interceptors)
_aio_channel.Channel = _sw_grpc_aio_channel_factory
install_async_client()
install_async_server()
def install():
install_sync()
install_async()