kubelet-to-gcm/monitor/kubelet/translate.go (533 lines of code) (raw):

/* Copyright 2017 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package kubelet import ( "fmt" "time" "github.com/golang/glog" v3 "google.golang.org/api/monitoring/v3" stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1" "github.com/GoogleCloudPlatform/k8s-stackdriver/kubelet-to-gcm/monitor" ) var ( daemonCpuCoreUsageTimeMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "DOUBLE", Name: "kubernetes.io/node_daemon/cpu/core_usage_time", } daemonMemUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/node_daemon/memory/used_bytes", } // Container metrics containerUptimeMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "DOUBLE", Name: "kubernetes.io/container/uptime", } containerCpuCoreUsageTimeMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "DOUBLE", Name: "kubernetes.io/container/cpu/core_usage_time", } containerMemTotalMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/container/memory/limit_bytes", } containerMemUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/container/memory/used_bytes", } containerPageFaultsMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "INT64", Name: "kubernetes.io/container/memory/page_fault_count", } containerEphemeralstorageUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/container/ephemeral_storage/used_bytes", } // Node metrics nodeCpuCoreUsageTimeMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "DOUBLE", Name: "kubernetes.io/node/cpu/core_usage_time", } nodeMemTotalMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/node/memory/total_bytes", } nodeMemUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/node/memory/used_bytes", } nodeEphemeralstorageTotalMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/node/ephemeral_storage/total_bytes", } nodeEphemeralstorageUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "kubernetes.io/node/ephemeral_storage/used_bytes", } // Legacy metrics. legacyUsageTimeMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "DOUBLE", Name: "container.googleapis.com/container/cpu/usage_time", } legacyDiskTotalMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "container.googleapis.com/container/disk/bytes_total", } legacyDiskUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "container.googleapis.com/container/disk/bytes_used", } legacyMemTotalMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "container.googleapis.com/container/memory/bytes_total", } legacyMemUsedMD = &metricMetadata{ MetricKind: "GAUGE", ValueType: "INT64", Name: "container.googleapis.com/container/memory/bytes_used", } legacyPageFaultsMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "INT64", Name: "container.googleapis.com/container/memory/page_fault_count", } legacyUptimeMD = &metricMetadata{ MetricKind: "CUMULATIVE", ValueType: "DOUBLE", Name: "container.googleapis.com/container/uptime", } memUsedNonEvictableLabels = map[string]string{"memory_type": "non-evictable"} memUsedEvictableLabels = map[string]string{"memory_type": "evictable"} minorPageFaultLabels = map[string]string{"fault_type": "minor"} majorPageFaultLabels = map[string]string{"fault_type": "major"} noLabels = map[string]string{} ) type metricMetadata struct { MetricKind, ValueType, Name string } // Translator contains the required information to perform translations from // kubelet summarys to GCM's GKE metrics. type Translator struct { zone, project, cluster, clusterLocation, instance, instanceID, schemaPrefix string monitoredResourceLabels map[string]string resolution time.Duration useOldResourceModel bool } // NewTranslator creates a new Translator with the given fields. func NewTranslator(zone, project, cluster, clusterLocation, instance, instanceID, schemaPrefix string, monitoredResourceLabels map[string]string, resolution time.Duration) *Translator { return &Translator{ zone: zone, project: project, cluster: cluster, clusterLocation: clusterLocation, instance: instance, instanceID: instanceID, schemaPrefix: schemaPrefix, monitoredResourceLabels: monitoredResourceLabels, resolution: resolution, useOldResourceModel: schemaPrefix == "", } } // Translate translates a summary to its TimeSeries. func (t *Translator) Translate(summary *stats.Summary) (*v3.CreateTimeSeriesRequest, error) { var ts []*v3.TimeSeries nodeTs, err := t.translateNode(summary.Node) if err != nil { return nil, err } podsTs, err := t.translateContainers(summary.Pods) if err != nil { return nil, err } ts = append(ts, nodeTs...) ts = append(ts, podsTs...) return &v3.CreateTimeSeriesRequest{TimeSeries: ts}, nil } func (t *Translator) translateNode(node stats.NodeStats) ([]*v3.TimeSeries, error) { var ( timeSeries, memTS, fsTS, cpuTS []*v3.TimeSeries tsFactory *timeSeriesFactory err error ) tsFactory = newTimeSeriesFactory(t.getMonitoredResource(map[string]string{"pod": "machine"}), t.resolution) // Uptime. This is embedded: there's no nil check. timeSeries = append(timeSeries, tsFactory.newTimeSeries(noLabels, t.getUptimeMD(), t.getUptimePoint(node.StartTime.Time))) // Memory stats. memUsedMD, memTotalMD, pageFaultsMD := t.getMemoryMD(tsFactory.monitoredResource.Type) memTS, err = translateMemory(node.Memory, tsFactory, node.StartTime.Time, memUsedMD, memTotalMD, pageFaultsMD, "") if err != nil { return nil, err } timeSeries = append(timeSeries, memTS...) // File-system stats. diskUsedMD, diskTotalMD := t.getFsMD(tsFactory.monitoredResource.Type) fsTS, err = translateFS("/", node.Fs, tsFactory, node.StartTime.Time, diskUsedMD, diskTotalMD) if err != nil { return nil, err } timeSeries = append(timeSeries, fsTS...) // CPU stats. cpuTS, err = translateCPU(node.CPU, tsFactory, node.StartTime.Time, t.getCpuMD(tsFactory.monitoredResource.Type), "") if err != nil { return nil, err } timeSeries = append(timeSeries, cpuTS...) // System containers for _, container := range node.SystemContainers { // For system containers: // * There won't be duplication; // * There aren't pod id and namespace; // * There is no fs stats. // Pod ID and namespace for system containers are empty. if t.useOldResourceModel { containerSeries, err := t.translateContainer("", "", container, false /* requireFsStats */) if err != nil { glog.Warningf("Failed to translate system container stats for %q: %v", container.Name, err) continue } timeSeries = append(timeSeries, containerSeries...) } else { cpuTS, err = translateCPU(container.CPU, tsFactory, node.StartTime.Time, daemonCpuCoreUsageTimeMD, container.Name) if err != nil { glog.Warningf("Failed to translate system container CPU stats for %q: %v", container.Name, err) } else { timeSeries = append(timeSeries, cpuTS...) } memTS, err = translateMemory(container.Memory, tsFactory, node.StartTime.Time, daemonMemUsedMD, nil, nil, container.Name) if err != nil { glog.Warningf("Failed to translate system container memory stats for %q: %v", container.Name, err) continue } timeSeries = append(timeSeries, memTS...) } } return timeSeries, nil } func (t *Translator) translateContainers(pods []stats.PodStats) ([]*v3.TimeSeries, error) { var timeSeries []*v3.TimeSeries for _, pod := range pods { metricsSeen := make(map[string]time.Time) metrics := make(map[string][]*v3.TimeSeries) namespace := pod.PodRef.Namespace podID := pod.PodRef.Name // There can be duplicate data points for containers, so only // take the latest one. for _, container := range pod.Containers { containerName := container.Name // Check for duplicates if container.StartTime.Time.Before(metricsSeen[containerName]) || container.StartTime.Time.Equal(metricsSeen[containerName]) { continue } metricsSeen[containerName] = container.StartTime.Time containerSeries, err := t.translateContainer(podID, namespace, container, true /* requireFsStats */) if err != nil { glog.Warningf("Failed to translate container stats for container %q in pod %q(%q): %v", containerName, podID, namespace, err) continue } metrics[containerName] = containerSeries } // Flatten the deduplicated metrics. for _, containerSeries := range metrics { timeSeries = append(timeSeries, containerSeries...) } } return timeSeries, nil } func (t *Translator) translateContainer(podID, namespace string, container stats.ContainerStats, requireFsStats bool) ([]*v3.TimeSeries, error) { var ( containerSeries, memTS, rootfsTS, logfsTS, cpuTS []*v3.TimeSeries err error containerName = container.Name containerLabels = map[string]string{ "namespace": namespace, "pod": podID, "container": containerName, } ) tsFactory := newTimeSeriesFactory(t.getMonitoredResource(containerLabels), t.resolution) // Uptime. This is embedded: there's no nil check. containerSeries = append(containerSeries, tsFactory.newTimeSeries(noLabels, t.getUptimeMD(), t.getUptimePoint(container.StartTime.Time))) // Memory stats. memUsedMD, memTotalMD, pageFaultsMD := t.getMemoryMD(tsFactory.monitoredResource.Type) memTS, err = translateMemory(container.Memory, tsFactory, container.StartTime.Time, memUsedMD, memTotalMD, pageFaultsMD, "") if err != nil { return nil, fmt.Errorf("failed to translate memory stats: %v", err) } containerSeries = append(containerSeries, memTS...) // File-system stats. diskUsedMD, diskTotalMD := t.getFsMD(tsFactory.monitoredResource.Type) if t.useOldResourceModel { rootfsTS, err = translateFS("/", container.Rootfs, tsFactory, container.StartTime.Time, diskUsedMD, diskTotalMD) } else { rootfsTS, err = containerTranslateFS("/", container.Rootfs, container.Logs, tsFactory, container.StartTime.Time) } if err != nil { if requireFsStats { return nil, fmt.Errorf("failed to translate rootfs stats: %v", err) } } else { containerSeries = append(containerSeries, rootfsTS...) } if t.useOldResourceModel { logfsTS, err = translateFS("logs", container.Logs, tsFactory, container.StartTime.Time, diskUsedMD, diskTotalMD) if err != nil { if requireFsStats { return nil, fmt.Errorf("failed to translate log stats: %v", err) } } else { containerSeries = append(containerSeries, logfsTS...) } } // CPU stats. cpuTS, err = translateCPU(container.CPU, tsFactory, container.StartTime.Time, t.getCpuMD(tsFactory.monitoredResource.Type), "") if err != nil { return nil, fmt.Errorf("failed to translate cpu stats: %v", err) } containerSeries = append(containerSeries, cpuTS...) return containerSeries, nil } func (t *Translator) getUptimeMD() *metricMetadata { if t.useOldResourceModel { return legacyUptimeMD } return containerUptimeMD } func (t *Translator) getUptimePoint(startTime time.Time) *v3.Point { now := time.Now() s := now.Format(time.RFC3339) if t.useOldResourceModel { s = startTime.Format(time.RFC3339) } return &v3.Point{ Interval: &v3.TimeInterval{ EndTime: now.Format(time.RFC3339), StartTime: s, }, Value: &v3.TypedValue{ DoubleValue: monitor.Float64Ptr(float64(time.Since(startTime).Seconds())), ForceSendFields: []string{"DoubleValue"}, }, } } func (t *Translator) getCpuMD(resourceType string) *metricMetadata { switch resourceType { case t.schemaPrefix + "node": return nodeCpuCoreUsageTimeMD case t.schemaPrefix + "container": return containerCpuCoreUsageTimeMD default: return legacyUsageTimeMD } } func (t *Translator) getFsMD(resourceType string) (*metricMetadata, *metricMetadata) { switch resourceType { case t.schemaPrefix + "node": return nodeEphemeralstorageUsedMD, nodeEphemeralstorageTotalMD case t.schemaPrefix + "container": return containerEphemeralstorageUsedMD, nil default: return legacyDiskUsedMD, legacyDiskTotalMD } } func (t *Translator) getMemoryMD(resourceType string) (*metricMetadata, *metricMetadata, *metricMetadata) { switch resourceType { case t.schemaPrefix + "node": return nodeMemUsedMD, nodeMemTotalMD, nil case t.schemaPrefix + "container": return containerMemUsedMD, containerMemTotalMD, containerPageFaultsMD default: return legacyMemUsedMD, legacyMemTotalMD, legacyPageFaultsMD } } // translateCPU creates all the TimeSeries for a give CPUStat. func translateCPU(cpu *stats.CPUStats, tsFactory *timeSeriesFactory, startTime time.Time, usageTimeMD *metricMetadata, component string) ([]*v3.TimeSeries, error) { var timeSeries []*v3.TimeSeries // First check that all required information is present. if cpu == nil { return nil, fmt.Errorf("CPU information missing.") } if cpu.UsageCoreNanoSeconds == nil { return nil, fmt.Errorf("UsageCoreNanoSeconds missing from CPUStats %v", cpu) } // Only send cpu usage metric if start time is before current time. Right after container is started, kubelet can return start time == end time. if !cpu.Time.Time.After(startTime) { return nil, nil } // Total CPU utilization for all time. Convert from nanosec to sec. cpuTotalPoint := tsFactory.newPoint(&v3.TypedValue{ DoubleValue: monitor.Float64Ptr(float64(*cpu.UsageCoreNanoSeconds) / float64(1000*1000*1000)), ForceSendFields: []string{"DoubleValue"}, }, startTime, cpu.Time.Time, usageTimeMD.MetricKind) labels := map[string]string{} if component != "" { labels["component"] = component } timeSeries = append(timeSeries, tsFactory.newTimeSeries(labels, usageTimeMD, cpuTotalPoint)) return timeSeries, nil } func containerTranslateFS(volume string, rootfs *stats.FsStats, logs *stats.FsStats, tsFactory *timeSeriesFactory, startTime time.Time) ([]*v3.TimeSeries, error) { var combinedUsage uint64 if rootfs != nil { combinedUsage = *rootfs.UsedBytes } if logs != nil { combinedUsage = combinedUsage + *logs.UsedBytes } combinedStats := &stats.FsStats{ UsedBytes: &combinedUsage, } if rootfs == nil && logs == nil { combinedStats = nil } return translateFS(volume, combinedStats, tsFactory, startTime, containerEphemeralstorageUsedMD, nil) } // translateFS creates all the TimeSeries for a given FsStats and volume name. func translateFS(volume string, fs *stats.FsStats, tsFactory *timeSeriesFactory, startTime time.Time, diskUsedMD *metricMetadata, diskTotalMD *metricMetadata) ([]*v3.TimeSeries, error) { var timeSeries []*v3.TimeSeries if fs == nil { return nil, fmt.Errorf("File-system information missing.") } // For some reason the Kubelet doesn't return when this sample is from, // so we'll use now. now := time.Now() resourceLabels := map[string]string{"device_name": volume} if tsFactory.monitoredResource.Type != "gke_container" { resourceLabels = noLabels } if diskTotalMD != nil { if fs.CapacityBytes == nil { return nil, fmt.Errorf("CapacityBytes is missing from FsStats %v", fs) } // Total disk available. diskTotalPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*fs.CapacityBytes)), ForceSendFields: []string{"Int64Value"}, }, startTime, now, diskTotalMD.MetricKind) timeSeries = append(timeSeries, tsFactory.newTimeSeries(resourceLabels, diskTotalMD, diskTotalPoint)) } if diskUsedMD != nil { if fs.UsedBytes == nil { return nil, fmt.Errorf("UsedBytes is missing from FsStats %v", fs) } // Total disk used. diskUsedPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*fs.UsedBytes)), ForceSendFields: []string{"Int64Value"}, }, startTime, now, diskUsedMD.MetricKind) timeSeries = append(timeSeries, tsFactory.newTimeSeries(resourceLabels, diskUsedMD, diskUsedPoint)) } return timeSeries, nil } // translateMemory creates all the TimeSeries for a given MemoryStats. func translateMemory(memory *stats.MemoryStats, tsFactory *timeSeriesFactory, startTime time.Time, memUsedMD *metricMetadata, memTotalMD *metricMetadata, pageFaultsMD *metricMetadata, component string) ([]*v3.TimeSeries, error) { var timeSeries []*v3.TimeSeries if memory == nil { return nil, fmt.Errorf("Memory information missing.") } // Only send page fault metric if start time is before current time. Right after container is started, kubelet can return start time == end time. if pageFaultsMD != nil && memory.Time.Time.After(startTime) { if memory.MajorPageFaults != nil { // Major page faults. majorPFPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*memory.MajorPageFaults)), ForceSendFields: []string{"Int64Value"}, }, startTime, memory.Time.Time, pageFaultsMD.MetricKind) timeSeries = append(timeSeries, tsFactory.newTimeSeries(majorPageFaultLabels, pageFaultsMD, majorPFPoint)) } if memory.PageFaults != nil { // Minor page faults. minorPFPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*memory.PageFaults - *memory.MajorPageFaults)), ForceSendFields: []string{"Int64Value"}, }, startTime, memory.Time.Time, pageFaultsMD.MetricKind) timeSeries = append(timeSeries, tsFactory.newTimeSeries(minorPageFaultLabels, pageFaultsMD, minorPFPoint)) } } if memUsedMD != nil { if memory.WorkingSetBytes == nil { return nil, fmt.Errorf("WorkingSetBytes information missing in MemoryStats %v", memory) } // Non-evictable memory. nonEvictMemPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*memory.WorkingSetBytes)), ForceSendFields: []string{"Int64Value"}, }, startTime, memory.Time.Time, memUsedMD.MetricKind) labels := map[string]string{"memory_type": "non-evictable"} if component != "" { labels["component"] = component } timeSeries = append(timeSeries, tsFactory.newTimeSeries(labels, memUsedMD, nonEvictMemPoint)) if memory.UsageBytes != nil { // Evictable memory. evictMemPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*memory.UsageBytes - *memory.WorkingSetBytes)), ForceSendFields: []string{"Int64Value"}, }, startTime, memory.Time.Time, memUsedMD.MetricKind) labels = map[string]string{"memory_type": "evictable"} if component != "" { labels["component"] = component } timeSeries = append(timeSeries, tsFactory.newTimeSeries(labels, memUsedMD, evictMemPoint)) } } if memTotalMD != nil { // Available memory. This may or may not be present, so don't fail if it's absent. if memory.AvailableBytes != nil { availableMemPoint := tsFactory.newPoint(&v3.TypedValue{ Int64Value: monitor.Int64Ptr(int64(*memory.AvailableBytes)), ForceSendFields: []string{"Int64Value"}, }, startTime, memory.Time.Time, memTotalMD.MetricKind) timeSeries = append(timeSeries, tsFactory.newTimeSeries(noLabels, memTotalMD, availableMemPoint)) } } return timeSeries, nil } func (t *Translator) getMonitoredResource(labels map[string]string) *v3.MonitoredResource { resourceLabels := map[string]string{ "project_id": t.project, "cluster_name": t.cluster, } if t.useOldResourceModel { resourceLabels["zone"] = t.zone resourceLabels["instance_id"] = t.instance resourceLabels["namespace_id"] = labels["namespace"] resourceLabels["pod_id"] = labels["pod"] resourceLabels["container_name"] = labels["container"] return &v3.MonitoredResource{ Type: "gke_container", Labels: resourceLabels, } } resourceLabels["location"] = t.clusterLocation if t.schemaPrefix != "k8s_" { resourceLabels["instance_id"] = t.instanceID } for k, v := range t.monitoredResourceLabels { resourceLabels[k] = v } if _, found := labels["container"]; !found { if t.instance != "" { resourceLabels["node_name"] = t.instance } return &v3.MonitoredResource{ Type: t.schemaPrefix + "node", Labels: resourceLabels, } } resourceLabels["namespace_name"] = labels["namespace"] resourceLabels["pod_name"] = labels["pod"] resourceLabels["container_name"] = labels["container"] return &v3.MonitoredResource{ Type: t.schemaPrefix + "container", Labels: resourceLabels, } } type timeSeriesFactory struct { resolution time.Duration monitoredResource *v3.MonitoredResource } func newTimeSeriesFactory(monitoredResource *v3.MonitoredResource, resolution time.Duration) *timeSeriesFactory { return &timeSeriesFactory{ resolution: resolution, monitoredResource: monitoredResource, } } func (t *timeSeriesFactory) newPoint(val *v3.TypedValue, collectionStartTime time.Time, sampleTime time.Time, metricKind string) *v3.Point { if metricKind == "GAUGE" { collectionStartTime = sampleTime } return &v3.Point{ Interval: &v3.TimeInterval{ EndTime: sampleTime.Format(time.RFC3339), StartTime: collectionStartTime.Format(time.RFC3339), }, Value: val, } } func (t *timeSeriesFactory) newTimeSeries(metricLabels map[string]string, metadata *metricMetadata, point *v3.Point) *v3.TimeSeries { return &v3.TimeSeries{ Metric: &v3.Metric{ Labels: metricLabels, Type: metadata.Name, }, MetricKind: metadata.MetricKind, ValueType: metadata.ValueType, Resource: t.monitoredResource, Points: []*v3.Point{point}, } }