elastic_agent_client/client.py (121 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
from dataclasses import dataclass
from typing import Optional
import elastic_agent_client.generated.elastic_agent_client_pb2 as proto
from elastic_agent_client.generated.elastic_agent_client_future_pb2_grpc import (
ElasticAgentArtifact,
ElasticAgentLog,
ElasticAgentStore,
)
from elastic_agent_client.generated.elastic_agent_client_pb2_grpc import (
ElasticAgentStub,
)
@dataclass
class VersionInfo:
def __init__(
self, name: str, meta: Optional[dict] = None, build_hash: Optional[str] = None
):
self.name = name
self.meta = meta
self.build_hash = build_hash
class V2Options:
def __init__(
self,
max_message_size: Optional[int] = None,
chunking_allowed: Optional[bool] = None,
agent_info: Optional[proto.AgentInfo] = None,
):
self.max_message_size = max_message_size
self.chunking_allowed = chunking_allowed
self.agent_info = agent_info
self.credentials = None
class Unit:
def __init__(
self,
unit_id=None,
unit_type=None,
expected_state=None,
log_level=None,
config=None,
config_idx=None,
features=None,
features_idx=None,
apm=None,
state=None,
state_msg=None,
state_payload=None,
actions=None,
client=None,
diag_hooks=None,
):
self.id: Optional[str] = unit_id
self.unit_type: Optional[proto.UnitType] = unit_type
self.expected_state: Optional[proto.State] = expected_state
self.log_level: Optional[proto.UnitLogLevel] = log_level
self.config: Optional[proto.UnitExpectedConfig] = config
self.config_idx: Optional[int] = config_idx
self.features: Optional[proto.Features] = features
self.features_idx: Optional[int] = features_idx
self.apm: Optional[proto.APMConfig] = apm
self.state: Optional[proto.State] = state
self.state_msg: Optional[str] = state_msg
self.state_payload: Optional[dict] = state_payload
self.actions: Optional[dict] = actions
self.client: Optional[V2] = client
self.diag_hooks: Optional[dict] = diag_hooks
def to_observed(self) -> proto.UnitObserved:
return proto.UnitObserved(
id=self.id,
type=self.unit_type,
config_state_idx=self.config_idx,
state=self.state,
message=self.state_msg,
payload=self.state_payload,
)
class V2:
def __init__(self):
self.target: Optional[str] = None
self.opts: Optional[V2Options] = None
self.token: Optional[str] = None
self.agent_info: Optional[proto.AgentInfo] = None
self.version_info: Optional[VersionInfo] = None
self.version_info_sent: Optional[bool] = None
self.client: Optional[ElasticAgentStub] = None
self.store_client: Optional[ElasticAgentStore] = None
self.artifact_client: Optional[ElasticAgentArtifact] = None
self.log_client: Optional[ElasticAgentLog] = None
self.units: Optional[list[Unit]] = None
self.features_idx: Optional[int] = None
self.component_idx: Optional[int] = None
self.component_config: Optional[proto.Component] = None
self.apm_config: Optional[proto.APMConfig] = None
def __str__(self):
if self.agent_info and self.version_info:
return f"V2 Client: (agent id: {self.agent_info.id}, agent version: {self.agent_info.version}, name: {self.version_info.name}, target: {self.target})"
else:
return "Uninitialized V2 Client"
def sync_component(self, checkin: proto.CheckinExpected):
self.component_config = checkin.component
self.component_idx = checkin.component_idx
def sync_units(self, checkin: proto.CheckinExpected):
if checkin.component:
self.apm_config = checkin.component.apm_config
units = []
for expected_unit in checkin.units:
unit = Unit()
unit.id = expected_unit.id
unit.unit_type = expected_unit.type
unit.expected_state = expected_unit.state
unit.log_level = expected_unit.log_level
unit.config = expected_unit.config
unit.config_idx = expected_unit.config_state_idx
unit.features = None # TODO?
unit.features_idx = 0 # TODO?
unit.apm = None # TODO?
unit.state = expected_unit.state
unit.state_msg = "" # TODO?
unit.state_payload = None # TODO?
unit.actions = None # TODO?
unit.client = self
unit.diag_hooks = {} # TODO?
units.append(unit)
self.units = units