collector/logs/sources/tail/pod_target.go (174 lines of code) (raw):
package tail
import (
"fmt"
"strings"
"github.com/Azure/adx-mon/collector/logs/sources/tail/sourceparse"
"github.com/Azure/adx-mon/collector/logs/transforms/parser"
"github.com/Azure/adx-mon/pkg/logger"
v1 "k8s.io/api/core/v1"
)
const (
// Defined as <DBName>:<Table>
AdxMonLogDestinationAnnotation = "adx-mon/log-destination"
// Defined as comma separated parser names
AdxMonLogParsersAnnotation = "adx-mon/log-parsers"
)
// getFileTargets generates a list of FileTailTargets for the containers within a given pod
func getFileTargets(pod *v1.Pod, nodeName string, staticPodTargets []*StaticPodTargets) []FileTailTarget {
// Only look for targets on our node
if pod.Spec.NodeName != nodeName {
return nil
}
if logger.IsDebug() {
logger.Debugf("Checking for targets for pod %s/%s", pod.Namespace, pod.Name)
}
// Skip the pod if it has not opted in to scraping or are not static pods
staticPodTarget := getStaticPod(pod, staticPodTargets)
if !strings.EqualFold(getAnnotationOrDefault(pod, "adx-mon/scrape", "false"), "true") && staticPodTarget == nil {
if logger.IsDebug() {
logger.Debugf("Pod %s/%s has not opted in to scraping", pod.Namespace, pod.Name)
}
return nil
}
// Skip the pod if it does not have a log database or table configured
// Destination in form of "database:table"
dest := getAnnotationOrDefault(pod, AdxMonLogDestinationAnnotation, "")
if staticPodTarget != nil && dest == "" {
dest = staticPodTarget.Destination
}
destPair := strings.Split(dest, ":")
if len(destPair) != 2 {
if logger.IsDebug() {
logger.Debugf("Pod %s/%s has no log destination configured", pod.Namespace, pod.Name)
}
return nil
}
podDB := destPair[0]
podTable := destPair[1]
if podDB == "" || podTable == "" {
if logger.IsDebug() {
logger.Debugf("Pod %s/%s has no log destination configured", pod.Namespace, pod.Name)
}
return nil
}
parserList := []string{}
parsers := getAnnotationOrDefault(pod, AdxMonLogParsersAnnotation, "")
if staticPodTarget != nil && parsers == "" {
parsers = strings.Join(staticPodTarget.Parsers, ",")
}
if parsers != "" {
parserList = strings.Split(parsers, ",")
for _, currParser := range parserList {
if !parser.IsValidParser(currParser) {
logger.Warnf("Invalid parser %s for pod %s/%s", currParser, pod.Namespace, pod.Name)
return nil
}
}
}
podName := pod.Name
namespaceName := pod.Namespace
containerCount := len(pod.Spec.Containers) + len(pod.Spec.InitContainers) + len(pod.Spec.EphemeralContainers)
targets := make([]FileTailTarget, 0, containerCount)
// example name /var/log/containers/adx-reconciler-85f865d7b5-j5f49_adx-reconciler_adx-reconciler-ea96bea0582a986c502378aef429a275eb75c1d68e1c912ef93c2b5300990b04.log
// podname_namespace_containername-containerid.log
logFilePrefix := fmt.Sprintf("/var/log/containers/%s_%s", podName, namespaceName)
for _, container := range pod.Spec.InitContainers {
if target, ok := targetForContainer(pod, pod.Status.InitContainerStatuses, parserList, container.Name, logFilePrefix, podDB, podTable); ok {
targets = append(targets, target)
}
}
for _, container := range pod.Spec.Containers {
if target, ok := targetForContainer(pod, pod.Status.ContainerStatuses, parserList, container.Name, logFilePrefix, podDB, podTable); ok {
targets = append(targets, target)
}
}
for _, container := range pod.Spec.EphemeralContainers {
if target, ok := targetForContainer(pod, pod.Status.EphemeralContainerStatuses, parserList, container.Name, logFilePrefix, podDB, podTable); ok {
targets = append(targets, target)
}
}
return targets
}
func targetForContainer(pod *v1.Pod, containerStatuses []v1.ContainerStatus, parserList []string, containerName, logFilePrefix, podDB, podTable string) (FileTailTarget, bool) {
containerId, ok := getContainerID(containerStatuses, containerName)
if !ok {
logger.Warnf("Failed to get container ID for container %s in pod %s/%s", containerName, pod.Namespace, pod.Name)
return FileTailTarget{}, false
}
// containerId is <type>://<containerId>
containerIdWithoutType := containerId
slashIdx := strings.IndexByte(containerId, '/')
if slashIdx != -1 && slashIdx+2 < len(containerId) {
containerIdWithoutType = containerId[slashIdx+2:]
}
logFile := fmt.Sprintf("%s_%s-%s.log", logFilePrefix, containerName, containerIdWithoutType)
if logger.IsDebug() {
logger.Debugf("Found target: file:%s database:%s table:%s parsers:%v", logFile, podDB, podTable, parserList)
}
resourceValues := map[string]interface{}{
"pod": pod.Name,
"namespace": pod.Namespace,
"container": containerName,
"containerID": containerId,
}
for k, v := range pod.GetAnnotations() {
if !strings.HasPrefix(k, "adx-mon/") {
key := fmt.Sprintf("annotation.%s", k)
resourceValues[key] = v
}
}
for k, v := range pod.GetLabels() {
key := fmt.Sprintf("label.%s", k)
resourceValues[key] = v
}
target := FileTailTarget{
FilePath: logFile,
LogType: sourceparse.LogTypeKubernetes,
Database: podDB,
Table: podTable,
Parsers: parserList,
Resources: resourceValues,
}
return target, true
}
func getAnnotationOrDefault(p *v1.Pod, key, def string) string {
if value, ok := p.Annotations[key]; ok && value != "" {
return value
}
return def
}
func isTargetChanged(old, new FileTailTarget) bool {
if len(old.Parsers) != len(new.Parsers) {
return true
}
for i := range old.Parsers {
if old.Parsers[i] != new.Parsers[i] {
return true
}
}
if len(old.Resources) != len(new.Resources) {
return true
}
for k, v := range old.Resources {
if new.Resources[k] != v {
return true
}
}
return old.Database != new.Database || old.Table != new.Table
}
func getContainerID(containerStatuses []v1.ContainerStatus, containerName string) (string, bool) {
for _, container := range containerStatuses {
if container.Name == containerName {
containerIdPopulated := container.ContainerID != ""
return container.ContainerID, containerIdPopulated
}
}
return "", false
}
func getStaticPod(pod *v1.Pod, staticPodTargets []*StaticPodTargets) *StaticPodTargets {
for _, staticPod := range staticPodTargets {
if staticPod.Namespace != "" && pod.Namespace != staticPod.Namespace {
continue
}
if staticPod.Name != "" && pod.Name != staticPod.Name {
continue
}
for label, value := range staticPod.Labels {
if pod.Labels[label] != value {
continue
}
}
return staticPod
}
return nil
}