in src/modules/send_telemetry_data.py [0:0]
def send_telemetry_data_to_azuredataexplorer(self, telemetry_json_data: str) -> Any:
"""
Sends telemetry data to Azure Data Explorer.
:param telemetry_json_data: The JSON data to be sent to Azure Data Explorer.
:type telemetry_json_data: str
:return: The response from the Kusto API.
:rtype: Any
"""
import pandas as pd
telemetry_json_data = json.loads(telemetry_json_data)
data_frame = pd.DataFrame(
[telemetry_json_data.values()], columns=telemetry_json_data.keys()
)
ingestion_properties = IngestionProperties(
database=self.module_params["adx_database_name"],
table=self.module_params["telemetry_table_name"],
data_format=DataFormat.JSON,
report_level=ReportLevel.FailuresAndSuccesses,
)
kcsb = KustoConnectionStringBuilder.with_aad_managed_service_identity_authentication(
connection_string=self.module_params["adx_cluster_fqdn"],
client_id=self.module_params["adx_client_id"],
)
client = QueuedIngestClient(kcsb)
response = client.ingest_from_dataframe(data_frame, ingestion_properties)
self.log(
logging.INFO,
f"Response from Kusto: {response}",
)
return response