elastic_agent_client/service/actions.py (55 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
import asyncio
import json
import logging
from google.protobuf.json_format import MessageToDict
import elastic_agent_client.generated.elastic_agent_client_pb2 as proto
from elastic_agent_client.client import V2
from elastic_agent_client.handler.action import BaseActionHandler
from elastic_agent_client.util.async_tools import AsyncQueueIterator, BaseService
from elastic_agent_client.util.logger import logger
class ActionsService(BaseService):
name = "actions"
def __init__(self, client: V2, action_handler: BaseActionHandler):
super().__init__(client, "actions")
logger.debug(f"Initializing the {self.name} service")
self.client = client
self.action_handler = action_handler
async def _run(self):
logger.debug(f"Initializing the {self.name} service")
if self.client.client is None:
msg = "gRPC client is not yet set"
raise RuntimeError(msg)
send_queue: asyncio.Queue = asyncio.Queue()
action_stream = self.client.client.Actions(AsyncQueueIterator(send_queue))
logger.info("Sending startup action event")
await send_queue.put(
proto.ActionResponse(
token=self.client.token,
id="init",
status=proto.ActionResponse.SUCCESS,
result=self.init_action_result(),
)
)
logger.info("Listening for action events")
action: proto.ActionRequest
async for action in action_stream:
if logger.isEnabledFor(logging.DEBUG):
action_str = MessageToDict(action)
logger.debug(f"Received action event from actionV2: {action_str}")
try:
await self.action_handler.handle_action(action)
logger.debug(f"Successfully handled action {action_str}")
except Exception as e:
logger.exception(f"Failed to do action: {action}", e)
await send_queue.put(
proto.ActionResponse(
token=self.client.token,
id=action.id,
status=proto.ActionResponse.FAILED,
result=self.generic_action_failure(),
)
)
def init_action_result(self):
return json.dumps({}).encode()
def generic_action_failure(self):
return json.dumps({"error": "Action failed"}).encode()