function to_emf()

in apps/monitoring-lua-filter/aws-for-fluent-bit-monitoring-metrics.lua [556:646]


--- Convert one monitoring record into a list of EMF-shaped records.
--
-- The incoming record is expected to carry a JSON string under the "exec"
-- key. Once decoded, its "flb_metrics" table is walked as
-- plugin_type -> alias -> metrics, and one EMF table is produced per
-- (plugin_type, alias) pair. For plugin_type "input", the per-alias
-- "chunks" and "status" tables found under storage.input_chunks are added
-- as metrics too, when they are present.
--
-- Side effects: removes the "exec" key from `record`, and rewrites a few keys
-- inside the decoded storage tables in place (see below).
--
-- @param tag        unused by this function
-- @param timestamp  number; multiplied by 1000 and floored for the EMF
--                   Timestamp (presumably seconds -> milliseconds; confirm
--                   against the caller)
-- @param record     table holding the JSON payload under "exec"; every other
--                   key is copied onto each EMF record as a property
-- @return -1, nil, nil when the payload cannot be decoded into a table
--         (NOTE(review): Fluent Bit's Lua filter treats -1 as "drop the
--         record" -- confirm this is the intended outcome)
-- @return 1, timestamp, emf_records otherwise, where emf_records is an array
--         with one EMF table per (plugin_type, alias) pair
function to_emf(tag, timestamp, record)
  -- NOTE(review): presumably populates the CONFIGURATION table read below;
  -- confirm against the definition of load_environment_configuration.
  load_environment_configuration()

  local parsed_timestamp = math.floor(timestamp * 1000)

  -- json.decode may raise on malformed input; pcall turns that into a flag.
  local ok, parsed_record = pcall(function()
    return json.decode(record["exec"])
  end)
  -- A payload that decodes to a non-table (e.g. a bare number) cannot be
  -- indexed below, so it is handled like a decode failure.
  if not ok or type(parsed_record) ~= "table" then
    return -1, nil, nil
  end

  local emf_records = {}

  -- Everything except the raw JSON payload is forwarded as EMF properties.
  -- This aliases (and therefore mutates) the incoming record.
  local emf_data_properties = record
  emf_data_properties["exec"] = nil

  -- Tolerate missing sections instead of raising from pairs()/indexing nil.
  local flb_metrics = parsed_record["flb_metrics"] or {}
  local storage = parsed_record["storage"]
  local input_chunks = type(storage) == "table" and storage["input_chunks"] or nil
  local configured_metrics = CONFIGURATION["metrics"] or {}

  for plugin_type, plugins in pairs(flb_metrics) do
    for alias, metrics in pairs(plugins) do

      -- Passed to every add_metrics call and consulted afterwards to see
      -- which configured metrics have already been handled.
      local metrics_manifest = {}

      -- Must be local: a fresh table per (plugin_type, alias) pair that does
      -- not leak into the global environment.
      local emf = {
        _aws = {
          Timestamp = parsed_timestamp,
          CloudWatchMetrics = {{
            Namespace = CONFIGURATION["namespace"],
            Dimensions = {},
            Metrics = {}
          }}
        }
      }

      -- Properties taken from the record first, then from configuration.
      add_properties(emf, emf_data_properties)
      add_properties(emf, (CONFIGURATION["properties"] or {}))

      -- Dimension sets from configuration, then the identifying properties
      -- of this plugin instance.
      add_dimensionsets(emf, CONFIGURATION["dimensions"])
      add_properties(emf, {
        Alias = alias,
        PluginType = plugin_type
      })

      -- Metrics reported for this plugin instance.
      add_metrics(parsed_timestamp, alias, emf, "", metrics, metrics_manifest)

      -- Input plugins additionally expose storage information.
      if plugin_type == "input" then
        local input_storage = type(input_chunks) == "table"
                              and input_chunks[alias] or nil
        if type(input_storage) == "table" then

          -- Chunk metrics: busy_size is run through convert_size_to_bytes
          -- and all keys are emitted with the "disk_chunks_" prefix.
          local storage_chunks = input_storage["chunks"]
          if type(storage_chunks) == "table" then
            storage_chunks["busy_size"] = convert_size_to_bytes(
                                            storage_chunks["busy_size"])
            add_metrics(parsed_timestamp, alias, emf, "disk_chunks_",
                        storage_chunks, metrics_manifest)
          end

          -- Status metrics: overlimit becomes 1/0, the size fields are
          -- converted, and the original keys are dropped so only the
          -- memory_* names are emitted.
          local memory_metrics = input_storage["status"]
          if type(memory_metrics) == "table" then
            memory_metrics["memory_overlimit"] =
              memory_metrics["overlimit"] and 1 or 0
            memory_metrics["memory_size"] = convert_size_to_bytes(
                                              memory_metrics["mem_size"])
            memory_metrics["memory_limit"] = convert_size_to_bytes(
                                               memory_metrics["mem_limit"])
            memory_metrics["overlimit"] = nil
            memory_metrics["mem_size"] = nil
            memory_metrics["mem_limit"] = nil
            add_metrics(parsed_timestamp, alias, emf, "", memory_metrics,
                        metrics_manifest)
          end
        end
      end

      -- For configured metrics that are not in the manifest yet and are not
      -- flagged for removal: if the EMF record already carries a value under
      -- that name (e.g. copied in as a property), register it as a metric.
      for metric_property, metric_configuration in pairs(configured_metrics) do
        if (metrics_manifest[metric_property] == nil and
            metric_configuration["remove"] ~= true) then
          if emf[metric_property] ~= nil then
            add_metrics(parsed_timestamp, alias, emf, "", {
                [metric_property] = emf[metric_property]
              }, metrics_manifest)
          end
        end
      end

      table.insert(emf_records, emf)
    end
  end

  return 1, timestamp, emf_records
end