skywalking/client/__init__.py (137 lines of code) (raw):

# # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import os import json import platform import socket from abc import ABC, abstractmethod from typing import List from skywalking import config from skywalking.loggings import logger from skywalking.protocol.common.Common_pb2 import KeyStringValuePair class ServiceManagementClient(ABC): """ Used to register service and instance to OAP. """ def __init__(self): self.sent_properties_counter = 0 @abstractmethod def send_instance_props(self) -> None: """ Unique to each protocol, send instance properties to OAP. """ raise NotImplementedError() def refresh_instance_props(self) -> None: """ Periodically refresh the instance properties to prevent loss on OAP TTL records expiration. Default: 30 * 10 seconds """ self.sent_properties_counter += 1 if self.sent_properties_counter % config.agent_collector_properties_report_period_factor == 0: self.send_instance_props() @staticmethod def get_instance_properties() -> List[dict]: """ Get current running Python interpreter's system properties. Returns: [{'key': str, 'value': str}, ...] """ try: properties = [ {'key': 'language', 'value': 'python'}, {'key': 'OS Name', 'value': os.name}, {'key': 'Process No.', 'value': str(os.getpid())}, {'key': 'hostname', 'value': socket.gethostname()}, {'key': 'ipv4', 'value': '; '.join(socket.gethostbyname_ex(socket.gethostname())[2])}, {'key': 'python_implementation', 'value': platform.python_implementation()}, {'key': 'python_version', 'value': platform.python_version()}, ] except Exception as e: # noqa logger.exception('Failed to get OS info, fallback to basic properties.') properties = [ {'key': 'language', 'value': 'python'}, {'key': 'Process No.', 'value': str(os.getpid())}, ] namespace = config.agent_namespace if namespace: properties.append({'key': 'namespace', 'value': namespace}) instance_properties_json = config.agent_instance_properties_json if instance_properties_json: # load instance properties from json string json_properties = json.loads(instance_properties_json) for key, value in json_properties.items(): properties.append({'key': key, 'value': value}) return properties def get_instance_properties_proto(self) -> List[KeyStringValuePair]: """ Converts to protobuf format. Returns: [KeyStringValuePair, ...] """ return [KeyStringValuePair(key=prop['key'], value=prop['value']) for prop in self.get_instance_properties()] def send_heart_beat(self) -> None: """ Each protocol must implement this method to send heart beat to OAP. Returns: None """ raise NotImplementedError() class Service(ABC): @abstractmethod def report(self, segment: bytes) -> None: raise NotImplementedError() class TraceSegmentReportService(Service): @abstractmethod def report(self, generator): raise NotImplementedError() class MeterReportService(Service): @abstractmethod def report(self, generator): raise NotImplementedError() class LogDataReportService(Service): @abstractmethod def report(self, generator): raise NotImplementedError() class ProfileTaskChannelService(Service): @abstractmethod def do_query(self): raise NotImplementedError() @abstractmethod def report(self, generator): raise NotImplementedError() # Asyncio Implementation class ServiceManagementClientAsync(ABC): """ Used to register service and instance to OAP, for Asyncio. """ def __init__(self): self.sent_properties_counter = 0 @abstractmethod async def send_instance_props(self) -> None: """ Unique to each protocol, send instance properties to OAP. """ raise NotImplementedError() async def refresh_instance_props(self) -> None: """ Periodically refresh the instance properties to prevent loss on OAP TTL records expiration. Default: 30 * 10 seconds """ self.sent_properties_counter += 1 if self.sent_properties_counter % config.agent_collector_properties_report_period_factor == 0: await self.send_instance_props() @staticmethod def get_instance_properties() -> List[dict]: """ Get current running Python interpreter's system properties. Returns: [{'key': str, 'value': str}, ...] """ try: properties = [ {'key': 'language', 'value': 'python'}, {'key': 'OS Name', 'value': os.name}, {'key': 'Process No.', 'value': str(os.getpid())}, {'key': 'hostname', 'value': socket.gethostname()}, {'key': 'ipv4', 'value': '; '.join(socket.gethostbyname_ex(socket.gethostname())[2])}, {'key': 'python_implementation', 'value': platform.python_implementation()}, {'key': 'python_version', 'value': platform.python_version()}, ] except Exception as e: # noqa logger.exception('Failed to get OS info, fallback to basic properties.') properties = [ {'key': 'language', 'value': 'python'}, {'key': 'Process No.', 'value': str(os.getpid())}, ] namespace = config.agent_namespace if namespace: properties.append({'key': 'namespace', 'value': namespace}) instance_properties_json = config.agent_instance_properties_json if instance_properties_json: # load instance properties from json string json_properties = json.loads(instance_properties_json) for key, value in json_properties.items(): properties.append({'key': key, 'value': value}) return properties def get_instance_properties_proto(self) -> List[KeyStringValuePair]: """ Converts to protobuf format. Returns: [KeyStringValuePair, ...] """ return [KeyStringValuePair(key=prop['key'], value=prop['value']) for prop in self.get_instance_properties()] async def send_heart_beat(self) -> None: """ Each protocol must implement this method to send heart beat to OAP. Returns: None """ raise NotImplementedError() class ServiceAsync(ABC): @abstractmethod async def report(self, segment: bytes) -> None: raise NotImplementedError() class TraceSegmentReportServiceAsync(ServiceAsync): @abstractmethod async def report(self, generator): raise NotImplementedError() class MeterReportServiceAsync(ServiceAsync): @abstractmethod async def report(self, generator): raise NotImplementedError() class LogDataReportServiceAsync(ServiceAsync): @abstractmethod async def report(self, generator): raise NotImplementedError() class ProfileTaskChannelServiceAsync(ServiceAsync): @abstractmethod async def do_query(self): raise NotImplementedError() @abstractmethod async def report(self, generator): raise NotImplementedError()