internal/pkg/composable/providers/kubernetes/hints.go (270 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package kubernetes
import (
"fmt"
"regexp"
"strings"
"github.com/elastic/elastic-agent-autodiscover/utils"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
)
const (
hints = "hints"
integration = "package"
datastreams = "data_streams"
host = "host"
period = "period"
timeout = "timeout"
metricspath = "metrics_path"
username = "username"
password = "password"
stream = "stream" // this is the container stream: stdout/stderr
processors = "processors"
)
var allSupportedHints = []string{"enabled", integration, datastreams, host, period, timeout, metricspath, username, password, stream, processors}
type hintsBuilder struct {
Key string
logger *logp.Logger
}
func (m *hintsBuilder) getIntegration(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, integration)
}
func (m *hintsBuilder) getDataStreams(hints mapstr.M) []string {
ds := utils.GetHintAsList(hints, m.Key, datastreams)
return ds
}
func (m *hintsBuilder) getHost(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, host)
}
func (m *hintsBuilder) getStreamHost(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, host)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getPeriod(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, period)
}
func (m *hintsBuilder) getStreamPeriod(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, period)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getTimeout(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, timeout)
}
func (m *hintsBuilder) getStreamTimeout(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, timeout)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getMetricspath(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, metricspath)
}
func (m *hintsBuilder) getStreamMetricspath(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, metricspath)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getUsername(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, username)
}
func (m *hintsBuilder) getStreamUsername(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, username)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getPassword(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, password)
}
func (m *hintsBuilder) getStreamPassword(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, password)
return utils.GetHintString(hints, m.Key, key)
}
func (m *hintsBuilder) getContainerStream(hints mapstr.M) string {
return utils.GetHintString(hints, m.Key, stream)
}
func (m *hintsBuilder) getStreamContainerStream(hints mapstr.M, streamName string) string {
key := fmt.Sprintf("%v.%v", streamName, stream)
return utils.GetHintString(hints, m.Key, key)
}
// Replace hints like `'${kubernetes.pod.ip}:6379'` with the actual values from the resource metadata.
// So if you replace the `${kubernetes.pod.ip}` part with the value from the Pod's metadata
// you end up with sth like `10.28.90.345:6379`
func (m *hintsBuilder) getFromMeta(value string, kubeMeta mapstr.M) string {
if value == "" {
return ""
}
r := regexp.MustCompile(`\${([^{}]+)}`)
matches := r.FindAllString(value, -1)
for _, match := range matches {
key := strings.TrimSuffix(strings.TrimPrefix(match, "${kubernetes."), "}")
val, err := kubeMeta.GetValue(key)
if err != nil {
m.logger.Debugf("cannot retrieve key from k8smeta: %v", key)
return ""
}
hintVal, ok := val.(string)
if !ok {
m.logger.Debugf("cannot convert value into string: %v", val)
return ""
}
value = strings.Replace(value, match, hintVal, -1)
}
return value
}
// GenerateHintsMapping gets a hint's map extracted from the annotations and constructs the final
// hints' mapping to be emitted.
func GenerateHintsMapping(hints mapstr.M, kubeMeta mapstr.M, logger *logp.Logger, containerID string) mapstr.M {
builder := hintsBuilder{
Key: "hints", // consider doing it a configurable,
logger: logger,
}
hintsMapping := mapstr.M{}
integration := builder.getIntegration(hints)
if integration == "" {
return hintsMapping
}
integrationHints := mapstr.M{}
if containerID != "" {
_, _ = hintsMapping.Put("container_id", containerID)
// Add the default container log fallback to enable any template which defines
// a log input with a `"${kubernetes.hints.container_logs.enabled} == true"` condition
_, _ = integrationHints.Put("container_logs.enabled", true)
}
integrationHost := builder.getFromMeta(builder.getHost(hints), kubeMeta)
if integrationHost != "" {
_, _ = integrationHints.Put(host, integrationHost)
}
integrationPeriod := builder.getFromMeta(builder.getPeriod(hints), kubeMeta)
if integrationPeriod != "" {
_, _ = integrationHints.Put(period, integrationPeriod)
}
integrationTimeout := builder.getFromMeta(builder.getTimeout(hints), kubeMeta)
if integrationTimeout != "" {
_, _ = integrationHints.Put(timeout, integrationTimeout)
}
integrationMetricsPath := builder.getFromMeta(builder.getMetricspath(hints), kubeMeta)
if integrationMetricsPath != "" {
_, _ = integrationHints.Put(metricspath, integrationMetricsPath)
}
integrationUsername := builder.getFromMeta(builder.getUsername(hints), kubeMeta)
if integrationUsername != "" {
_, _ = integrationHints.Put(username, integrationUsername)
}
integrationPassword := builder.getFromMeta(builder.getPassword(hints), kubeMeta)
if integrationPassword != "" {
_, _ = integrationHints.Put(password, integrationPassword)
}
integrationContainerStream := builder.getFromMeta(builder.getContainerStream(hints), kubeMeta)
if integrationContainerStream != "" {
_, _ = integrationHints.Put(stream, integrationContainerStream)
}
dataStreams := builder.getDataStreams(hints)
if len(dataStreams) == 0 {
_, _ = integrationHints.Put("enabled", true)
}
for _, dataStream := range dataStreams {
streamHints := mapstr.M{
"enabled": true,
}
if integrationPeriod != "" {
_, _ = streamHints.Put(period, integrationPeriod)
}
if integrationHost != "" {
_, _ = streamHints.Put(host, integrationHost)
}
if integrationTimeout != "" {
_, _ = streamHints.Put(timeout, integrationTimeout)
}
if integrationMetricsPath != "" {
_, _ = streamHints.Put(metricspath, integrationMetricsPath)
}
if integrationUsername != "" {
_, _ = streamHints.Put(username, integrationUsername)
}
if integrationPassword != "" {
_, _ = streamHints.Put(password, integrationPassword)
}
if integrationContainerStream != "" {
_, _ = streamHints.Put(stream, integrationContainerStream)
}
streamPeriod := builder.getFromMeta(builder.getStreamPeriod(hints, dataStream), kubeMeta)
if streamPeriod != "" {
_, _ = streamHints.Put(period, streamPeriod)
}
streamHost := builder.getFromMeta(builder.getStreamHost(hints, dataStream), kubeMeta)
if streamHost != "" {
_, _ = streamHints.Put(host, streamHost)
}
streamTimeout := builder.getFromMeta(builder.getStreamTimeout(hints, dataStream), kubeMeta)
if streamTimeout != "" {
_, _ = streamHints.Put(timeout, streamTimeout)
}
streamMetricsPath := builder.getFromMeta(builder.getStreamMetricspath(hints, dataStream), kubeMeta)
if streamMetricsPath != "" {
_, _ = streamHints.Put(metricspath, streamMetricsPath)
}
streamUsername := builder.getFromMeta(builder.getStreamUsername(hints, dataStream), kubeMeta)
if streamUsername != "" {
_, _ = streamHints.Put(username, streamUsername)
}
streamPassword := builder.getFromMeta(builder.getStreamPassword(hints, dataStream), kubeMeta)
if streamPassword != "" {
_, _ = streamHints.Put(password, streamPassword)
}
streamContainerStream := builder.getFromMeta(builder.getStreamContainerStream(hints, dataStream), kubeMeta)
if streamContainerStream != "" {
_, _ = streamHints.Put(stream, streamContainerStream)
}
_, _ = integrationHints.Put(dataStream, streamHints)
}
_, _ = hintsMapping.Put(integration, integrationHints)
return hintsMapping
}
// GetHintsMapping Generates the hints and processor mappings from provided pod annotation map
func GetHintsMapping(k8sMapping map[string]interface{}, logger *logp.Logger, prefix string, cID string) hintsData {
hintData := hintsData{
composableMapping: mapstr.M{},
processors: []mapstr.M{},
}
cName := ""
cHost := ""
ann, ok := k8sMapping["annotations"]
if !ok {
return hintData
}
annotations, ok := ann.(mapstr.M)
if !ok {
return hintData
}
// Get the name of the container from the metadata. We need it to extract the hints that affect it directly.
// E.g. co.elastic.hints.<container-name>/host: "..."
if con, ok := k8sMapping["container"]; ok {
if containers, ok := con.(mapstr.M); ok {
if name, err := containers.GetValue("name"); err == nil {
if nameString, ok := name.(string); ok {
cName = nameString
}
}
if cPort, err := containers.GetValue("port"); err == nil {
// This is the default for the host value of a specific container.
if portString, ok := cPort.(string); ok {
cHost = "${kubernetes.pod.ip}:" + portString
}
}
}
}
hintsExtracted, _ := utils.GenerateHints(annotations, cName, prefix, false, allSupportedHints)
if len(hintsExtracted) == 0 {
return hintData
}
// Check if host exists. Otherwise, add default entry for it.
if cHost != "" {
hintsValues, ok := hintsExtracted[hints]
if ok {
if hintsHostValues, ok := hintsValues.(mapstr.M); ok {
if _, ok := hintsHostValues[host]; !ok {
hintsHostValues[host] = cHost
}
}
} else {
hintsExtracted[hints] = mapstr.M{
host: cHost,
}
}
}
logger.Debugf("Extracted hints are :%v", hintsExtracted)
hintData.composableMapping = GenerateHintsMapping(hintsExtracted, k8sMapping, logger, cID)
logger.Debugf("Generated hints mappings :%v", hintData.composableMapping)
hintData.processors = utils.GetConfigs(annotations, prefix, hints+"/"+processors)
// We need to check the processors for the specific container, if they exist.
if cName != "" {
containerProcessors := utils.GetConfigs(annotations, prefix, hints+"."+cName+"/"+processors)
if len(containerProcessors) > 0 {
hintData.processors = append(hintData.processors, containerProcessors...)
}
}
logger.Debugf("Generated Processors mapping :%v", hintData.processors)
return hintData
}