in apps/monitoring-lua-filter/aws-for-fluent-bit-monitoring-metrics.lua [556:646]
function to_emf(tag, timestamp, record)
local new_record = record
load_environment_configuration()
local parsed_timestamp = math.floor(timestamp * 1000)
local ret, parsed_record=pcall(function() return json.decode(record["exec"]) end)
if not ret then
return -1, nil, nil
end
local emf_records = {}
local emf_data_properties = record
emf_data_properties["exec"] = nil
for plugin_type, plugins in pairs(parsed_record["flb_metrics"]) do
for alias, metrics in pairs(plugins) do
local metrics_manifest = {}
emf = {
_aws = {
Timestamp = parsed_timestamp,
CloudWatchMetrics = {{
Namespace = CONFIGURATION["namespace"],
Dimensions = {},
Metrics = {}
}}
}
}
add_properties(emf, emf_data_properties)
add_properties(emf, (CONFIGURATION["properties"] or {}))
add_dimensionsets(emf, CONFIGURATION["dimensions"])
add_properties(emf, {
Alias = alias,
PluginType = plugin_type
})
add_metrics(parsed_timestamp, alias, emf, "", metrics, metrics_manifest)
if plugin_type == "input" then
storage_chunks = parsed_record["storage"]["input_chunks"][alias]["chunks"]
storage_chunks["busy_size"] = convert_size_to_bytes(storage_chunks["busy_size"])
add_metrics(parsed_timestamp, alias, emf, "disk_chunks_", storage_chunks,
metrics_manifest)
local memory_metrics = parsed_record["storage"]["input_chunks"][alias]["status"]
memory_metrics["memory_overlimit"] = memory_metrics["overlimit"] and 1 or 0
memory_metrics["memory_size"] = convert_size_to_bytes(memory_metrics["mem_size"])
memory_metrics["memory_limit"] = convert_size_to_bytes(
memory_metrics["mem_limit"])
memory_metrics["overlimit"] = nil
memory_metrics["mem_size"] = nil
memory_metrics["mem_limit"] = nil
add_metrics(parsed_timestamp, alias, emf, "", memory_metrics, metrics_manifest)
end
for metric_property, metric_configuration in pairs(CONFIGURATION["metrics"]) do
if (metrics_manifest[metric_property] == nil and
metric_configuration["remove"] ~= true) then
if emf[metric_property] ~= nil then
add_metrics(parsed_timestamp, alias, emf, "", {
[metric_property]=emf[metric_property]
}, metrics_manifest)
end
end
end
table.insert(emf_records, emf)
end
end
return 1, timestamp, emf_records
end