in LAD-AMA-Common/telegraf_utils/telegraf_config_handler.py [0:0]
def parse_config(data, me_url, mdsd_url, is_lad, az_resource_id, subscription_id, resource_group, region, virtual_machine_name):
"""
Main parser method to convert the Metrics config from the extension configuration into a telegraf configuration
:param data: Parsed Metrics configuration from which the telegraf config is created
:param me_url: The URL to which telegraf will send metrics destined for MetricsExtension (ME)
:param mdsd_url: The URL to which telegraf will send metrics destined for MDSD
:param is_lad: Boolean indicating whether the extension is LAD (True) or AMA (False)
:param az_resource_id: Azure Resource ID of the VM
:param subscription_id: Azure Subscription ID of the VM
:param resource_group: Azure Resource Group of the VM
:param region: Azure region of the VM
:param virtual_machine_name: Azure Virtual Machine name of the VM (only set in the VMSS case)
:return: a list of config-file dicts ({"filename": ..., "data": ...}) plus the list of measurement names routed to MDSD/storage
"""
storage_namepass_list = []
storage_namepass_str = ""
vmi_rate_counters_list = ["LogicalDisk\\BytesPerSecond", "LogicalDisk\\ReadBytesPerSecond", "LogicalDisk\\ReadsPerSecond", "LogicalDisk\\WriteBytesPerSecond", "LogicalDisk\\WritesPerSecond", "LogicalDisk\\TransfersPerSecond", "Network\\ReadBytesPerSecond", "Network\\WriteBytesPerSecond"]
MetricsExtensionNamespace = metrics_constants.metrics_extension_namespace
has_mdsd_output = False
has_me_output = False
if len(data) == 0:
raise Exception("Empty config data received.")
if me_url is None or mdsd_url is None:
raise Exception("No url provided for Influxdb output plugin to ME, AMA.")
telegraf_json = {}
counterConfigIdMap = {}
for item in data:
sink = item["sink"]
if "mdsd" in sink:
has_mdsd_output = True
if "me" in sink:
has_me_output = True
counter = item["displayName"]
if counter in name_map:
plugin = name_map[counter]["plugin"]
is_vmi = plugin.endswith("_vmi")
telegraf_plugin = plugin
if is_vmi:
splitResult = plugin.split('_')
telegraf_plugin = splitResult[0]
if counter not in counterConfigIdMap:
counterConfigIdMap[counter] = []
configIds = counterConfigIdMap[counter]
configurationIds = item["configurationId"]
for configId in configurationIds:
if configId not in configIds:
configIds.append(configId)
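# counterConfigIdMap accumulates, per counter display name, every configurationId that
# requested it, e.g. {"Network total bytes": ["<configuration id 1>", "<configuration id 2>"]}
# (IDs are hypothetical); it is used later to tag the generated input plugins.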
omiclass = ""
if is_lad:
omiclass = counter.split("->")[0]
else:
omiclass = name_map[counter]["module"]
if omiclass not in telegraf_json:
telegraf_json[omiclass] = {}
if plugin not in telegraf_json[omiclass]:
telegraf_json[omiclass][plugin] = {}
telegraf_json[omiclass][plugin][name_map[counter]["field"]] = {}
if is_lad:
telegraf_json[omiclass][plugin][name_map[counter]["field"]]["displayName"] = counter.split("->")[1]
else:
telegraf_json[omiclass][plugin][name_map[counter]["field"]]["displayName"] = counter
telegraf_json[omiclass][plugin][name_map[counter]["field"]]["interval"] = item["interval"]
if is_lad:
telegraf_json[omiclass][plugin][name_map[counter]["field"]]["ladtablekey"] = name_map[counter]["ladtablekey"]
if "op" in name_map[counter]:
telegraf_json[omiclass][plugin][name_map[counter]["field"]]["op"] = name_map[counter]["op"]
"""
Sample converted telegraf conf dict -
"network": {
"net": {
"bytes_total": {"interval": "15s","displayName": "Network total bytes","ladtablekey": "/builtin/network/bytestotal"},
"drop_total": {"interval": "15s","displayName": "Network collisions","ladtablekey": "/builtin/network/totalcollisions"},
"err_in": {"interval": "15s","displayName": "Packets received errors","ladtablekey": "/builtin/network/totalrxerrors"},
"packets_sent": {"interval": "15s","displayName": "Packets sent","ladtablekey": "/builtin/network/packetstransmitted"},
}
},
"filesystem": {
"disk": {
"used_percent": {"interval": "15s","displayName": "Filesystem % used space","ladtablekey": "/builtin/filesystem/percentusedspace"},
"used": {"interval": "15s","displayName": "Filesystem used space","ladtablekey": "/builtin/filesystem/usedspace"},
"free": {"interval": "15s","displayName": "Filesystem free space","ladtablekey": "/builtin/filesystem/freespace"},
"inodes_free_percent": {"interval": "15s","displayName": "Filesystem % free inodes","ladtablekey": "/builtin/filesystem/percentfreeinodes"},
},
"diskio": {
"writes_filesystem": {"interval": "15s","displayName": "Filesystem writes/sec","ladtablekey": "/builtin/filesystem/writespersecond","op": "rate"},
"total_transfers_filesystem": {"interval": "15s","displayName": "Filesystem transfers/sec","ladtablekey": "/builtin/filesystem/transferspersecond","op": "rate"},
"reads_filesystem": {"interval": "15s","displayName": "Filesystem reads/sec","ladtablekey": "/builtin/filesystem/readspersecond","op": "rate"},
}
},
"""
if len(telegraf_json) == 0:
raise Exception("Unable to parse telegraf config into intermediate dictionary.")
excess_diskio_plugin_list_lad = ["total_transfers_filesystem", "read_bytes_filesystem", "total_bytes_filesystem", "write_bytes_filesystem", "reads_filesystem", "writes_filesystem"]
excess_diskio_field_drop_list_str = ""
int_file = {"filename":"intermediate.json", "data": json.dumps(telegraf_json)}
output = []
output.append(int_file)
for omiclass in telegraf_json:
input_str = ""
ama_rename_str = ""
metricsext_rename_str = ""
lad_specific_rename_str = ""
rate_specific_aggregator_str = ""
aggregator_str = ""
for plugin in telegraf_json[omiclass]:
config_file = {"filename" : omiclass+".conf"}
# Arbitrary max value for finding min
min_interval = "999999999s"
is_vmi = plugin.endswith("_vmi")
is_vmi_rate_counter = False
for field in telegraf_json[omiclass][plugin]:
if not is_vmi_rate_counter:
is_vmi_rate_counter = telegraf_json[omiclass][plugin][field]["displayName"] in vmi_rate_counters_list
# if is_vmi_rate_counter:
# min_interval = "1s"
if is_vmi or is_vmi_rate_counter:
splitResult = plugin.split('_')
telegraf_plugin = splitResult[0]
input_str += "[[inputs." + telegraf_plugin + "]]\n"
# plugin = plugin[:-4]
else:
input_str += "[[inputs." + plugin + "]]\n"
# input_str += " "*2 + "name_override = \"" + omiclass + "\"\n"
# If it's a LAD config, add the namepass fields for sending totals to storage
# Always skip LAD plugin names as they should be dropped from ME
lad_plugin_name = plugin + "_total"
if lad_plugin_name not in storage_namepass_list:
storage_namepass_list.append(lad_plugin_name)
if is_lad:
lad_specific_rename_str += "\n[[processors.rename]]\n"
lad_specific_rename_str += " "*2 + "namepass = [\"" + lad_plugin_name + "\"]\n"
elif is_vmi or is_vmi_rate_counter:
if plugin not in storage_namepass_list:
storage_namepass_list.append(plugin + "_mdsd")
else:
ama_plugin_name = plugin + "_mdsd_la_perf"
ama_rename_str += "\n[[processors.rename]]\n"
ama_rename_str += " "*2 + "namepass = [\"" + ama_plugin_name + "\"]\n"
if ama_plugin_name not in storage_namepass_list:
storage_namepass_list.append(ama_plugin_name)
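# Measurement-name suffix conventions produced by the branches above; everything in
# storage_namepass_list is later namepass'ed to the MDSD output and namedrop'ed from ME:
#   LAD: "<plugin>_total"         (totals renamed to LAD table keys for storage)
#   VMI: "<plugin>_mdsd"          (fields already carry their final names)
#   AMA: "<plugin>_mdsd_la_perf"  (fields renamed to display names below)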
namespace = MetricsExtensionNamespace
if is_vmi or is_vmi_rate_counter:
namespace = "insights.virtualmachine"
if is_vmi_rate_counter:
# Adding "_rated" as a substring for vmi rate metrics to avoid renaming collisions
plugin_name = plugin + "_rated"
else:
plugin_name = plugin
metricsext_rename_str += "\n[[processors.rename]]\n"
metricsext_rename_str += " "*2 + "namepass = [\"" + plugin_name + "\"]\n"
metricsext_rename_str += "\n" + " "*2 + "[[processors.rename.replace]]\n"
metricsext_rename_str += " "*4 + "measurement = \"" + plugin_name + "\"\n"
metricsext_rename_str += " "*4 + "dest = \"" + namespace + "\"\n"
fields = ""
ops_fields = ""
non_ops_fields = ""
non_rate_aggregate = False
ops = ""
rate_aggregate = False
for field in telegraf_json[omiclass][plugin]:
fields += "\"" + field + "\", "
if is_vmi or is_vmi_rate_counter :
if "MB" in field:
fields += "\"" + field.replace('MB','Bytes') + "\", "
# Use the shortest interval time for the whole plugin
new_interval = telegraf_json[omiclass][plugin][field]["interval"]
if int(new_interval[:-1]) < int(min_interval[:-1]):
min_interval = new_interval
# Compute values for aggregator options
if "op" in telegraf_json[omiclass][plugin][field]:
if telegraf_json[omiclass][plugin][field]["op"] == "rate":
rate_aggregate = True
ops = "\"rate\", \"rate_min\", \"rate_max\", \"rate_count\", \"rate_sum\", \"rate_mean\""
if is_lad:
ops_fields += "\"" + telegraf_json[omiclass][plugin][field]["ladtablekey"] + "\", "
else:
ops_fields += "\"" + telegraf_json[omiclass][plugin][field]["displayName"] + "\", "
else:
non_rate_aggregate = True
if is_lad:
non_ops_fields += "\"" + telegraf_json[omiclass][plugin][field]["ladtablekey"] + "\", "
else:
non_ops_fields += "\"" + telegraf_json[omiclass][plugin][field]["displayName"] + "\", "
# Add the respective rename processor plugin based on the displayName
if is_lad:
lad_specific_rename_str += "\n" + " "*2 + "[[processors.rename.replace]]\n"
lad_specific_rename_str += " "*4 + "field = \"" + field + "\"\n"
lad_specific_rename_str += " "*4 + "dest = \"" + telegraf_json[omiclass][plugin][field]["ladtablekey"] + "\"\n"
elif not is_vmi and not is_vmi_rate_counter:
# no rename of fields as they are set in telegraf directly
ama_rename_str += "\n" + " "*2 + "[[processors.rename.replace]]\n"
ama_rename_str += " "*4 + "field = \"" + field + "\"\n"
ama_rename_str += " "*4 + "dest = \"" + telegraf_json[omiclass][plugin][field]["displayName"] + "\"\n"
# Skip the rename logic for the redundant *_filesystem fields on diskio, which were added specifically for OMI parity in LAD
# These six fields had to be re-used to avoid renaming collisions, since both Filesystem and Disk in OMI-LAD use them
# AMA only uses them once, so this is only needed for LAD
if is_lad:
if field in excess_diskio_plugin_list_lad:
excess_diskio_field_drop_list_str += "\"" + field + "\", "
else:
metricsext_rename_str += "\n" + " "*2 + "[[processors.rename.replace]]\n"
metricsext_rename_str += " "*4 + "field = \"" + field + "\"\n"
metricsext_rename_str += " "*4 + "dest = \"" + plugin + "/" + field + "\"\n"
elif not is_vmi and not is_vmi_rate_counter:
# no rename of fields as they are set in telegraf directly
metricsext_rename_str += "\n" + " "*2 + "[[processors.rename.replace]]\n"
metricsext_rename_str += " "*4 + "field = \"" + field + "\"\n"
metricsext_rename_str += " "*4 + "dest = \"" + plugin + "/" + field + "\"\n"
# Add the respective operations for aggregators
# if is_lad:
if not is_vmi and not is_vmi_rate_counter:
suffix = ""
if is_lad:
suffix = "_total\"]\n"
else:
suffix = "_mdsd_la_perf\"]\n"
if rate_aggregate:
aggregator_str += "[[aggregators.basicstats]]\n"
aggregator_str += " "*2 + "namepass = [\"" + plugin + suffix
aggregator_str += " "*2 + "period = \"" + min_interval + "\"\n"
aggregator_str += " "*2 + "drop_original = true\n"
aggregator_str += " "*2 + "fieldpass = [" + ops_fields[:-2] + "]\n" #-2 to strip the last comma and space
aggregator_str += " "*2 + "stats = [" + ops + "]\n"
if non_rate_aggregate:
aggregator_str += "[[aggregators.basicstats]]\n"
aggregator_str += " "*2 + "namepass = [\"" + plugin + suffix
aggregator_str += " "*2 + "period = \"" + min_interval + "\"\n"
aggregator_str += " "*2 + "drop_original = true\n"
aggregator_str += " "*2 + "fieldpass = [" + non_ops_fields[:-2] + "]\n" #-2 to strip the last comma and space
aggregator_str += " "*2 + "stats = [\"mean\", \"max\", \"min\", \"sum\", \"count\"]\n\n"
elif is_vmi_rate_counter:
# Aggregator config for MDSD
aggregator_str += "[[aggregators.basicstats]]\n"
aggregator_str += " "*2 + "namepass = [\"" + plugin + "_mdsd\"]\n"
aggregator_str += " "*2 + "period = \"" + min_interval + "\"\n"
aggregator_str += " "*2 + "drop_original = true\n"
aggregator_str += " "*2 + "fieldpass = [" + ops_fields[:-2].replace('\\','\\\\\\\\') + "]\n" #-2 to strip the last comma and space
aggregator_str += " "*2 + "stats = [" + ops + "]\n\n"
# Aggregator config for ME
aggregator_str += "[[aggregators.mdmratemetrics]]\n"
aggregator_str += " "*2 + "namepass = [\"" + plugin + "\"]\n"
aggregator_str += " "*2 + "period = \"" + min_interval + "\"\n"
aggregator_str += " "*2 + "drop_original = true\n"
aggregator_str += " "*2 + "fieldpass = [" + ops_fields[:-2].replace('\\','\\\\\\\\') + "]\n" #-2 to strip the last comma and space
aggregator_str += " "*2 + "stats = [\"rate\"]\n\n"
if is_lad:
lad_specific_rename_str += "\n"
elif not is_vmi and not is_vmi_rate_counter:
# no rename of fields as they are set in telegraf directly
ama_rename_str += "\n"
# Using fields[:-2] here to get rid of the trailing ", " at the end of the string
input_str += " "*2 + "fieldpass = ["+fields[:-2]+"]\n"
if plugin == "cpu":
input_str += " "*2 + "report_active = true\n"
# The rate interval needs to be at least twice the regular sourcing interval for aggregation to work.
# Since we want all VMI metrics to be sent at the interval selected by the customer, we overcome the
# "twice the minimum interval" limitation by sourcing the VMI metrics that need to be aggregated at half the selected frequency.
rated_min_interval = str(int(min_interval[:-1]) // 2) + "s"
input_str += " "*2 + "interval = " + "\"" + rated_min_interval + "\"\n\n"
telegraf_plugin = plugin
if is_vmi:
splitResult = plugin.split('_')
telegraf_plugin = splitResult[0]
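# For non-LAD configs, tag the generated input plugin with every configurationId recorded
# for this counter; illustrative emitted TOML (the ID is hypothetical):
#   [inputs.cpu.tags]
#     configurationId="<configuration id>"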
if not is_lad:
configIds = counterConfigIdMap[telegraf_json[omiclass][plugin][field]["displayName"]]
for configId in configIds:
input_str += "\n"
input_str += " "*2 + "[inputs." + telegraf_plugin + ".tags]\n"
input_str += " "*4 + "configurationId=\"" + configId + "\"\n\n"
break
config_file["data"] = input_str + "\n" + metricsext_rename_str + "\n" + ama_rename_str + "\n" + lad_specific_rename_str + "\n" +aggregator_str
output.append(config_file)
config_file = {}
"""
Sample telegraf TOML file output
[[inputs.net]]
fieldpass = ["err_out", "packets_sent", "err_in", "bytes_sent", "packets_recv"]
interval = "5s"
[[inputs.cpu]]
fieldpass = ["usage_nice", "usage_user", "usage_idle", "usage_active", "usage_irq", "usage_system"]
interval = "15s"
[[processors.rename]]
[[processors.rename.replace]]
measurement = "net"
dest = "network"
[[processors.rename.replace]]
field = "err_out"
dest = "Packets sent errors"
[[aggregators.basicstats]]
period = "30s"
drop_original = false
fieldpass = ["Disk reads", "Disk writes", "Filesystem write bytes/sec"]
stats = ["rate"]
"""
## Get the log folder directory from HandlerEnvironment.json and use that for the telegraf default logging
logFolder, _ = get_handler_vars()
for measurement in storage_namepass_list:
storage_namepass_str += "\"" + measurement + "\", "
# Telegraf basic agent and output config
agentconf = "[agent]\n"
agentconf += " interval = \"10s\"\n"
agentconf += " round_interval = true\n"
agentconf += " metric_batch_size = 1000\n"
agentconf += " metric_buffer_limit = 1000000\n"
agentconf += " collection_jitter = \"0s\"\n"
agentconf += " flush_interval = \"10s\"\n"
agentconf += " flush_jitter = \"0s\"\n"
agentconf += " logtarget = \"file\"\n"
agentconf += " quiet = true\n"
agentconf += " logfile = \"" + logFolder + "/telegraf.log\"\n"
agentconf += " logfile_rotation_max_size = \"100MB\"\n"
agentconf += " logfile_rotation_max_archives = 5\n"
agentconf += "\n# Configuration for adding gloabl tags\n"
agentconf += "[global_tags]\n"
if is_lad:
agentconf += " DeploymentId= \"${DeploymentId}\"\n"
agentconf += " \"microsoft.subscriptionId\"= \"" + subscription_id + "\"\n"
agentconf += " \"microsoft.resourceGroupName\"= \"" + resource_group + "\"\n"
agentconf += " \"microsoft.regionName\"= \"" + region + "\"\n"
agentconf += " \"microsoft.resourceId\"= \"" + az_resource_id + "\"\n"
if virtual_machine_name != "":
agentconf += " \"VMInstanceId\"= \"" + virtual_machine_name + "\"\n"
if has_me_output or is_lad:
agentconf += "\n# Configuration for sending metrics to MetricsExtension\n"
# For AMA we use sockets to write to ME, but for LAD we continue using UDP,
# because the AMA path supports many more counters and ME cannot handle that volume over UDP
if is_lad:
agentconf += "[[outputs.influxdb]]\n"
else:
agentconf += "[[outputs.socket_writer]]\n"
agentconf += " namedrop = [" + storage_namepass_str[:-2] + "]\n"
if is_lad:
agentconf += " fielddrop = [" + excess_diskio_field_drop_list_str[:-2] + "]\n"
if is_lad:
agentconf += " urls = [\"" + str(me_url) + "\"]\n\n"
agentconf += " udp_payload = \"2048B\"\n\n"
else:
agentconf += " data_format = \"influx\"\n"
agentconf += " address = \"" + str(me_url) + "\"\n\n"
if has_mdsd_output:
agentconf += "\n# Configuration for sending metrics to MDSD\n"
agentconf += "[[outputs.socket_writer]]\n"
agentconf += " namepass = [" + storage_namepass_str[:-2] + "]\n"
agentconf += " data_format = \"influx\"\n"
agentconf += " address = \"" + str(mdsd_url) + "\"\n\n"
agentconf += "\n# Configuration for outputing metrics to file. Uncomment to enable.\n"
agentconf += "#[[outputs.file]]\n"
agentconf += "# files = [\"./metrics_to_file.out\"]\n\n"
agent_file = {"filename":"telegraf.conf", "data": agentconf}
output.append(agent_file)
return output, storage_namepass_list