func toIntermediate()

in pkg/component/component.go [529:713]


// toIntermediate converts a raw policy into an intermediate structure that maps
// each output (keyed by its name) to the inputs that use it, grouped by input type.
//
// The policy is read through its "outputs" key (a map of output name to output
// configuration) and its "inputs" key (an array of input configurations). Every
// output requires a string "type"; every input requires a string "type" and may
// carry "id" (falls back to the type when absent), "use_output" (falls back to
// "default"), "enabled" (falls back to true) and "_runtime_experimental".
//
// aliasMapping translates input type aliases to the real input type; when an
// alias matches, the "type" value inside the input configuration is rewritten.
// ll is handed to getLogLevel together with each output/input configuration.
// headers may be nil; when it is non-nil and returns at least one header, those
// headers are merged into the "headers" of every output of type elasticsearchType,
// overriding any existing header with the same name.
//
// NOTE: the maps inside policy are modified in place. "enabled" is removed from
// outputs and inputs, "use_output" and "_runtime_experimental" are removed from
// inputs, aliased input types are rewritten, and output headers may be
// added/overwritten.
//
// It returns (nil, nil) when the policy has no "outputs" key, has no "inputs"
// key, or defines zero outputs. Any malformed value results in an error that
// names the offending path in the policy.
func toIntermediate(policy map[string]interface{}, aliasMapping map[string]string, ll logp.Level, headers HeadersProvider) (map[string]outputI, error) {
	const (
		outputsKey        = "outputs"
		enabledKey        = "enabled"
		inputsKey         = "inputs"
		typeKey           = "type"
		idKey             = "id"
		useOutputKey      = "use_output"
		runtimeManagerKey = "_runtime_experimental"
	)

	// intermediate structure for output to input mapping (this structure allows different input types per output)
	outputsMap := make(map[string]outputI)

	// map the outputs first
	outputsRaw, ok := policy[outputsKey]
	if !ok {
		// no outputs defined; no components then
		return nil, nil
	}
	outputs, ok := outputsRaw.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid 'outputs', expected a map not a %T", outputsRaw)
	}
	for name, outputRaw := range outputs {
		output, ok := outputRaw.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s', expected a map not a %T", name, outputRaw)
		}
		typeRaw, ok := output[typeKey]
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s', 'type' missing", name)
		}
		t, ok := typeRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'outputs.%s.type', expected a string not a %T", name, typeRaw)
		}
		enabled := true
		if enabledRaw, ok := output[enabledKey]; ok {
			enabledVal, ok := enabledRaw.(bool)
			if !ok {
				return nil, fmt.Errorf("invalid 'outputs.%s.enabled', expected a bool not a %T", name, enabledRaw)
			}
			enabled = enabledVal
			// the flag is carried on outputI; drop it from the stored config
			delete(output, enabledKey)
		}
		logLevel, err := getLogLevel(output, ll)
		if err != nil {
			return nil, fmt.Errorf("invalid 'outputs.%s.log_level', %w", name, err)
		}

		// inject headers configured during enroll
		if t == elasticsearchType && headers != nil {
			// can be nil when called from install/uninstall
			if agentHeaders := headers.Headers(); len(agentHeaders) > 0 {
				// start from the headers already present on the output (if any) so
				// that they are preserved unless an agent header has the same name
				merged := make(map[string]interface{})
				if existingHeadersRaw, found := output[headersKey]; found {
					existingHeaders, ok := existingHeadersRaw.(map[string]interface{})
					if !ok {
						// report the headers value itself (not the whole output) and
						// include the output name, consistent with the other errors
						return nil, fmt.Errorf("invalid 'outputs.%s.headers', expected a map not a %T", name, existingHeadersRaw)
					}
					merged = existingHeaders
				}

				for headerName, headerVal := range agentHeaders {
					merged[headerName] = headerVal
				}

				output[headersKey] = merged
			}
		}

		outputsMap[name] = outputI{
			name:       name,
			enabled:    enabled,
			logLevel:   logLevel,
			outputType: t,
			config:     output,
			inputs:     make(map[string][]inputI),
		}
	}

	// map the inputs to the outputs
	inputsRaw, ok := policy[inputsKey]
	if !ok {
		// no inputs; no components then
		return nil, nil
	}
	inputs, ok := inputsRaw.([]interface{})
	if !ok {
		return nil, fmt.Errorf("invalid 'inputs', expected an array not a %T", inputsRaw)
	}
	for idx, inputRaw := range inputs {
		input, ok := inputRaw.(map[string]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d', expected a map not a %T", idx, inputRaw)
		}
		typeRaw, ok := input[typeKey]
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d', 'type' missing", idx)
		}
		t, ok := typeRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.type', expected a string not a %T", idx, typeRaw)
		}
		if realInputType, found := aliasMapping[t]; found {
			t = realInputType
			// by replacing type we make sure component understands aliasing
			input[typeKey] = t
		}
		idRaw, ok := input[idKey]
		if !ok {
			// no ID; fallback to type
			idRaw = t
		}
		id, ok := idRaw.(string)
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.id', expected a string not a %T", idx, idRaw)
		}
		// checked against the inputs already registered on the outputs so far
		if hasDuplicate(outputsMap, id) {
			return nil, fmt.Errorf("invalid 'inputs.%d.id', has a duplicate id %q. Please add a unique value for the 'id' key to each input in the agent policy", idx, id)
		}
		outputName := "default"
		if outputRaw, ok := input[useOutputKey]; ok {
			outputNameVal, ok := outputRaw.(string)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.use_output', expected a string not a %T", idx, outputRaw)
			}
			outputName = outputNameVal
			delete(input, useOutputKey)
		}
		output, ok := outputsMap[outputName]
		if !ok {
			return nil, fmt.Errorf("invalid 'inputs.%d.use_output', references an unknown output '%s'", idx, outputName)
		}
		enabled := true
		if enabledRaw, ok := input[enabledKey]; ok {
			enabledVal, ok := enabledRaw.(bool)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.enabled', expected a bool not a %T", idx, enabledRaw)
			}
			enabled = enabledVal
			delete(input, enabledKey)
		}
		logLevel, err := getLogLevel(input, ll)
		if err != nil {
			return nil, fmt.Errorf("invalid 'inputs.%d.log_level', %w", idx, err)
		}

		runtimeManager := DefaultRuntimeManager
		// determine the runtime manager for the input
		if runtimeManagerRaw, ok := input[runtimeManagerKey]; ok {
			runtimeManagerStr, ok := runtimeManagerRaw.(string)
			if !ok {
				return nil, fmt.Errorf("invalid 'inputs.%d.runtime', expected a string, not a %T", idx, runtimeManagerRaw)
			}
			runtimeManagerVal := RuntimeManager(runtimeManagerStr)
			switch runtimeManagerVal {
			case OtelRuntimeManager, ProcessRuntimeManager:
				// recognised value; nothing else to validate
			default:
				return nil, fmt.Errorf("invalid 'inputs.%d.runtime', valid values are: %s, %s", idx, OtelRuntimeManager, ProcessRuntimeManager)
			}
			runtimeManager = runtimeManagerVal
			delete(input, runtimeManagerKey)
		}

		// Inject the top level fleet policy revision into each input configuration. This
		// allows individual inputs (like endpoint) to detect policy changes more easily.
		injectInputPolicyID(policy, input)

		// output is a copy of the struct stored in outputsMap, but its inputs field
		// is a map, so this write is visible through outputsMap as well
		output.inputs[t] = append(output.inputs[t], inputI{
			idx:            idx,
			id:             id,
			enabled:        enabled,
			logLevel:       logLevel,
			inputType:      t,
			config:         input,
			runtimeManager: runtimeManager,
		})
	}
	if len(outputsMap) == 0 {
		return nil, nil
	}
	return outputsMap, nil
}