in pkg/component/component.go [529:713]
// toIntermediate converts a raw policy into an intermediate mapping of output
// name to output, where every output carries the inputs that reference it,
// grouped by input type (this allows different input types per output).
//
// policy is the decoded policy: its "outputs" entry must be a map of maps and
// its "inputs" entry an array of maps. aliasMapping translates input type
// aliases into the real input type. ll is passed to getLogLevel for every
// output and input. headers, when non-nil, supplies additional headers that are
// merged into every output whose type is elasticsearchType.
//
// The maps inside policy are modified in place: "enabled" is removed from
// outputs and inputs, "use_output" and "_runtime_experimental" are removed from
// inputs, aliased input types are rewritten to the real type, and the headers
// entry of matching outputs is set to the merged header map.
//
// It returns (nil, nil) when policy has no "outputs" key, has no "inputs" key,
// or defines zero outputs. An error is returned when any inspected value has an
// unexpected type, a required "type" is missing, an input ID is duplicated, an
// input references an unknown output, or a runtime manager value is not
// recognised.
func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) {
	const (
		outputsKey        = "outputs"
		enabledKey        = "enabled"
		inputsKey         = "inputs"
		typeKey           = "type"
		idKey             = "id"
		useOutputKey      = "use_output"
		runtimeManagerKey = "_runtime_experimental"
	)

	// intermediate structure for output to input mapping (this structure allows different input types per output)
	outputsMap := make(map[string]outputI)

	// map the outputs first
	outputsRaw, ok := policy[outputsKey]
	if !ok {
		// no outputs defined; no components then
		return nil, nil
	}
	outputs, ok := outputsRaw.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid 'outputs', expected a map not a %T", outputsRaw)
	}
	for name, outputRaw := range outputs {
		output, ok := outputRaw.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s', expected a map not a %T", name, outputRaw)
		}
		typeRaw, ok := output[typeKey]
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s', 'type' missing", name)
		}
		t, ok := typeRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s.type', expected a string not a %T", name, typeRaw)
		}

		// Outputs are enabled unless explicitly disabled. The key is removed
		// from the config once its value has been captured.
		enabled := true
		if enabledRaw, ok := output[enabledKey]; ok {
			enabledVal, ok := enabledRaw.(bool)
			if !ok {
				return nil, fmt.Errorf("invalid 'outputs.%s.enabled', expected a bool not a %T", name, enabledRaw)
			}
			enabled = enabledVal
			delete(output, enabledKey)
		}

		logLevel, err := getLogLevel(output, ll)
		if err != nil {
			return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", name, err)
		}

		// inject headers configured during enroll
		if t == elasticsearchType && headers != nil {
			// can be nil when called from install/uninstall
			if agentHeaders := headers.Headers(); len(agentHeaders) > 0 {
				// Reuse the headers map already present on the output, if
				// any, so existing entries are kept; agent headers overwrite
				// entries with the same name.
				merged := make(map[string]interface{})
				if existingHeadersRaw, found := output[headersKey]; found {
					existingHeaders, ok := existingHeadersRaw.(map[string]interface{})
					if !ok {
						// Report the offending headers value and the output
						// it belongs to, not the enclosing output map.
						return nil, fmt.Errorf("invalid 'outputs.%s.headers', expected a map not a %T", name, existingHeadersRaw)
					}
					merged = existingHeaders
				}
				for headerName, headerVal := range agentHeaders {
					merged[headerName] = headerVal
				}
				output[headersKey] = merged
			}
		}

		outputsMap[name] = outputI{
			name:       name,
			enabled:    enabled,
			logLevel:   logLevel,
			outputType: t,
			config:     output,
			inputs:     make(map[string][]inputI),
		}
	}

	// map the inputs to the outputs
	inputsRaw, ok := policy[inputsKey]
	if !ok {
		// no inputs; no components then
		return nil, nil
	}
	inputs, ok := inputsRaw.([]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid 'inputs', expected an array not a %T", inputsRaw)
	}
	for idx, inputRaw := range inputs {
		input, ok := inputRaw.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d', expected a map not a %T", idx, inputRaw)
		}
		typeRaw, ok := input[typeKey]
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d', 'type' missing", idx)
		}
		t, ok := typeRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.type', expected a string not a %T", idx, typeRaw)
		}
		if realInputType, found := aliasMapping[t]; found {
			t = realInputType
			// by replacing type we make sure component understands aliasing
			input[typeKey] = t
		}

		idRaw, ok := input[idKey]
		if !ok {
			// no ID; fallback to type
			idRaw = t
		}
		id, ok := idRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.id', expected a string not a %T", idx, idRaw)
		}
		if hasDuplicate(outputsMap, id) {
			return nil, fmt.Errorf("invalid 'inputs.%d.id', has a duplicate id %q. Please add a unique value for the 'id' key to each input in the agent policy", idx, id)
		}

		// Inputs that do not name an output are attached to "default".
		outputName := "default"
		if outputRaw, ok := input[useOutputKey]; ok {
			outputNameVal, ok := outputRaw.(string)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.use_output', expected a string not a %T", idx, outputRaw)
			}
			outputName = outputNameVal
			delete(input, useOutputKey)
		}
		output, ok := outputsMap[outputName]
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.use_output', references an unknown output '%s'", idx, outputName)
		}

		// Inputs are enabled unless explicitly disabled.
		enabled := true
		if enabledRaw, ok := input[enabledKey]; ok {
			enabledVal, ok := enabledRaw.(bool)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.enabled', expected a bool not a %T", idx, enabledRaw)
			}
			enabled = enabledVal
			delete(input, enabledKey)
		}

		logLevel, err := getLogLevel(input, ll)
		if err != nil {
			return nil, fmt.Errorf("invalid 'inputs.%d.log_level', %w", idx, err)
		}

		runtimeManager := DefaultRuntimeManager
		// determine the runtime manager for the input
		// NOTE(review): the messages below say 'runtime' while the key that is
		// read is "_runtime_experimental" - confirm this wording is intended.
		if runtimeManagerRaw, ok := input[runtimeManagerKey]; ok {
			runtimeManagerStr, ok := runtimeManagerRaw.(string)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.runtime', expected a string, not a %T", idx, runtimeManagerRaw)
			}
			runtimeManagerVal := RuntimeManager(runtimeManagerStr)
			switch runtimeManagerVal {
			case OtelRuntimeManager, ProcessRuntimeManager:
			default:
				return nil, fmt.Errorf("invalid 'inputs.%d.runtime', valid values are: %s, %s", idx, OtelRuntimeManager, ProcessRuntimeManager)
			}
			runtimeManager = runtimeManagerVal
			delete(input, runtimeManagerKey)
		}

		// Inject the top level fleet policy revision into each input configuration. This
		// allows individual inputs (like endpoint) to detect policy changes more easily.
		injectInputPolicyID(policy, input)

		// output is a copy of the value stored in outputsMap, but its inputs
		// field is a map shared with that stored value, so this assignment is
		// visible through outputsMap without writing the struct back.
		output.inputs[t] = append(output.inputs[t], inputI{
			idx:            idx,
			id:             id,
			enabled:        enabled,
			logLevel:       logLevel,
			inputType:      t,
			config:         input,
			runtimeManager: runtimeManager,
		})
	}

	if len(outputsMap) == 0 {
		return nil, nil
	}
	return outputsMap, nil
}