remappers/hostmetrics/process.go (272 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package hostmetrics import ( "time" "github.com/elastic/opentelemetry-lib/remappers/internal/remappedmetric" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" ) func remapProcessMetrics( src, out pmetric.MetricSlice, resource pcommon.Resource, mutator func(pmetric.NumberDataPoint), ) error { var timestamp, startTimestamp pcommon.Timestamp var threads, memUsage, memVirtual, fdOpen, ioReadBytes, ioWriteBytes, ioReadOperations, ioWriteOperations int64 var memUtil, total, systemCpuTime, userCpuTime float64 for i := 0; i < src.Len(); i++ { metric := src.At(i) switch metric.Name() { case "process.threads": dp := metric.Sum().DataPoints().At(0) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } threads = dp.IntValue() case "process.memory.utilization": dp := metric.Gauge().DataPoints().At(0) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } memUtil = dp.DoubleValue() case "process.memory.usage": dp := metric.Sum().DataPoints().At(0) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } memUsage = dp.IntValue() case "process.memory.virtual": dp := metric.Sum().DataPoints().At(0) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } memVirtual = dp.IntValue() case "process.open_file_descriptors": dp := metric.Sum().DataPoints().At(0) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } fdOpen = dp.IntValue() case "process.cpu.time": dataPoints := metric.Sum().DataPoints() for j := 0; j < dataPoints.Len(); j++ { dp := dataPoints.At(j) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } value := dp.DoubleValue() if state, ok := dp.Attributes().Get("state"); ok { switch state.Str() { case "system": systemCpuTime = value total += value case "user": userCpuTime = value total += value case "wait": total += value } } } case "process.disk.io": dataPoints := metric.Sum().DataPoints() for j := 0; j < dataPoints.Len(); j++ { dp := dataPoints.At(j) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } value := dp.IntValue() if direction, ok := dp.Attributes().Get("direction"); ok { switch direction.Str() { case "read": ioReadBytes = value case "write": ioWriteBytes = value } } } case "process.disk.operations": dataPoints := metric.Sum().DataPoints() for j := 0; j < dataPoints.Len(); j++ { dp := dataPoints.At(j) if timestamp == 0 { timestamp = dp.Timestamp() } if startTimestamp == 0 { startTimestamp = dp.StartTimestamp() } value := dp.IntValue() if direction, ok := dp.Attributes().Get("direction"); ok { switch direction.Str() { case "read": ioReadOperations = value case "write": ioWriteOperations = value } } } } } systemCpuTime = systemCpuTime * 1000 userCpuTime = userCpuTime * 1000 startTime := startTimestamp.AsTime() startTimeMillis := startTime.UnixMilli() memUtilPct := memUtil / 100 cpuTimeValue := total * 1000 processRuntime := timestamp.AsTime().UnixMilli() - startTimeMillis cpuPct := cpuTimeValue / float64(processRuntime) finalMutator := remappedmetric.ChainedMutator( mutator, addProcessResources(resource, startTime.UTC()), ) remappedmetric.Add(out, finalMutator, // The timestamp metrics get converted from Int to Timestamp in Kibana // since these are mapped to timestamp datatype remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "process.cpu.start_time", Timestamp: timestamp, IntValue: &startTimeMillis, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.num_threads", Timestamp: timestamp, IntValue: &threads, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeGauge, Name: "system.process.memory.rss.pct", Timestamp: timestamp, DoubleValue: &memUtilPct, }, // The process rss bytes have been found to be equal to the memory usage reported by OTEL remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.memory.rss.bytes", Timestamp: timestamp, IntValue: &memUsage, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.memory.size", Timestamp: timestamp, IntValue: &memVirtual, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.fd.open", Timestamp: timestamp, IntValue: &fdOpen, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeGauge, Name: "process.memory.pct", Timestamp: timestamp, DoubleValue: &memUtilPct, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.cpu.total.value", Timestamp: timestamp, DoubleValue: &cpuTimeValue, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.cpu.system.ticks", Timestamp: timestamp, DoubleValue: &systemCpuTime, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.cpu.user.ticks", Timestamp: timestamp, DoubleValue: &userCpuTime, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.cpu.total.ticks", Timestamp: timestamp, DoubleValue: &cpuTimeValue, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.io.read_bytes", Timestamp: timestamp, IntValue: &ioReadBytes, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.io.write_bytes", Timestamp: timestamp, IntValue: &ioWriteBytes, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.io.read_ops", Timestamp: timestamp, IntValue: &ioReadOperations, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeSum, Name: "system.process.io.write_ops", Timestamp: timestamp, IntValue: &ioWriteOperations, }, remappedmetric.Metric{ DataType: pmetric.MetricTypeGauge, Name: "system.process.cpu.total.pct", Timestamp: timestamp, DoubleValue: &cpuPct, }, ) return nil } func addProcessResources(resource pcommon.Resource, startTime time.Time) func(pmetric.NumberDataPoint) { startTimeStr := startTime.Format(time.RFC3339) return func(dp pmetric.NumberDataPoint) { ppid, _ := resource.Attributes().Get("process.parent_pid") if ppid.Int() != 0 { dp.Attributes().PutInt("process.parent.pid", ppid.Int()) } owner, _ := resource.Attributes().Get("process.owner") if owner.Str() != "" { dp.Attributes().PutStr("user.name", owner.Str()) } else { dp.Attributes().PutStr("user.name", "undefined") } exec, _ := resource.Attributes().Get("process.executable.path") if exec.Str() != "" { dp.Attributes().PutStr("process.executable", exec.Str()) } name, _ := resource.Attributes().Get("process.executable.name") if name.Str() != "" { dp.Attributes().PutStr("process.name", name.Str()) } cmdline, _ := resource.Attributes().Get("process.command_line") if cmdline.Str() != "" { dp.Attributes().PutStr("system.process.cmdline", cmdline.Str()) } else { dp.Attributes().PutStr("system.process.cmdline", "undefined") } dp.Attributes().PutStr("system.process.cpu.start_time", startTimeStr) // Adding dummy value to process.state as "undefined", since this field is not // available through hostmetrics receiver currently and Process tab in curated // UI's need this field as a prerequisite. dp.Attributes().PutStr("system.process.state", "undefined") } }