in src/modules/send_telemetry_data.py [0:0]
def send_telemetry_data(self) -> None:
    """
    Dispatch the collected telemetry data to the configured destination.

    The destination is read from ``self.module_params["telemetry_data_destination"]``.
    Only the ``KUSTO`` and ``LOG_ANALYTICS`` values of ``TelemetryDataDestination``
    are accepted; any other value is logged as an error and nothing else happens.

    For an accepted destination:

    * ``self.validate_params()`` is called first. If it returns a falsy value, an
      explanatory text is stored under ``self.result["status"]`` and the method
      returns without sending.
    * Otherwise the method named ``send_telemetry_data_to_<destination>`` is looked
      up on ``self`` and called with ``self.result["telemetry_data"]`` serialized
      as a JSON string.
    * After that call returns, a confirmation is appended to
      ``self.result["message"]`` and ``self.result`` is updated with
      ``status = TestStatus.SUCCESS.value`` and ``data_sent = True``.

    Any exception raised while resolving or calling the sender, or while updating
    the result, is passed to ``self.handle_error`` rather than propagated by this
    method.
    """
    destination = self.module_params["telemetry_data_destination"]
    supported_destinations = (
        TelemetryDataDestination.KUSTO.value,
        TelemetryDataDestination.LOG_ANALYTICS.value,
    )

    # Guard clause: an unsupported destination is only logged, the result is untouched.
    if destination not in supported_destinations:
        self.log(
            logging.ERROR,
            f"Invalid telemetry data destination specified {destination}",
        )
        return

    self.log(logging.INFO, "Validating parameters for telemetry data destination ")
    params_are_valid = self.validate_params()
    if not params_are_valid:
        # The explanatory text is stored under "status" (not "message").
        self.result["status"] = (
            "Invalid parameters for telemetry data destination. Data will not be sent."
        )
        return

    self.log(logging.INFO, f"Sending telemetry data to {destination}")
    try:
        # The sender is resolved by name, so each accepted destination value needs a
        # matching ``send_telemetry_data_to_<value>`` attribute on this object.
        sender = getattr(self, f"send_telemetry_data_to_{destination}")
        payload = json.dumps(self.result["telemetry_data"])
        sender(payload)
        self.result["message"] += f"Telemetry data sent to {destination}. "
        self.result.update(status=TestStatus.SUCCESS.value, data_sent=True)
    except Exception as ex:
        self.handle_error(ex)