azext_iot/monitor/handlers/central_handler.py (128 lines of code) (raw):
# coding=utf-8
# --------------------------------------------------------------------------------------------
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------
import csv
import sys
from typing import List
from knack.log import get_logger
from azext_iot.monitor.utility import stop_monitor, get_loop
from azext_iot.central.providers import (
CentralDeviceProvider,
CentralDeviceTemplateProvider,
)
from azext_iot.monitor.handlers import CommonHandler
from azext_iot.monitor.models.arguments import CentralHandlerArguments
from azext_iot.monitor.parsers.central_parser import CentralParser
from azext_iot.monitor.parsers.issue import Issue
logger = get_logger(__name__)
class CentralHandler(CommonHandler):
def __init__(
self,
central_device_provider: CentralDeviceProvider,
central_template_provider: CentralDeviceTemplateProvider,
central_handler_args: CentralHandlerArguments,
central_dns_suffix: str,
):
super(CentralHandler, self).__init__(
common_handler_args=central_handler_args.common_handler_args
)
self._central_device_provider = central_device_provider
self._central_template_provider = central_template_provider
self._central_handler_args = central_handler_args
self._messages = []
self._issues: List[Issue] = []
self._central_dns_suffix = central_dns_suffix
if self._central_handler_args.duration:
loop = get_loop()
# the monitor takes a few seconds to start
loop.call_later(
self._central_handler_args.duration + 5, self._quit_duration_exceeded
)
def validate_message(self, message):
parser = CentralParser(
message=message,
common_parser_args=self._common_handler_args.common_parser_args,
central_device_provider=self._central_device_provider,
central_template_provider=self._central_template_provider,
central_dns_suffix=self._central_dns_suffix,
)
if not self._should_process_device(parser.device_id):
return
if not self._should_process_module(parser.module_id):
return
parsed_message = parser.parse_message()
self._messages.append(parsed_message)
n_messages = len(self._messages)
issues = parser.issues_handler.get_issues_with_minimum_severity(
self._central_handler_args.minimum_severity
)
self._issues.extend(issues)
self._print_progress_update(n_messages)
if self._central_handler_args.style == "scroll" and issues:
[issue.log() for issue in issues]
if (
self._central_handler_args.max_messages
and n_messages >= self._central_handler_args.max_messages
):
self._quit_messages_exceeded()
def generate_startup_string(self, name: str):
device_id = self._central_handler_args.common_handler_args.device_id
duration = self._central_handler_args.duration
max_messages = self._central_handler_args.max_messages
module_id = self._central_handler_args.common_handler_args.module_id
filter_text = ""
if device_id:
filter_text = ".\nFiltering on device: {}".format(device_id)
if module_id:
logger.warning("Module filtering is applicable only for edge devices.")
filter_text += ".\nFiltering on module: {}".format(module_id)
exit_text = ""
if duration and max_messages:
exit_text = ".\nExiting after {} second(s), or {} message(s) have been parsed (whichever happens first).".format(
duration,
max_messages,
)
elif duration:
exit_text = ".\nExiting after {} second(s).".format(duration)
elif max_messages:
exit_text = ".\nExiting after parsing {} message(s).".format(max_messages)
result = "{} telemetry{}{}".format(name, filter_text, exit_text)
return result
def _print_progress_update(self, n_messages: int):
if (n_messages % self._central_handler_args.progress_interval) == 0:
print("Processed {} messages...".format(n_messages), flush=True)
def _print_results(self):
n_messages = len(self._messages)
if not self._issues:
print("No errors detected after parsing {} message(s).".format(n_messages))
return
if self._central_handler_args.style.lower() == "scroll":
return
print("Processing and displaying results.")
issues = [issue.json_repr() for issue in self._issues]
if self._central_handler_args.style.lower() == "json":
self._handle_json_summary(issues)
return
if self._central_handler_args.style.lower() == "csv":
self._handle_csv_summary(issues)
return
def _handle_json_summary(self, issues: List[Issue]):
import json
output = json.dumps(issues, indent=4)
print(output)
def _handle_csv_summary(self, issues: List[Issue]):
fieldnames = ["severity", "details", "message", "device_id", "template_id"]
writer = csv.DictWriter(sys.stdout, fieldnames=fieldnames)
writer.writeheader()
for issue in issues:
writer.writerow(issue)
def _quit_messages_exceeded(self):
message = "Successfully parsed {} message(s).".format(
self._central_handler_args.max_messages
)
print(message, flush=True)
self._print_results()
stop_monitor()
def _quit_duration_exceeded(self):
message = "{} second(s) have elapsed.".format(
self._central_handler_args.duration
)
print(message, flush=True)
self._print_results()
stop_monitor()