receiver/apachesparkreceiver/internal/metadata/generated_metrics.go:
// Code generated by mdatagen. DO NOT EDIT.
package metadata
import (
	"time"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/filter"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/pmetric"
	"go.opentelemetry.io/collector/receiver"
)
// AttributeDirection specifies the value direction attribute.
type AttributeDirection int
const (
_ AttributeDirection = iota
AttributeDirectionIn
AttributeDirectionOut
)
// String returns the string representation of the AttributeDirection.
func (av AttributeDirection) String() string {
switch av {
case AttributeDirectionIn:
return "in"
case AttributeDirectionOut:
return "out"
}
return ""
}
// MapAttributeDirection is a helper map of string to AttributeDirection attribute value.
var MapAttributeDirection = map[string]AttributeDirection{
"in": AttributeDirectionIn,
"out": AttributeDirectionOut,
}
// AttributeExecutorTaskResult specifies the value executor_task_result attribute.
type AttributeExecutorTaskResult int
const (
_ AttributeExecutorTaskResult = iota
AttributeExecutorTaskResultCompleted
AttributeExecutorTaskResultFailed
)
// String returns the string representation of the AttributeExecutorTaskResult.
func (av AttributeExecutorTaskResult) String() string {
switch av {
case AttributeExecutorTaskResultCompleted:
return "completed"
case AttributeExecutorTaskResultFailed:
return "failed"
}
return ""
}
// MapAttributeExecutorTaskResult is a helper map of string to AttributeExecutorTaskResult attribute value.
var MapAttributeExecutorTaskResult = map[string]AttributeExecutorTaskResult{
"completed": AttributeExecutorTaskResultCompleted,
"failed": AttributeExecutorTaskResultFailed,
}
// AttributeGcType specifies the value gc_type attribute.
type AttributeGcType int
const (
_ AttributeGcType = iota
AttributeGcTypeMajor
AttributeGcTypeMinor
)
// String returns the string representation of the AttributeGcType.
func (av AttributeGcType) String() string {
switch av {
case AttributeGcTypeMajor:
return "major"
case AttributeGcTypeMinor:
return "minor"
}
return ""
}
// MapAttributeGcType is a helper map of string to AttributeGcType attribute value.
var MapAttributeGcType = map[string]AttributeGcType{
"major": AttributeGcTypeMajor,
"minor": AttributeGcTypeMinor,
}
// AttributeJobResult specifies the value job_result attribute.
type AttributeJobResult int
const (
_ AttributeJobResult = iota
AttributeJobResultCompleted
AttributeJobResultFailed
AttributeJobResultSkipped
)
// String returns the string representation of the AttributeJobResult.
func (av AttributeJobResult) String() string {
switch av {
case AttributeJobResultCompleted:
return "completed"
case AttributeJobResultFailed:
return "failed"
case AttributeJobResultSkipped:
return "skipped"
}
return ""
}
// MapAttributeJobResult is a helper map of string to AttributeJobResult attribute value.
var MapAttributeJobResult = map[string]AttributeJobResult{
"completed": AttributeJobResultCompleted,
"failed": AttributeJobResultFailed,
"skipped": AttributeJobResultSkipped,
}
// AttributeLocation specifies the value location attribute.
type AttributeLocation int
const (
_ AttributeLocation = iota
AttributeLocationOnHeap
AttributeLocationOffHeap
)
// String returns the string representation of the AttributeLocation.
func (av AttributeLocation) String() string {
switch av {
case AttributeLocationOnHeap:
return "on_heap"
case AttributeLocationOffHeap:
return "off_heap"
}
return ""
}
// MapAttributeLocation is a helper map of string to AttributeLocation attribute value.
var MapAttributeLocation = map[string]AttributeLocation{
"on_heap": AttributeLocationOnHeap,
"off_heap": AttributeLocationOffHeap,
}
// AttributePoolMemoryType specifies the value pool_memory_type attribute.
type AttributePoolMemoryType int
const (
_ AttributePoolMemoryType = iota
AttributePoolMemoryTypeDirect
AttributePoolMemoryTypeMapped
)
// String returns the string representation of the AttributePoolMemoryType.
func (av AttributePoolMemoryType) String() string {
switch av {
case AttributePoolMemoryTypeDirect:
return "direct"
case AttributePoolMemoryTypeMapped:
return "mapped"
}
return ""
}
// MapAttributePoolMemoryType is a helper map of string to AttributePoolMemoryType attribute value.
var MapAttributePoolMemoryType = map[string]AttributePoolMemoryType{
"direct": AttributePoolMemoryTypeDirect,
"mapped": AttributePoolMemoryTypeMapped,
}
// AttributeSchedulerStatus specifies the value scheduler_status attribute.
type AttributeSchedulerStatus int
const (
_ AttributeSchedulerStatus = iota
AttributeSchedulerStatusWaiting
AttributeSchedulerStatusRunning
)
// String returns the string representation of the AttributeSchedulerStatus.
func (av AttributeSchedulerStatus) String() string {
switch av {
case AttributeSchedulerStatusWaiting:
return "waiting"
case AttributeSchedulerStatusRunning:
return "running"
}
return ""
}
// MapAttributeSchedulerStatus is a helper map of string to AttributeSchedulerStatus attribute value.
var MapAttributeSchedulerStatus = map[string]AttributeSchedulerStatus{
"waiting": AttributeSchedulerStatusWaiting,
"running": AttributeSchedulerStatusRunning,
}
// AttributeSource specifies the value source attribute.
type AttributeSource int
const (
_ AttributeSource = iota
AttributeSourceLocal
AttributeSourceRemote
)
// String returns the string representation of the AttributeSource.
func (av AttributeSource) String() string {
switch av {
case AttributeSourceLocal:
return "local"
case AttributeSourceRemote:
return "remote"
}
return ""
}
// MapAttributeSource is a helper map of string to AttributeSource attribute value.
var MapAttributeSource = map[string]AttributeSource{
"local": AttributeSourceLocal,
"remote": AttributeSourceRemote,
}
// AttributeStageTaskResult specifies the value stage_task_result attribute.
type AttributeStageTaskResult int
const (
_ AttributeStageTaskResult = iota
AttributeStageTaskResultCompleted
AttributeStageTaskResultFailed
AttributeStageTaskResultKilled
)
// String returns the string representation of the AttributeStageTaskResult.
func (av AttributeStageTaskResult) String() string {
switch av {
case AttributeStageTaskResultCompleted:
return "completed"
case AttributeStageTaskResultFailed:
return "failed"
case AttributeStageTaskResultKilled:
return "killed"
}
return ""
}
// MapAttributeStageTaskResult is a helper map of string to AttributeStageTaskResult attribute value.
var MapAttributeStageTaskResult = map[string]AttributeStageTaskResult{
"completed": AttributeStageTaskResultCompleted,
"failed": AttributeStageTaskResultFailed,
"killed": AttributeStageTaskResultKilled,
}
// AttributeState specifies the value state attribute.
type AttributeState int
const (
_ AttributeState = iota
AttributeStateUsed
AttributeStateFree
)
// String returns the string representation of the AttributeState.
func (av AttributeState) String() string {
switch av {
case AttributeStateUsed:
return "used"
case AttributeStateFree:
return "free"
}
return ""
}
// MapAttributeState is a helper map of string to AttributeState attribute value.
var MapAttributeState = map[string]AttributeState{
"used": AttributeStateUsed,
"free": AttributeStateFree,
}
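// Illustrative sketch, not produced by mdatagen: directionOrDefaultExample is a
// hypothetical helper showing how the Map*/String() pairs above are meant to be
// used together. A raw value reported by the Spark API is validated through
// MapAttributeDirection, and the typed constant's String() method yields the
// canonical form that the recordDataPoint methods put on data points.
func directionOrDefaultExample(raw string, def AttributeDirection) string {
	if dir, ok := MapAttributeDirection[raw]; ok {
		return dir.String()
	}
	// Fall back to the supplied default for values outside the known set.
	return def.String()
}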
var MetricsInfo = metricsInfo{
SparkDriverBlockManagerDiskUsage: metricInfo{
Name: "spark.driver.block_manager.disk.usage",
},
SparkDriverBlockManagerMemoryUsage: metricInfo{
Name: "spark.driver.block_manager.memory.usage",
},
SparkDriverCodeGeneratorCompilationAverageTime: metricInfo{
Name: "spark.driver.code_generator.compilation.average_time",
},
SparkDriverCodeGeneratorCompilationCount: metricInfo{
Name: "spark.driver.code_generator.compilation.count",
},
SparkDriverCodeGeneratorGeneratedClassAverageSize: metricInfo{
Name: "spark.driver.code_generator.generated_class.average_size",
},
SparkDriverCodeGeneratorGeneratedClassCount: metricInfo{
Name: "spark.driver.code_generator.generated_class.count",
},
SparkDriverCodeGeneratorGeneratedMethodAverageSize: metricInfo{
Name: "spark.driver.code_generator.generated_method.average_size",
},
SparkDriverCodeGeneratorGeneratedMethodCount: metricInfo{
Name: "spark.driver.code_generator.generated_method.count",
},
SparkDriverCodeGeneratorSourceCodeAverageSize: metricInfo{
Name: "spark.driver.code_generator.source_code.average_size",
},
SparkDriverCodeGeneratorSourceCodeOperations: metricInfo{
Name: "spark.driver.code_generator.source_code.operations",
},
SparkDriverDagSchedulerJobActive: metricInfo{
Name: "spark.driver.dag_scheduler.job.active",
},
SparkDriverDagSchedulerJobCount: metricInfo{
Name: "spark.driver.dag_scheduler.job.count",
},
SparkDriverDagSchedulerStageCount: metricInfo{
Name: "spark.driver.dag_scheduler.stage.count",
},
SparkDriverDagSchedulerStageFailed: metricInfo{
Name: "spark.driver.dag_scheduler.stage.failed",
},
SparkDriverExecutorGcOperations: metricInfo{
Name: "spark.driver.executor.gc.operations",
},
SparkDriverExecutorGcTime: metricInfo{
Name: "spark.driver.executor.gc.time",
},
SparkDriverExecutorMemoryExecution: metricInfo{
Name: "spark.driver.executor.memory.execution",
},
SparkDriverExecutorMemoryJvm: metricInfo{
Name: "spark.driver.executor.memory.jvm",
},
SparkDriverExecutorMemoryPool: metricInfo{
Name: "spark.driver.executor.memory.pool",
},
SparkDriverExecutorMemoryStorage: metricInfo{
Name: "spark.driver.executor.memory.storage",
},
SparkDriverHiveExternalCatalogFileCacheHits: metricInfo{
Name: "spark.driver.hive_external_catalog.file_cache_hits",
},
SparkDriverHiveExternalCatalogFilesDiscovered: metricInfo{
Name: "spark.driver.hive_external_catalog.files_discovered",
},
SparkDriverHiveExternalCatalogHiveClientCalls: metricInfo{
Name: "spark.driver.hive_external_catalog.hive_client_calls",
},
SparkDriverHiveExternalCatalogParallelListingJobs: metricInfo{
Name: "spark.driver.hive_external_catalog.parallel_listing_jobs",
},
SparkDriverHiveExternalCatalogPartitionsFetched: metricInfo{
Name: "spark.driver.hive_external_catalog.partitions_fetched",
},
SparkDriverJvmCPUTime: metricInfo{
Name: "spark.driver.jvm_cpu_time",
},
SparkDriverLiveListenerBusDropped: metricInfo{
Name: "spark.driver.live_listener_bus.dropped",
},
SparkDriverLiveListenerBusPosted: metricInfo{
Name: "spark.driver.live_listener_bus.posted",
},
SparkDriverLiveListenerBusProcessingTimeAverage: metricInfo{
Name: "spark.driver.live_listener_bus.processing_time.average",
},
SparkDriverLiveListenerBusQueueSize: metricInfo{
Name: "spark.driver.live_listener_bus.queue_size",
},
SparkExecutorDiskUsage: metricInfo{
Name: "spark.executor.disk.usage",
},
SparkExecutorGcTime: metricInfo{
Name: "spark.executor.gc_time",
},
SparkExecutorInputSize: metricInfo{
Name: "spark.executor.input_size",
},
SparkExecutorMemoryUsage: metricInfo{
Name: "spark.executor.memory.usage",
},
SparkExecutorShuffleIoSize: metricInfo{
Name: "spark.executor.shuffle.io.size",
},
SparkExecutorStorageMemoryUsage: metricInfo{
Name: "spark.executor.storage_memory.usage",
},
SparkExecutorTaskActive: metricInfo{
Name: "spark.executor.task.active",
},
SparkExecutorTaskLimit: metricInfo{
Name: "spark.executor.task.limit",
},
SparkExecutorTaskResult: metricInfo{
Name: "spark.executor.task.result",
},
SparkExecutorTime: metricInfo{
Name: "spark.executor.time",
},
SparkJobStageActive: metricInfo{
Name: "spark.job.stage.active",
},
SparkJobStageResult: metricInfo{
Name: "spark.job.stage.result",
},
SparkJobTaskActive: metricInfo{
Name: "spark.job.task.active",
},
SparkJobTaskResult: metricInfo{
Name: "spark.job.task.result",
},
SparkStageDiskSpilled: metricInfo{
Name: "spark.stage.disk.spilled",
},
SparkStageExecutorCPUTime: metricInfo{
Name: "spark.stage.executor.cpu_time",
},
SparkStageExecutorRunTime: metricInfo{
Name: "spark.stage.executor.run_time",
},
SparkStageIoRecords: metricInfo{
Name: "spark.stage.io.records",
},
SparkStageIoSize: metricInfo{
Name: "spark.stage.io.size",
},
SparkStageJvmGcTime: metricInfo{
Name: "spark.stage.jvm_gc_time",
},
SparkStageMemoryPeak: metricInfo{
Name: "spark.stage.memory.peak",
},
SparkStageMemorySpilled: metricInfo{
Name: "spark.stage.memory.spilled",
},
SparkStageShuffleBlocksFetched: metricInfo{
Name: "spark.stage.shuffle.blocks_fetched",
},
SparkStageShuffleFetchWaitTime: metricInfo{
Name: "spark.stage.shuffle.fetch_wait_time",
},
SparkStageShuffleIoDisk: metricInfo{
Name: "spark.stage.shuffle.io.disk",
},
SparkStageShuffleIoReadSize: metricInfo{
Name: "spark.stage.shuffle.io.read.size",
},
SparkStageShuffleIoRecords: metricInfo{
Name: "spark.stage.shuffle.io.records",
},
SparkStageShuffleIoWriteSize: metricInfo{
Name: "spark.stage.shuffle.io.write.size",
},
SparkStageShuffleWriteTime: metricInfo{
Name: "spark.stage.shuffle.write_time",
},
SparkStageStatus: metricInfo{
Name: "spark.stage.status",
},
SparkStageTaskActive: metricInfo{
Name: "spark.stage.task.active",
},
SparkStageTaskResult: metricInfo{
Name: "spark.stage.task.result",
},
SparkStageTaskResultSize: metricInfo{
Name: "spark.stage.task.result_size",
},
}
type metricsInfo struct {
SparkDriverBlockManagerDiskUsage metricInfo
SparkDriverBlockManagerMemoryUsage metricInfo
SparkDriverCodeGeneratorCompilationAverageTime metricInfo
SparkDriverCodeGeneratorCompilationCount metricInfo
SparkDriverCodeGeneratorGeneratedClassAverageSize metricInfo
SparkDriverCodeGeneratorGeneratedClassCount metricInfo
SparkDriverCodeGeneratorGeneratedMethodAverageSize metricInfo
SparkDriverCodeGeneratorGeneratedMethodCount metricInfo
SparkDriverCodeGeneratorSourceCodeAverageSize metricInfo
SparkDriverCodeGeneratorSourceCodeOperations metricInfo
SparkDriverDagSchedulerJobActive metricInfo
SparkDriverDagSchedulerJobCount metricInfo
SparkDriverDagSchedulerStageCount metricInfo
SparkDriverDagSchedulerStageFailed metricInfo
SparkDriverExecutorGcOperations metricInfo
SparkDriverExecutorGcTime metricInfo
SparkDriverExecutorMemoryExecution metricInfo
SparkDriverExecutorMemoryJvm metricInfo
SparkDriverExecutorMemoryPool metricInfo
SparkDriverExecutorMemoryStorage metricInfo
SparkDriverHiveExternalCatalogFileCacheHits metricInfo
SparkDriverHiveExternalCatalogFilesDiscovered metricInfo
SparkDriverHiveExternalCatalogHiveClientCalls metricInfo
SparkDriverHiveExternalCatalogParallelListingJobs metricInfo
SparkDriverHiveExternalCatalogPartitionsFetched metricInfo
SparkDriverJvmCPUTime metricInfo
SparkDriverLiveListenerBusDropped metricInfo
SparkDriverLiveListenerBusPosted metricInfo
SparkDriverLiveListenerBusProcessingTimeAverage metricInfo
SparkDriverLiveListenerBusQueueSize metricInfo
SparkExecutorDiskUsage metricInfo
SparkExecutorGcTime metricInfo
SparkExecutorInputSize metricInfo
SparkExecutorMemoryUsage metricInfo
SparkExecutorShuffleIoSize metricInfo
SparkExecutorStorageMemoryUsage metricInfo
SparkExecutorTaskActive metricInfo
SparkExecutorTaskLimit metricInfo
SparkExecutorTaskResult metricInfo
SparkExecutorTime metricInfo
SparkJobStageActive metricInfo
SparkJobStageResult metricInfo
SparkJobTaskActive metricInfo
SparkJobTaskResult metricInfo
SparkStageDiskSpilled metricInfo
SparkStageExecutorCPUTime metricInfo
SparkStageExecutorRunTime metricInfo
SparkStageIoRecords metricInfo
SparkStageIoSize metricInfo
SparkStageJvmGcTime metricInfo
SparkStageMemoryPeak metricInfo
SparkStageMemorySpilled metricInfo
SparkStageShuffleBlocksFetched metricInfo
SparkStageShuffleFetchWaitTime metricInfo
SparkStageShuffleIoDisk metricInfo
SparkStageShuffleIoReadSize metricInfo
SparkStageShuffleIoRecords metricInfo
SparkStageShuffleIoWriteSize metricInfo
SparkStageShuffleWriteTime metricInfo
SparkStageStatus metricInfo
SparkStageTaskActive metricInfo
SparkStageTaskResult metricInfo
SparkStageTaskResultSize metricInfo
}
type metricInfo struct {
Name string
}
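// Illustrative sketch, not produced by mdatagen: isSparkDriverMetricExample is a
// hypothetical helper showing one way the exported MetricsInfo table can be used,
// e.g. in tests or filtering code, to compare against the canonical metric names
// instead of repeating string literals.
func isSparkDriverMetricExample(m pmetric.Metric) bool {
	switch m.Name() {
	case MetricsInfo.SparkDriverBlockManagerDiskUsage.Name,
		MetricsInfo.SparkDriverBlockManagerMemoryUsage.Name,
		MetricsInfo.SparkDriverJvmCPUTime.Name:
		return true
	}
	return false
}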
type metricSparkDriverBlockManagerDiskUsage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.block_manager.disk.usage metric with initial data.
func (m *metricSparkDriverBlockManagerDiskUsage) init() {
m.data.SetName("spark.driver.block_manager.disk.usage")
m.data.SetDescription("Disk space used by the BlockManager.")
m.data.SetUnit("mb")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverBlockManagerDiskUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverBlockManagerDiskUsage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverBlockManagerDiskUsage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverBlockManagerDiskUsage(cfg MetricConfig) metricSparkDriverBlockManagerDiskUsage {
m := metricSparkDriverBlockManagerDiskUsage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
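// Illustrative sketch, not produced by mdatagen: recordDiskUsageExample is a
// hypothetical helper walking the typical lifecycle of one of these metric
// builders. Data points accumulate only while cfg.Enabled is true; emit moves the
// buffered metric into the destination slice and re-initializes the internal
// buffer so the next scrape starts empty.
func recordDiskUsageExample(cfg MetricConfig, usageMB int64) pmetric.MetricSlice {
	now := pcommon.NewTimestampFromTime(time.Now())
	m := newMetricSparkDriverBlockManagerDiskUsage(cfg)
	// Start and current timestamps are equal here only for brevity; the
	// MetricsBuilder in this package passes its recorded start time instead.
	m.recordDataPoint(now, now, usageMB)
	out := pmetric.NewMetricSlice()
	m.emit(out)
	return out
}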
type metricSparkDriverBlockManagerMemoryUsage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.block_manager.memory.usage metric with initial data.
func (m *metricSparkDriverBlockManagerMemoryUsage) init() {
m.data.SetName("spark.driver.block_manager.memory.usage")
m.data.SetDescription("Memory usage for the driver's BlockManager.")
m.data.SetUnit("mb")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverBlockManagerMemoryUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, locationAttributeValue string, stateAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("location", locationAttributeValue)
dp.Attributes().PutStr("state", stateAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverBlockManagerMemoryUsage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverBlockManagerMemoryUsage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverBlockManagerMemoryUsage(cfg MetricConfig) metricSparkDriverBlockManagerMemoryUsage {
m := metricSparkDriverBlockManagerMemoryUsage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
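// Illustrative sketch, not produced by mdatagen: recordMemoryUsageExample is a
// hypothetical helper showing how the typed attribute constants pair with the
// attributed recordDataPoint variants: the String() form of each constant is what
// ends up as the "location" and "state" data point attributes.
func recordMemoryUsageExample(cfg MetricConfig, onHeapUsedMB int64) pmetric.MetricSlice {
	now := pcommon.NewTimestampFromTime(time.Now())
	m := newMetricSparkDriverBlockManagerMemoryUsage(cfg)
	m.recordDataPoint(now, now, onHeapUsedMB, AttributeLocationOnHeap.String(), AttributeStateUsed.String())
	out := pmetric.NewMetricSlice()
	m.emit(out)
	return out
}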
type metricSparkDriverCodeGeneratorCompilationAverageTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.compilation.average_time metric with initial data.
func (m *metricSparkDriverCodeGeneratorCompilationAverageTime) init() {
m.data.SetName("spark.driver.code_generator.compilation.average_time")
m.data.SetDescription("Average time spent during CodeGenerator source code compilation operations.")
m.data.SetUnit("ms")
m.data.SetEmptyGauge()
}
func (m *metricSparkDriverCodeGeneratorCompilationAverageTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
if !m.config.Enabled {
return
}
dp := m.data.Gauge().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetDoubleValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorCompilationAverageTime) updateCapacity() {
if m.data.Gauge().DataPoints().Len() > m.capacity {
m.capacity = m.data.Gauge().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorCompilationAverageTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorCompilationAverageTime(cfg MetricConfig) metricSparkDriverCodeGeneratorCompilationAverageTime {
m := metricSparkDriverCodeGeneratorCompilationAverageTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
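// Illustrative sketch, not produced by mdatagen: recordCompilationAvgTimeExample
// is a hypothetical helper showing the gauge-shaped variant of these builders:
// the average metrics carry float64 values via SetDoubleValue and, unlike the
// cumulative sums above, have no monotonicity or aggregation temporality.
func recordCompilationAvgTimeExample(cfg MetricConfig, avgMs float64) pmetric.MetricSlice {
	now := pcommon.NewTimestampFromTime(time.Now())
	m := newMetricSparkDriverCodeGeneratorCompilationAverageTime(cfg)
	m.recordDataPoint(now, now, avgMs)
	out := pmetric.NewMetricSlice()
	m.emit(out)
	return out
}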
type metricSparkDriverCodeGeneratorCompilationCount struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.compilation.count metric with initial data.
func (m *metricSparkDriverCodeGeneratorCompilationCount) init() {
m.data.SetName("spark.driver.code_generator.compilation.count")
m.data.SetDescription("Number of source code compilation operations performed by the CodeGenerator.")
m.data.SetUnit("{ compilation }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverCodeGeneratorCompilationCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorCompilationCount) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorCompilationCount) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorCompilationCount(cfg MetricConfig) metricSparkDriverCodeGeneratorCompilationCount {
m := metricSparkDriverCodeGeneratorCompilationCount{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorGeneratedClassAverageSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.generated_class.average_size metric with initial data.
func (m *metricSparkDriverCodeGeneratorGeneratedClassAverageSize) init() {
m.data.SetName("spark.driver.code_generator.generated_class.average_size")
m.data.SetDescription("Average class size of the classes generated by the CodeGenerator.")
m.data.SetUnit("bytes")
m.data.SetEmptyGauge()
}
func (m *metricSparkDriverCodeGeneratorGeneratedClassAverageSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
if !m.config.Enabled {
return
}
dp := m.data.Gauge().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetDoubleValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorGeneratedClassAverageSize) updateCapacity() {
if m.data.Gauge().DataPoints().Len() > m.capacity {
m.capacity = m.data.Gauge().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorGeneratedClassAverageSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorGeneratedClassAverageSize(cfg MetricConfig) metricSparkDriverCodeGeneratorGeneratedClassAverageSize {
m := metricSparkDriverCodeGeneratorGeneratedClassAverageSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorGeneratedClassCount struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.generated_class.count metric with initial data.
func (m *metricSparkDriverCodeGeneratorGeneratedClassCount) init() {
m.data.SetName("spark.driver.code_generator.generated_class.count")
m.data.SetDescription("Number of classes generated by the CodeGenerator.")
m.data.SetUnit("{ class }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverCodeGeneratorGeneratedClassCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorGeneratedClassCount) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorGeneratedClassCount) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorGeneratedClassCount(cfg MetricConfig) metricSparkDriverCodeGeneratorGeneratedClassCount {
m := metricSparkDriverCodeGeneratorGeneratedClassCount{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorGeneratedMethodAverageSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.generated_method.average_size metric with initial data.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodAverageSize) init() {
m.data.SetName("spark.driver.code_generator.generated_method.average_size")
m.data.SetDescription("Average method size of the classes generated by the CodeGenerator.")
m.data.SetUnit("bytes")
m.data.SetEmptyGauge()
}
func (m *metricSparkDriverCodeGeneratorGeneratedMethodAverageSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
if !m.config.Enabled {
return
}
dp := m.data.Gauge().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetDoubleValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodAverageSize) updateCapacity() {
if m.data.Gauge().DataPoints().Len() > m.capacity {
m.capacity = m.data.Gauge().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodAverageSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorGeneratedMethodAverageSize(cfg MetricConfig) metricSparkDriverCodeGeneratorGeneratedMethodAverageSize {
m := metricSparkDriverCodeGeneratorGeneratedMethodAverageSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorGeneratedMethodCount struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.generated_method.count metric with initial data.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodCount) init() {
m.data.SetName("spark.driver.code_generator.generated_method.count")
m.data.SetDescription("Number of methods generated by the CodeGenerator.")
m.data.SetUnit("{ method }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverCodeGeneratorGeneratedMethodCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodCount) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorGeneratedMethodCount) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorGeneratedMethodCount(cfg MetricConfig) metricSparkDriverCodeGeneratorGeneratedMethodCount {
m := metricSparkDriverCodeGeneratorGeneratedMethodCount{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorSourceCodeAverageSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.source_code.average_size metric with initial data.
func (m *metricSparkDriverCodeGeneratorSourceCodeAverageSize) init() {
m.data.SetName("spark.driver.code_generator.source_code.average_size")
m.data.SetDescription("Average size of the source code generated by a CodeGenerator code generation operation.")
m.data.SetUnit("bytes")
m.data.SetEmptyGauge()
}
func (m *metricSparkDriverCodeGeneratorSourceCodeAverageSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
if !m.config.Enabled {
return
}
dp := m.data.Gauge().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetDoubleValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorSourceCodeAverageSize) updateCapacity() {
if m.data.Gauge().DataPoints().Len() > m.capacity {
m.capacity = m.data.Gauge().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorSourceCodeAverageSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorSourceCodeAverageSize(cfg MetricConfig) metricSparkDriverCodeGeneratorSourceCodeAverageSize {
m := metricSparkDriverCodeGeneratorSourceCodeAverageSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverCodeGeneratorSourceCodeOperations struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.code_generator.source_code.operations metric with initial data.
func (m *metricSparkDriverCodeGeneratorSourceCodeOperations) init() {
m.data.SetName("spark.driver.code_generator.source_code.operations")
m.data.SetDescription("Number of source code generation operations performed by the CodeGenerator.")
m.data.SetUnit("{ operation }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverCodeGeneratorSourceCodeOperations) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverCodeGeneratorSourceCodeOperations) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverCodeGeneratorSourceCodeOperations) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverCodeGeneratorSourceCodeOperations(cfg MetricConfig) metricSparkDriverCodeGeneratorSourceCodeOperations {
m := metricSparkDriverCodeGeneratorSourceCodeOperations{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverDagSchedulerJobActive struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.dag_scheduler.job.active metric with initial data.
func (m *metricSparkDriverDagSchedulerJobActive) init() {
m.data.SetName("spark.driver.dag_scheduler.job.active")
m.data.SetDescription("Number of active jobs currently being processed by the DAGScheduler.")
m.data.SetUnit("{ job }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverDagSchedulerJobActive) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverDagSchedulerJobActive) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverDagSchedulerJobActive) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverDagSchedulerJobActive(cfg MetricConfig) metricSparkDriverDagSchedulerJobActive {
m := metricSparkDriverDagSchedulerJobActive{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverDagSchedulerJobCount struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.dag_scheduler.job.count metric with initial data.
func (m *metricSparkDriverDagSchedulerJobCount) init() {
m.data.SetName("spark.driver.dag_scheduler.job.count")
m.data.SetDescription("Number of jobs that have been submitted to the DAGScheduler.")
m.data.SetUnit("{ job }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverDagSchedulerJobCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverDagSchedulerJobCount) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverDagSchedulerJobCount) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverDagSchedulerJobCount(cfg MetricConfig) metricSparkDriverDagSchedulerJobCount {
m := metricSparkDriverDagSchedulerJobCount{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverDagSchedulerStageCount struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.dag_scheduler.stage.count metric with initial data.
func (m *metricSparkDriverDagSchedulerStageCount) init() {
m.data.SetName("spark.driver.dag_scheduler.stage.count")
m.data.SetDescription("Number of stages the DAGScheduler is either running or needs to run.")
m.data.SetUnit("{ stage }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverDagSchedulerStageCount) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, schedulerStatusAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("status", schedulerStatusAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverDagSchedulerStageCount) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverDagSchedulerStageCount) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverDagSchedulerStageCount(cfg MetricConfig) metricSparkDriverDagSchedulerStageCount {
m := metricSparkDriverDagSchedulerStageCount{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
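// Illustrative sketch, not produced by mdatagen: recordStageCountExample is a
// hypothetical helper highlighting that, although the attribute type is named
// scheduler_status, recordDataPoint writes it under the data point key "status";
// raw values can be validated through MapAttributeSchedulerStatus before recording.
func recordStageCountExample(cfg MetricConfig, raw string, count int64) pmetric.MetricSlice {
	out := pmetric.NewMetricSlice()
	status, ok := MapAttributeSchedulerStatus[raw]
	if !ok {
		return out // unknown scheduler status; skip recording
	}
	now := pcommon.NewTimestampFromTime(time.Now())
	m := newMetricSparkDriverDagSchedulerStageCount(cfg)
	m.recordDataPoint(now, now, count, status.String())
	m.emit(out)
	return out
}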
type metricSparkDriverDagSchedulerStageFailed struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.dag_scheduler.stage.failed metric with initial data.
func (m *metricSparkDriverDagSchedulerStageFailed) init() {
m.data.SetName("spark.driver.dag_scheduler.stage.failed")
m.data.SetDescription("Number of failed stages run by the DAGScheduler.")
m.data.SetUnit("{ stage }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverDagSchedulerStageFailed) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverDagSchedulerStageFailed) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverDagSchedulerStageFailed) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverDagSchedulerStageFailed(cfg MetricConfig) metricSparkDriverDagSchedulerStageFailed {
m := metricSparkDriverDagSchedulerStageFailed{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorGcOperations struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.gc.operations metric with initial data.
func (m *metricSparkDriverExecutorGcOperations) init() {
m.data.SetName("spark.driver.executor.gc.operations")
m.data.SetDescription("Number of garbage collection operations performed by the driver.")
m.data.SetUnit("{ gc_operation }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorGcOperations) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, gcTypeAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("gc_type", gcTypeAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorGcOperations) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorGcOperations) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorGcOperations(cfg MetricConfig) metricSparkDriverExecutorGcOperations {
m := metricSparkDriverExecutorGcOperations{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorGcTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.gc.time metric with initial data.
func (m *metricSparkDriverExecutorGcTime) init() {
m.data.SetName("spark.driver.executor.gc.time")
m.data.SetDescription("Total elapsed time during garbage collection operations performed by the driver.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorGcTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, gcTypeAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("gc_type", gcTypeAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorGcTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorGcTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorGcTime(cfg MetricConfig) metricSparkDriverExecutorGcTime {
m := metricSparkDriverExecutorGcTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorMemoryExecution struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.memory.execution metric with initial data.
func (m *metricSparkDriverExecutorMemoryExecution) init() {
m.data.SetName("spark.driver.executor.memory.execution")
m.data.SetDescription("Amount of execution memory currently used by the driver.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorMemoryExecution) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, locationAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("location", locationAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorMemoryExecution) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorMemoryExecution) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorMemoryExecution(cfg MetricConfig) metricSparkDriverExecutorMemoryExecution {
m := metricSparkDriverExecutorMemoryExecution{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorMemoryJvm struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.memory.jvm metric with initial data.
func (m *metricSparkDriverExecutorMemoryJvm) init() {
m.data.SetName("spark.driver.executor.memory.jvm")
m.data.SetDescription("Amount of memory used by the driver's JVM.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorMemoryJvm) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, locationAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("location", locationAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorMemoryJvm) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorMemoryJvm) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorMemoryJvm(cfg MetricConfig) metricSparkDriverExecutorMemoryJvm {
m := metricSparkDriverExecutorMemoryJvm{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorMemoryPool struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.memory.pool metric with initial data.
func (m *metricSparkDriverExecutorMemoryPool) init() {
m.data.SetName("spark.driver.executor.memory.pool")
m.data.SetDescription("Amount of pool memory currently used by the driver.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorMemoryPool) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, poolMemoryTypeAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("type", poolMemoryTypeAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorMemoryPool) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorMemoryPool) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorMemoryPool(cfg MetricConfig) metricSparkDriverExecutorMemoryPool {
m := metricSparkDriverExecutorMemoryPool{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverExecutorMemoryStorage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.executor.memory.storage metric with initial data.
func (m *metricSparkDriverExecutorMemoryStorage) init() {
m.data.SetName("spark.driver.executor.memory.storage")
m.data.SetDescription("Amount of storage memory currently used by the driver.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkDriverExecutorMemoryStorage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, locationAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("location", locationAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverExecutorMemoryStorage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverExecutorMemoryStorage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverExecutorMemoryStorage(cfg MetricConfig) metricSparkDriverExecutorMemoryStorage {
m := metricSparkDriverExecutorMemoryStorage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverHiveExternalCatalogFileCacheHits struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.hive_external_catalog.file_cache_hits metric with initial data.
func (m *metricSparkDriverHiveExternalCatalogFileCacheHits) init() {
m.data.SetName("spark.driver.hive_external_catalog.file_cache_hits")
m.data.SetDescription("Number of file cache hits on the HiveExternalCatalog.")
m.data.SetUnit("{ hit }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverHiveExternalCatalogFileCacheHits) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverHiveExternalCatalogFileCacheHits) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverHiveExternalCatalogFileCacheHits) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverHiveExternalCatalogFileCacheHits(cfg MetricConfig) metricSparkDriverHiveExternalCatalogFileCacheHits {
m := metricSparkDriverHiveExternalCatalogFileCacheHits{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverHiveExternalCatalogFilesDiscovered struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.hive_external_catalog.files_discovered metric with initial data.
func (m *metricSparkDriverHiveExternalCatalogFilesDiscovered) init() {
m.data.SetName("spark.driver.hive_external_catalog.files_discovered")
m.data.SetDescription("Number of files discovered while listing the partitions of a table in the Hive metastore")
m.data.SetUnit("{ file }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverHiveExternalCatalogFilesDiscovered) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverHiveExternalCatalogFilesDiscovered) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverHiveExternalCatalogFilesDiscovered) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverHiveExternalCatalogFilesDiscovered(cfg MetricConfig) metricSparkDriverHiveExternalCatalogFilesDiscovered {
m := metricSparkDriverHiveExternalCatalogFilesDiscovered{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverHiveExternalCatalogHiveClientCalls struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.hive_external_catalog.hive_client_calls metric with initial data.
func (m *metricSparkDriverHiveExternalCatalogHiveClientCalls) init() {
m.data.SetName("spark.driver.hive_external_catalog.hive_client_calls")
m.data.SetDescription("Number of calls to the underlying Hive Metastore client made by the Spark application.")
m.data.SetUnit("{ call }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverHiveExternalCatalogHiveClientCalls) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverHiveExternalCatalogHiveClientCalls) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverHiveExternalCatalogHiveClientCalls) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverHiveExternalCatalogHiveClientCalls(cfg MetricConfig) metricSparkDriverHiveExternalCatalogHiveClientCalls {
m := metricSparkDriverHiveExternalCatalogHiveClientCalls{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverHiveExternalCatalogParallelListingJobs struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.hive_external_catalog.parallel_listing_jobs metric with initial data.
func (m *metricSparkDriverHiveExternalCatalogParallelListingJobs) init() {
m.data.SetName("spark.driver.hive_external_catalog.parallel_listing_jobs")
m.data.SetDescription("Number of parallel listing jobs initiated by the HiveExternalCatalog when listing partitions of a table.")
m.data.SetUnit("{ listing_job }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverHiveExternalCatalogParallelListingJobs) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverHiveExternalCatalogParallelListingJobs) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverHiveExternalCatalogParallelListingJobs) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverHiveExternalCatalogParallelListingJobs(cfg MetricConfig) metricSparkDriverHiveExternalCatalogParallelListingJobs {
m := metricSparkDriverHiveExternalCatalogParallelListingJobs{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverHiveExternalCatalogPartitionsFetched struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.hive_external_catalog.partitions_fetched metric with initial data.
func (m *metricSparkDriverHiveExternalCatalogPartitionsFetched) init() {
m.data.SetName("spark.driver.hive_external_catalog.partitions_fetched")
m.data.SetDescription("Table partitions fetched by the HiveExternalCatalog.")
m.data.SetUnit("{ partition }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverHiveExternalCatalogPartitionsFetched) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverHiveExternalCatalogPartitionsFetched) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverHiveExternalCatalogPartitionsFetched) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverHiveExternalCatalogPartitionsFetched(cfg MetricConfig) metricSparkDriverHiveExternalCatalogPartitionsFetched {
m := metricSparkDriverHiveExternalCatalogPartitionsFetched{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverJvmCPUTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.jvm_cpu_time metric with initial data.
func (m *metricSparkDriverJvmCPUTime) init() {
m.data.SetName("spark.driver.jvm_cpu_time")
m.data.SetDescription("Current CPU time taken by the Spark driver.")
m.data.SetUnit("ns")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverJvmCPUTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverJvmCPUTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverJvmCPUTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverJvmCPUTime(cfg MetricConfig) metricSparkDriverJvmCPUTime {
m := metricSparkDriverJvmCPUTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverLiveListenerBusDropped struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.live_listener_bus.dropped metric with initial data.
func (m *metricSparkDriverLiveListenerBusDropped) init() {
m.data.SetName("spark.driver.live_listener_bus.dropped")
m.data.SetDescription("Number of events that have been dropped by the LiveListenerBus.")
m.data.SetUnit("{ event }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverLiveListenerBusDropped) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverLiveListenerBusDropped) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverLiveListenerBusDropped) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverLiveListenerBusDropped(cfg MetricConfig) metricSparkDriverLiveListenerBusDropped {
m := metricSparkDriverLiveListenerBusDropped{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverLiveListenerBusPosted struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.live_listener_bus.posted metric with initial data.
func (m *metricSparkDriverLiveListenerBusPosted) init() {
m.data.SetName("spark.driver.live_listener_bus.posted")
m.data.SetDescription("Number of events that have been posted on the LiveListenerBus.")
m.data.SetUnit("{ event }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverLiveListenerBusPosted) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverLiveListenerBusPosted) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverLiveListenerBusPosted) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverLiveListenerBusPosted(cfg MetricConfig) metricSparkDriverLiveListenerBusPosted {
m := metricSparkDriverLiveListenerBusPosted{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkDriverLiveListenerBusProcessingTimeAverage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.live_listener_bus.processing_time.average metric with initial data.
func (m *metricSparkDriverLiveListenerBusProcessingTimeAverage) init() {
m.data.SetName("spark.driver.live_listener_bus.processing_time.average")
m.data.SetDescription("Average time taken for the LiveListenerBus to process an event posted to it.")
m.data.SetUnit("ms")
m.data.SetEmptyGauge()
}
func (m *metricSparkDriverLiveListenerBusProcessingTimeAverage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64) {
if !m.config.Enabled {
return
}
dp := m.data.Gauge().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetDoubleValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverLiveListenerBusProcessingTimeAverage) updateCapacity() {
if m.data.Gauge().DataPoints().Len() > m.capacity {
m.capacity = m.data.Gauge().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverLiveListenerBusProcessingTimeAverage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverLiveListenerBusProcessingTimeAverage(cfg MetricConfig) metricSparkDriverLiveListenerBusProcessingTimeAverage {
m := metricSparkDriverLiveListenerBusProcessingTimeAverage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
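// Editor's sketch (not generated by mdatagen): this metric is a gauge rather
// than a cumulative sum, so recordDataPoint takes a float64 and stores it via
// SetDoubleValue. The value 1.5 below is a made-up average in milliseconds.
func exampleRecordProcessingTimeAverage(cfg MetricConfig, dest pmetric.MetricSlice) {
	m := newMetricSparkDriverLiveListenerBusProcessingTimeAverage(cfg)
	now := pcommon.NewTimestampFromTime(time.Now())
	m.recordDataPoint(now, now, 1.5)
	m.emit(dest)
}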
type metricSparkDriverLiveListenerBusQueueSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.driver.live_listener_bus.queue_size metric with initial data.
func (m *metricSparkDriverLiveListenerBusQueueSize) init() {
m.data.SetName("spark.driver.live_listener_bus.queue_size")
m.data.SetDescription("Number of events currently waiting to be processed by the LiveListenerBus.")
m.data.SetUnit("{ event }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkDriverLiveListenerBusQueueSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkDriverLiveListenerBusQueueSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkDriverLiveListenerBusQueueSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkDriverLiveListenerBusQueueSize(cfg MetricConfig) metricSparkDriverLiveListenerBusQueueSize {
m := metricSparkDriverLiveListenerBusQueueSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
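// Editor's note (not generated by mdatagen): the queue-size metric above, like
// the other sums built with SetIsMonotonic(false), models a value that can go
// up and down (a current level) while still using cumulative temporality, in
// contrast to monotonic counters such as the dropped and posted event totals.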
type metricSparkExecutorDiskUsage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.disk.usage metric with initial data.
func (m *metricSparkExecutorDiskUsage) init() {
m.data.SetName("spark.executor.disk.usage")
m.data.SetDescription("Disk space used by this executor for RDD storage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorDiskUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorDiskUsage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorDiskUsage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorDiskUsage(cfg MetricConfig) metricSparkExecutorDiskUsage {
m := metricSparkExecutorDiskUsage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorGcTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.gc_time metric with initial data.
func (m *metricSparkExecutorGcTime) init() {
m.data.SetName("spark.executor.gc_time")
m.data.SetDescription("Elapsed time the JVM spent in garbage collection in this executor.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorGcTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorGcTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorGcTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorGcTime(cfg MetricConfig) metricSparkExecutorGcTime {
m := metricSparkExecutorGcTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorInputSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.input_size metric with initial data.
func (m *metricSparkExecutorInputSize) init() {
m.data.SetName("spark.executor.input_size")
m.data.SetDescription("Amount of data input for this executor.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorInputSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorInputSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorInputSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorInputSize(cfg MetricConfig) metricSparkExecutorInputSize {
m := metricSparkExecutorInputSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorMemoryUsage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.memory.usage metric with initial data.
func (m *metricSparkExecutorMemoryUsage) init() {
m.data.SetName("spark.executor.memory.usage")
m.data.SetDescription("Storage memory used by this executor.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorMemoryUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorMemoryUsage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorMemoryUsage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorMemoryUsage(cfg MetricConfig) metricSparkExecutorMemoryUsage {
m := metricSparkExecutorMemoryUsage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorShuffleIoSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.shuffle.io.size metric with initial data.
func (m *metricSparkExecutorShuffleIoSize) init() {
m.data.SetName("spark.executor.shuffle.io.size")
m.data.SetDescription("Amount of data written and read during shuffle operations for this executor.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkExecutorShuffleIoSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, directionAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("direction", directionAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorShuffleIoSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorShuffleIoSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorShuffleIoSize(cfg MetricConfig) metricSparkExecutorShuffleIoSize {
m := metricSparkExecutorShuffleIoSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
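// Editor's sketch (not generated by mdatagen): metrics with attributes take
// the attribute value as a trailing string argument, so one scrape can record
// several data points on the same metric. The direction values "in" and "out"
// and the byte counts below are illustrative only. Note that init for this
// metric also calls EnsureCapacity(m.capacity), so the maximum data point
// count tracked by updateCapacity pre-sizes the buffer on the next cycle.
func exampleRecordShuffleIoSize(cfg MetricConfig, dest pmetric.MetricSlice) {
	m := newMetricSparkExecutorShuffleIoSize(cfg)
	now := pcommon.NewTimestampFromTime(time.Now())
	m.recordDataPoint(now, now, 1024, "in")  // bytes read in shuffle
	m.recordDataPoint(now, now, 2048, "out") // bytes written in shuffle
	// One emit produces a single metric carrying both attributed data points.
	m.emit(dest)
}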
type metricSparkExecutorStorageMemoryUsage struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.storage_memory.usage metric with initial data.
func (m *metricSparkExecutorStorageMemoryUsage) init() {
m.data.SetName("spark.executor.storage_memory.usage")
m.data.SetDescription("The executor's storage memory usage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkExecutorStorageMemoryUsage) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, locationAttributeValue string, stateAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("location", locationAttributeValue)
dp.Attributes().PutStr("state", stateAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorStorageMemoryUsage) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorStorageMemoryUsage) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorStorageMemoryUsage(cfg MetricConfig) metricSparkExecutorStorageMemoryUsage {
m := metricSparkExecutorStorageMemoryUsage{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
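// Editor's sketch (not generated by mdatagen): the two-attribute variant takes
// both attribute values as trailing arguments; "on_heap" and "used" are
// hypothetical location/state values and 4096 is a made-up byte count.
func exampleRecordStorageMemoryUsage(cfg MetricConfig, dest pmetric.MetricSlice) {
	m := newMetricSparkExecutorStorageMemoryUsage(cfg)
	now := pcommon.NewTimestampFromTime(time.Now())
	m.recordDataPoint(now, now, 4096, "on_heap", "used")
	m.emit(dest)
}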
type metricSparkExecutorTaskActive struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.task.active metric with initial data.
func (m *metricSparkExecutorTaskActive) init() {
m.data.SetName("spark.executor.task.active")
m.data.SetDescription("Number of tasks currently running in this executor.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorTaskActive) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorTaskActive) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorTaskActive) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorTaskActive(cfg MetricConfig) metricSparkExecutorTaskActive {
m := metricSparkExecutorTaskActive{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorTaskLimit struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.task.limit metric with initial data.
func (m *metricSparkExecutorTaskLimit) init() {
m.data.SetName("spark.executor.task.limit")
m.data.SetDescription("Maximum number of tasks that can run concurrently in this executor.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorTaskLimit) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorTaskLimit) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorTaskLimit) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorTaskLimit(cfg MetricConfig) metricSparkExecutorTaskLimit {
m := metricSparkExecutorTaskLimit{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorTaskResult struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.task.result metric with initial data.
func (m *metricSparkExecutorTaskResult) init() {
m.data.SetName("spark.executor.task.result")
m.data.SetDescription("Number of tasks with a specific result in this executor.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkExecutorTaskResult) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, executorTaskResultAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("result", executorTaskResultAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorTaskResult) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorTaskResult) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorTaskResult(cfg MetricConfig) metricSparkExecutorTaskResult {
m := metricSparkExecutorTaskResult{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkExecutorTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.executor.time metric with initial data.
func (m *metricSparkExecutorTime) init() {
m.data.SetName("spark.executor.time")
m.data.SetDescription("Elapsed time the JVM spent executing tasks in this executor.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkExecutorTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkExecutorTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkExecutorTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkExecutorTime(cfg MetricConfig) metricSparkExecutorTime {
m := metricSparkExecutorTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkJobStageActive struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.job.stage.active metric with initial data.
func (m *metricSparkJobStageActive) init() {
m.data.SetName("spark.job.stage.active")
m.data.SetDescription("Number of active stages in this job.")
m.data.SetUnit("{ stage }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkJobStageActive) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkJobStageActive) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkJobStageActive) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkJobStageActive(cfg MetricConfig) metricSparkJobStageActive {
m := metricSparkJobStageActive{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkJobStageResult struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.job.stage.result metric with initial data.
func (m *metricSparkJobStageResult) init() {
m.data.SetName("spark.job.stage.result")
m.data.SetDescription("Number of stages with a specific result in this job.")
m.data.SetUnit("{ stage }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkJobStageResult) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, jobResultAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("result", jobResultAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkJobStageResult) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkJobStageResult) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkJobStageResult(cfg MetricConfig) metricSparkJobStageResult {
m := metricSparkJobStageResult{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkJobTaskActive struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.job.task.active metric with initial data.
func (m *metricSparkJobTaskActive) init() {
m.data.SetName("spark.job.task.active")
m.data.SetDescription("Number of active tasks in this job.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkJobTaskActive) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkJobTaskActive) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkJobTaskActive) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkJobTaskActive(cfg MetricConfig) metricSparkJobTaskActive {
m := metricSparkJobTaskActive{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkJobTaskResult struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.job.task.result metric with initial data.
func (m *metricSparkJobTaskResult) init() {
m.data.SetName("spark.job.task.result")
m.data.SetDescription("Number of tasks with a specific result in this job.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkJobTaskResult) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, jobResultAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("result", jobResultAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkJobTaskResult) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkJobTaskResult) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkJobTaskResult(cfg MetricConfig) metricSparkJobTaskResult {
m := metricSparkJobTaskResult{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageDiskSpilled struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.disk.spilled metric with initial data.
func (m *metricSparkStageDiskSpilled) init() {
m.data.SetName("spark.stage.disk.spilled")
m.data.SetDescription("The amount of disk space used for storing portions of overly large data chunks that couldn't fit in memory in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageDiskSpilled) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageDiskSpilled) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageDiskSpilled) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageDiskSpilled(cfg MetricConfig) metricSparkStageDiskSpilled {
m := metricSparkStageDiskSpilled{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageExecutorCPUTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.executor.cpu_time metric with initial data.
func (m *metricSparkStageExecutorCPUTime) init() {
m.data.SetName("spark.stage.executor.cpu_time")
m.data.SetDescription("CPU time spent by the executor in this stage.")
m.data.SetUnit("ns")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageExecutorCPUTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageExecutorCPUTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageExecutorCPUTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageExecutorCPUTime(cfg MetricConfig) metricSparkStageExecutorCPUTime {
m := metricSparkStageExecutorCPUTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageExecutorRunTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.executor.run_time metric with initial data.
func (m *metricSparkStageExecutorRunTime) init() {
m.data.SetName("spark.stage.executor.run_time")
m.data.SetDescription("Amount of time spent by the executor in this stage.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageExecutorRunTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageExecutorRunTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageExecutorRunTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageExecutorRunTime(cfg MetricConfig) metricSparkStageExecutorRunTime {
m := metricSparkStageExecutorRunTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageIoRecords struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.io.records metric with initial data.
func (m *metricSparkStageIoRecords) init() {
m.data.SetName("spark.stage.io.records")
m.data.SetDescription("Number of records written and read in this stage.")
m.data.SetUnit("{ record }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageIoRecords) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, directionAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("direction", directionAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageIoRecords) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageIoRecords) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageIoRecords(cfg MetricConfig) metricSparkStageIoRecords {
m := metricSparkStageIoRecords{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageIoSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.io.size metric with initial data.
func (m *metricSparkStageIoSize) init() {
m.data.SetName("spark.stage.io.size")
m.data.SetDescription("Amount of data written and read at this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageIoSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, directionAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("direction", directionAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageIoSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageIoSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageIoSize(cfg MetricConfig) metricSparkStageIoSize {
m := metricSparkStageIoSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageJvmGcTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.jvm_gc_time metric with initial data.
func (m *metricSparkStageJvmGcTime) init() {
m.data.SetName("spark.stage.jvm_gc_time")
m.data.SetDescription("The amount of time the JVM spent on garbage collection in this stage.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageJvmGcTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageJvmGcTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageJvmGcTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageJvmGcTime(cfg MetricConfig) metricSparkStageJvmGcTime {
m := metricSparkStageJvmGcTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageMemoryPeak struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.memory.peak metric with initial data.
func (m *metricSparkStageMemoryPeak) init() {
m.data.SetName("spark.stage.memory.peak")
m.data.SetDescription("Peak memory used by internal data structures created during shuffles, aggregations and joins in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageMemoryPeak) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageMemoryPeak) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageMemoryPeak) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageMemoryPeak(cfg MetricConfig) metricSparkStageMemoryPeak {
m := metricSparkStageMemoryPeak{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageMemorySpilled struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.memory.spilled metric with initial data.
func (m *metricSparkStageMemorySpilled) init() {
m.data.SetName("spark.stage.memory.spilled")
m.data.SetDescription("The amount of memory moved to disk due to size constraints (spilled) in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageMemorySpilled) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageMemorySpilled) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageMemorySpilled) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageMemorySpilled(cfg MetricConfig) metricSparkStageMemorySpilled {
m := metricSparkStageMemorySpilled{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleBlocksFetched struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.blocks_fetched metric with initial data.
func (m *metricSparkStageShuffleBlocksFetched) init() {
m.data.SetName("spark.stage.shuffle.blocks_fetched")
m.data.SetDescription("Number of blocks fetched in shuffle operations in this stage.")
m.data.SetUnit("{ block }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageShuffleBlocksFetched) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, sourceAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("source", sourceAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleBlocksFetched) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleBlocksFetched) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleBlocksFetched(cfg MetricConfig) metricSparkStageShuffleBlocksFetched {
m := metricSparkStageShuffleBlocksFetched{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleFetchWaitTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.fetch_wait_time metric with initial data.
func (m *metricSparkStageShuffleFetchWaitTime) init() {
m.data.SetName("spark.stage.shuffle.fetch_wait_time")
m.data.SetDescription("Time spent in this stage waiting for remote shuffle blocks.")
m.data.SetUnit("ms")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageShuffleFetchWaitTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleFetchWaitTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleFetchWaitTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleFetchWaitTime(cfg MetricConfig) metricSparkStageShuffleFetchWaitTime {
m := metricSparkStageShuffleFetchWaitTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleIoDisk struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.io.disk metric with initial data.
func (m *metricSparkStageShuffleIoDisk) init() {
m.data.SetName("spark.stage.shuffle.io.disk")
m.data.SetDescription("Amount of data read to disk in shuffle operations (sometimes required for large blocks, as opposed to the default behavior of reading into memory).")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageShuffleIoDisk) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleIoDisk) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleIoDisk) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleIoDisk(cfg MetricConfig) metricSparkStageShuffleIoDisk {
m := metricSparkStageShuffleIoDisk{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleIoReadSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.io.read.size metric with initial data.
func (m *metricSparkStageShuffleIoReadSize) init() {
m.data.SetName("spark.stage.shuffle.io.read.size")
m.data.SetDescription("Amount of data read in shuffle operations in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageShuffleIoReadSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, sourceAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("source", sourceAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleIoReadSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleIoReadSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleIoReadSize(cfg MetricConfig) metricSparkStageShuffleIoReadSize {
m := metricSparkStageShuffleIoReadSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleIoRecords struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.io.records metric with initial data.
func (m *metricSparkStageShuffleIoRecords) init() {
m.data.SetName("spark.stage.shuffle.io.records")
m.data.SetDescription("Number of records written or read in shuffle operations in this stage.")
m.data.SetUnit("{ record }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageShuffleIoRecords) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, directionAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("direction", directionAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleIoRecords) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleIoRecords) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleIoRecords(cfg MetricConfig) metricSparkStageShuffleIoRecords {
m := metricSparkStageShuffleIoRecords{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleIoWriteSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.io.write.size metric with initial data.
func (m *metricSparkStageShuffleIoWriteSize) init() {
m.data.SetName("spark.stage.shuffle.io.write.size")
m.data.SetDescription("Amount of data written in shuffle operations in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageShuffleIoWriteSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleIoWriteSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleIoWriteSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleIoWriteSize(cfg MetricConfig) metricSparkStageShuffleIoWriteSize {
m := metricSparkStageShuffleIoWriteSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageShuffleWriteTime struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.shuffle.write_time metric with initial data.
func (m *metricSparkStageShuffleWriteTime) init() {
m.data.SetName("spark.stage.shuffle.write_time")
m.data.SetDescription("Time spent blocking on writes to disk or buffer cache in this stage.")
m.data.SetUnit("ns")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageShuffleWriteTime) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageShuffleWriteTime) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageShuffleWriteTime) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageShuffleWriteTime(cfg MetricConfig) metricSparkStageShuffleWriteTime {
m := metricSparkStageShuffleWriteTime{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageStatus struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.status metric with initial data.
func (m *metricSparkStageStatus) init() {
m.data.SetName("spark.stage.status")
m.data.SetDescription("A one-hot encoding representing the status of this stage.")
m.data.SetUnit("{ status }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageStatus) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, stageActiveAttributeValue bool, stageCompleteAttributeValue bool, stagePendingAttributeValue bool, stageFailedAttributeValue bool) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutBool("active", stageActiveAttributeValue)
dp.Attributes().PutBool("complete", stageCompleteAttributeValue)
dp.Attributes().PutBool("pending", stagePendingAttributeValue)
dp.Attributes().PutBool("failed", stageFailedAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageStatus) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageStatus) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageStatus(cfg MetricConfig) metricSparkStageStatus {
m := metricSparkStageStatus{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageTaskActive struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.task.active metric with initial data.
func (m *metricSparkStageTaskActive) init() {
m.data.SetName("spark.stage.task.active")
m.data.SetDescription("Number of active tasks in this stage.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(false)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageTaskActive) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageTaskActive) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageTaskActive) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageTaskActive(cfg MetricConfig) metricSparkStageTaskActive {
m := metricSparkStageTaskActive{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageTaskResult struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.task.result metric with initial data.
func (m *metricSparkStageTaskResult) init() {
m.data.SetName("spark.stage.task.result")
m.data.SetDescription("Number of tasks with a specific result in this stage.")
m.data.SetUnit("{ task }")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
m.data.Sum().DataPoints().EnsureCapacity(m.capacity)
}
func (m *metricSparkStageTaskResult) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64, stageTaskResultAttributeValue string) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
dp.Attributes().PutStr("result", stageTaskResultAttributeValue)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageTaskResult) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageTaskResult) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageTaskResult(cfg MetricConfig) metricSparkStageTaskResult {
m := metricSparkStageTaskResult{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
type metricSparkStageTaskResultSize struct {
data pmetric.Metric // data buffer for generated metric.
config MetricConfig // metric config provided by user.
capacity int // max observed number of data points added to the metric.
}
// init fills spark.stage.task.result_size metric with initial data.
func (m *metricSparkStageTaskResultSize) init() {
m.data.SetName("spark.stage.task.result_size")
m.data.SetDescription("The amount of data transmitted back to the driver by all the tasks in this stage.")
m.data.SetUnit("bytes")
m.data.SetEmptySum()
m.data.Sum().SetIsMonotonic(true)
m.data.Sum().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
}
func (m *metricSparkStageTaskResultSize) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val int64) {
if !m.config.Enabled {
return
}
dp := m.data.Sum().DataPoints().AppendEmpty()
dp.SetStartTimestamp(start)
dp.SetTimestamp(ts)
dp.SetIntValue(val)
}
// updateCapacity saves max length of data point slices that will be used for the slice capacity.
func (m *metricSparkStageTaskResultSize) updateCapacity() {
if m.data.Sum().DataPoints().Len() > m.capacity {
m.capacity = m.data.Sum().DataPoints().Len()
}
}
// emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points.
func (m *metricSparkStageTaskResultSize) emit(metrics pmetric.MetricSlice) {
if m.config.Enabled && m.data.Sum().DataPoints().Len() > 0 {
m.updateCapacity()
m.data.MoveTo(metrics.AppendEmpty())
m.init()
}
}
func newMetricSparkStageTaskResultSize(cfg MetricConfig) metricSparkStageTaskResultSize {
m := metricSparkStageTaskResultSize{config: cfg}
if cfg.Enabled {
m.data = pmetric.NewMetric()
m.init()
}
return m
}
// MetricsBuilder provides an interface for scrapers to report metrics while taking care of all the transformations
// required to produce the metric representation defined in the metadata and user config.
type MetricsBuilder struct {
config MetricsBuilderConfig // config of the metrics builder.
startTime pcommon.Timestamp // start time that will be applied to all recorded data points.
metricsCapacity int // maximum observed number of metrics per resource.
metricsBuffer pmetric.Metrics // accumulates metrics data before emitting.
buildInfo component.BuildInfo // contains version information.
resourceAttributeIncludeFilter map[string]filter.Filter
resourceAttributeExcludeFilter map[string]filter.Filter
metricSparkDriverBlockManagerDiskUsage metricSparkDriverBlockManagerDiskUsage
metricSparkDriverBlockManagerMemoryUsage metricSparkDriverBlockManagerMemoryUsage
metricSparkDriverCodeGeneratorCompilationAverageTime metricSparkDriverCodeGeneratorCompilationAverageTime
metricSparkDriverCodeGeneratorCompilationCount metricSparkDriverCodeGeneratorCompilationCount
metricSparkDriverCodeGeneratorGeneratedClassAverageSize metricSparkDriverCodeGeneratorGeneratedClassAverageSize
metricSparkDriverCodeGeneratorGeneratedClassCount metricSparkDriverCodeGeneratorGeneratedClassCount
metricSparkDriverCodeGeneratorGeneratedMethodAverageSize metricSparkDriverCodeGeneratorGeneratedMethodAverageSize
metricSparkDriverCodeGeneratorGeneratedMethodCount metricSparkDriverCodeGeneratorGeneratedMethodCount
metricSparkDriverCodeGeneratorSourceCodeAverageSize metricSparkDriverCodeGeneratorSourceCodeAverageSize
metricSparkDriverCodeGeneratorSourceCodeOperations metricSparkDriverCodeGeneratorSourceCodeOperations
metricSparkDriverDagSchedulerJobActive metricSparkDriverDagSchedulerJobActive
metricSparkDriverDagSchedulerJobCount metricSparkDriverDagSchedulerJobCount
metricSparkDriverDagSchedulerStageCount metricSparkDriverDagSchedulerStageCount
metricSparkDriverDagSchedulerStageFailed metricSparkDriverDagSchedulerStageFailed
metricSparkDriverExecutorGcOperations metricSparkDriverExecutorGcOperations
metricSparkDriverExecutorGcTime metricSparkDriverExecutorGcTime
metricSparkDriverExecutorMemoryExecution metricSparkDriverExecutorMemoryExecution
metricSparkDriverExecutorMemoryJvm metricSparkDriverExecutorMemoryJvm
metricSparkDriverExecutorMemoryPool metricSparkDriverExecutorMemoryPool
metricSparkDriverExecutorMemoryStorage metricSparkDriverExecutorMemoryStorage
metricSparkDriverHiveExternalCatalogFileCacheHits metricSparkDriverHiveExternalCatalogFileCacheHits
metricSparkDriverHiveExternalCatalogFilesDiscovered metricSparkDriverHiveExternalCatalogFilesDiscovered
metricSparkDriverHiveExternalCatalogHiveClientCalls metricSparkDriverHiveExternalCatalogHiveClientCalls
metricSparkDriverHiveExternalCatalogParallelListingJobs metricSparkDriverHiveExternalCatalogParallelListingJobs
metricSparkDriverHiveExternalCatalogPartitionsFetched metricSparkDriverHiveExternalCatalogPartitionsFetched
metricSparkDriverJvmCPUTime metricSparkDriverJvmCPUTime
metricSparkDriverLiveListenerBusDropped metricSparkDriverLiveListenerBusDropped
metricSparkDriverLiveListenerBusPosted metricSparkDriverLiveListenerBusPosted
metricSparkDriverLiveListenerBusProcessingTimeAverage metricSparkDriverLiveListenerBusProcessingTimeAverage
metricSparkDriverLiveListenerBusQueueSize metricSparkDriverLiveListenerBusQueueSize
metricSparkExecutorDiskUsage metricSparkExecutorDiskUsage
metricSparkExecutorGcTime metricSparkExecutorGcTime
metricSparkExecutorInputSize metricSparkExecutorInputSize
metricSparkExecutorMemoryUsage metricSparkExecutorMemoryUsage
metricSparkExecutorShuffleIoSize metricSparkExecutorShuffleIoSize
metricSparkExecutorStorageMemoryUsage metricSparkExecutorStorageMemoryUsage
metricSparkExecutorTaskActive metricSparkExecutorTaskActive
metricSparkExecutorTaskLimit metricSparkExecutorTaskLimit
metricSparkExecutorTaskResult metricSparkExecutorTaskResult
metricSparkExecutorTime metricSparkExecutorTime
metricSparkJobStageActive metricSparkJobStageActive
metricSparkJobStageResult metricSparkJobStageResult
metricSparkJobTaskActive metricSparkJobTaskActive
metricSparkJobTaskResult metricSparkJobTaskResult
metricSparkStageDiskSpilled metricSparkStageDiskSpilled
metricSparkStageExecutorCPUTime metricSparkStageExecutorCPUTime
metricSparkStageExecutorRunTime metricSparkStageExecutorRunTime
metricSparkStageIoRecords metricSparkStageIoRecords
metricSparkStageIoSize metricSparkStageIoSize
metricSparkStageJvmGcTime metricSparkStageJvmGcTime
metricSparkStageMemoryPeak metricSparkStageMemoryPeak
metricSparkStageMemorySpilled metricSparkStageMemorySpilled
metricSparkStageShuffleBlocksFetched metricSparkStageShuffleBlocksFetched
metricSparkStageShuffleFetchWaitTime metricSparkStageShuffleFetchWaitTime
metricSparkStageShuffleIoDisk metricSparkStageShuffleIoDisk
metricSparkStageShuffleIoReadSize metricSparkStageShuffleIoReadSize
metricSparkStageShuffleIoRecords metricSparkStageShuffleIoRecords
metricSparkStageShuffleIoWriteSize metricSparkStageShuffleIoWriteSize
metricSparkStageShuffleWriteTime metricSparkStageShuffleWriteTime
metricSparkStageStatus metricSparkStageStatus
metricSparkStageTaskActive metricSparkStageTaskActive
metricSparkStageTaskResult metricSparkStageTaskResult
metricSparkStageTaskResultSize metricSparkStageTaskResultSize
}
// MetricBuilderOption applies changes to the default metrics builder.
type MetricBuilderOption interface {
apply(*MetricsBuilder)
}
type metricBuilderOptionFunc func(mb *MetricsBuilder)
func (mbof metricBuilderOptionFunc) apply(mb *MetricsBuilder) {
mbof(mb)
}
// WithStartTime sets startTime on the metrics builder.
func WithStartTime(startTime pcommon.Timestamp) MetricBuilderOption {
return metricBuilderOptionFunc(func(mb *MetricsBuilder) {
mb.startTime = startTime
})
}
func NewMetricsBuilder(mbc MetricsBuilderConfig, settings receiver.Settings, options ...MetricBuilderOption) *MetricsBuilder {
mb := &MetricsBuilder{
config: mbc,
startTime: pcommon.NewTimestampFromTime(time.Now()),
metricsBuffer: pmetric.NewMetrics(),
buildInfo: settings.BuildInfo,
metricSparkDriverBlockManagerDiskUsage: newMetricSparkDriverBlockManagerDiskUsage(mbc.Metrics.SparkDriverBlockManagerDiskUsage),
metricSparkDriverBlockManagerMemoryUsage: newMetricSparkDriverBlockManagerMemoryUsage(mbc.Metrics.SparkDriverBlockManagerMemoryUsage),
metricSparkDriverCodeGeneratorCompilationAverageTime: newMetricSparkDriverCodeGeneratorCompilationAverageTime(mbc.Metrics.SparkDriverCodeGeneratorCompilationAverageTime),
metricSparkDriverCodeGeneratorCompilationCount: newMetricSparkDriverCodeGeneratorCompilationCount(mbc.Metrics.SparkDriverCodeGeneratorCompilationCount),
metricSparkDriverCodeGeneratorGeneratedClassAverageSize: newMetricSparkDriverCodeGeneratorGeneratedClassAverageSize(mbc.Metrics.SparkDriverCodeGeneratorGeneratedClassAverageSize),
metricSparkDriverCodeGeneratorGeneratedClassCount: newMetricSparkDriverCodeGeneratorGeneratedClassCount(mbc.Metrics.SparkDriverCodeGeneratorGeneratedClassCount),
metricSparkDriverCodeGeneratorGeneratedMethodAverageSize: newMetricSparkDriverCodeGeneratorGeneratedMethodAverageSize(mbc.Metrics.SparkDriverCodeGeneratorGeneratedMethodAverageSize),
metricSparkDriverCodeGeneratorGeneratedMethodCount: newMetricSparkDriverCodeGeneratorGeneratedMethodCount(mbc.Metrics.SparkDriverCodeGeneratorGeneratedMethodCount),
metricSparkDriverCodeGeneratorSourceCodeAverageSize: newMetricSparkDriverCodeGeneratorSourceCodeAverageSize(mbc.Metrics.SparkDriverCodeGeneratorSourceCodeAverageSize),
metricSparkDriverCodeGeneratorSourceCodeOperations: newMetricSparkDriverCodeGeneratorSourceCodeOperations(mbc.Metrics.SparkDriverCodeGeneratorSourceCodeOperations),
metricSparkDriverDagSchedulerJobActive: newMetricSparkDriverDagSchedulerJobActive(mbc.Metrics.SparkDriverDagSchedulerJobActive),
metricSparkDriverDagSchedulerJobCount: newMetricSparkDriverDagSchedulerJobCount(mbc.Metrics.SparkDriverDagSchedulerJobCount),
metricSparkDriverDagSchedulerStageCount: newMetricSparkDriverDagSchedulerStageCount(mbc.Metrics.SparkDriverDagSchedulerStageCount),
metricSparkDriverDagSchedulerStageFailed: newMetricSparkDriverDagSchedulerStageFailed(mbc.Metrics.SparkDriverDagSchedulerStageFailed),
metricSparkDriverExecutorGcOperations: newMetricSparkDriverExecutorGcOperations(mbc.Metrics.SparkDriverExecutorGcOperations),
metricSparkDriverExecutorGcTime: newMetricSparkDriverExecutorGcTime(mbc.Metrics.SparkDriverExecutorGcTime),
metricSparkDriverExecutorMemoryExecution: newMetricSparkDriverExecutorMemoryExecution(mbc.Metrics.SparkDriverExecutorMemoryExecution),
metricSparkDriverExecutorMemoryJvm: newMetricSparkDriverExecutorMemoryJvm(mbc.Metrics.SparkDriverExecutorMemoryJvm),
metricSparkDriverExecutorMemoryPool: newMetricSparkDriverExecutorMemoryPool(mbc.Metrics.SparkDriverExecutorMemoryPool),
metricSparkDriverExecutorMemoryStorage: newMetricSparkDriverExecutorMemoryStorage(mbc.Metrics.SparkDriverExecutorMemoryStorage),
metricSparkDriverHiveExternalCatalogFileCacheHits: newMetricSparkDriverHiveExternalCatalogFileCacheHits(mbc.Metrics.SparkDriverHiveExternalCatalogFileCacheHits),
metricSparkDriverHiveExternalCatalogFilesDiscovered: newMetricSparkDriverHiveExternalCatalogFilesDiscovered(mbc.Metrics.SparkDriverHiveExternalCatalogFilesDiscovered),
metricSparkDriverHiveExternalCatalogHiveClientCalls: newMetricSparkDriverHiveExternalCatalogHiveClientCalls(mbc.Metrics.SparkDriverHiveExternalCatalogHiveClientCalls),
metricSparkDriverHiveExternalCatalogParallelListingJobs: newMetricSparkDriverHiveExternalCatalogParallelListingJobs(mbc.Metrics.SparkDriverHiveExternalCatalogParallelListingJobs),
metricSparkDriverHiveExternalCatalogPartitionsFetched: newMetricSparkDriverHiveExternalCatalogPartitionsFetched(mbc.Metrics.SparkDriverHiveExternalCatalogPartitionsFetched),
metricSparkDriverJvmCPUTime: newMetricSparkDriverJvmCPUTime(mbc.Metrics.SparkDriverJvmCPUTime),
metricSparkDriverLiveListenerBusDropped: newMetricSparkDriverLiveListenerBusDropped(mbc.Metrics.SparkDriverLiveListenerBusDropped),
metricSparkDriverLiveListenerBusPosted: newMetricSparkDriverLiveListenerBusPosted(mbc.Metrics.SparkDriverLiveListenerBusPosted),
metricSparkDriverLiveListenerBusProcessingTimeAverage: newMetricSparkDriverLiveListenerBusProcessingTimeAverage(mbc.Metrics.SparkDriverLiveListenerBusProcessingTimeAverage),
metricSparkDriverLiveListenerBusQueueSize: newMetricSparkDriverLiveListenerBusQueueSize(mbc.Metrics.SparkDriverLiveListenerBusQueueSize),
metricSparkExecutorDiskUsage: newMetricSparkExecutorDiskUsage(mbc.Metrics.SparkExecutorDiskUsage),
metricSparkExecutorGcTime: newMetricSparkExecutorGcTime(mbc.Metrics.SparkExecutorGcTime),
metricSparkExecutorInputSize: newMetricSparkExecutorInputSize(mbc.Metrics.SparkExecutorInputSize),
metricSparkExecutorMemoryUsage: newMetricSparkExecutorMemoryUsage(mbc.Metrics.SparkExecutorMemoryUsage),
metricSparkExecutorShuffleIoSize: newMetricSparkExecutorShuffleIoSize(mbc.Metrics.SparkExecutorShuffleIoSize),
metricSparkExecutorStorageMemoryUsage: newMetricSparkExecutorStorageMemoryUsage(mbc.Metrics.SparkExecutorStorageMemoryUsage),
metricSparkExecutorTaskActive: newMetricSparkExecutorTaskActive(mbc.Metrics.SparkExecutorTaskActive),
metricSparkExecutorTaskLimit: newMetricSparkExecutorTaskLimit(mbc.Metrics.SparkExecutorTaskLimit),
metricSparkExecutorTaskResult: newMetricSparkExecutorTaskResult(mbc.Metrics.SparkExecutorTaskResult),
metricSparkExecutorTime: newMetricSparkExecutorTime(mbc.Metrics.SparkExecutorTime),
metricSparkJobStageActive: newMetricSparkJobStageActive(mbc.Metrics.SparkJobStageActive),
metricSparkJobStageResult: newMetricSparkJobStageResult(mbc.Metrics.SparkJobStageResult),
metricSparkJobTaskActive: newMetricSparkJobTaskActive(mbc.Metrics.SparkJobTaskActive),
metricSparkJobTaskResult: newMetricSparkJobTaskResult(mbc.Metrics.SparkJobTaskResult),
metricSparkStageDiskSpilled: newMetricSparkStageDiskSpilled(mbc.Metrics.SparkStageDiskSpilled),
metricSparkStageExecutorCPUTime: newMetricSparkStageExecutorCPUTime(mbc.Metrics.SparkStageExecutorCPUTime),
metricSparkStageExecutorRunTime: newMetricSparkStageExecutorRunTime(mbc.Metrics.SparkStageExecutorRunTime),
metricSparkStageIoRecords: newMetricSparkStageIoRecords(mbc.Metrics.SparkStageIoRecords),
metricSparkStageIoSize: newMetricSparkStageIoSize(mbc.Metrics.SparkStageIoSize),
metricSparkStageJvmGcTime: newMetricSparkStageJvmGcTime(mbc.Metrics.SparkStageJvmGcTime),
metricSparkStageMemoryPeak: newMetricSparkStageMemoryPeak(mbc.Metrics.SparkStageMemoryPeak),
metricSparkStageMemorySpilled: newMetricSparkStageMemorySpilled(mbc.Metrics.SparkStageMemorySpilled),
metricSparkStageShuffleBlocksFetched: newMetricSparkStageShuffleBlocksFetched(mbc.Metrics.SparkStageShuffleBlocksFetched),
metricSparkStageShuffleFetchWaitTime: newMetricSparkStageShuffleFetchWaitTime(mbc.Metrics.SparkStageShuffleFetchWaitTime),
metricSparkStageShuffleIoDisk: newMetricSparkStageShuffleIoDisk(mbc.Metrics.SparkStageShuffleIoDisk),
metricSparkStageShuffleIoReadSize: newMetricSparkStageShuffleIoReadSize(mbc.Metrics.SparkStageShuffleIoReadSize),
metricSparkStageShuffleIoRecords: newMetricSparkStageShuffleIoRecords(mbc.Metrics.SparkStageShuffleIoRecords),
metricSparkStageShuffleIoWriteSize: newMetricSparkStageShuffleIoWriteSize(mbc.Metrics.SparkStageShuffleIoWriteSize),
metricSparkStageShuffleWriteTime: newMetricSparkStageShuffleWriteTime(mbc.Metrics.SparkStageShuffleWriteTime),
metricSparkStageStatus: newMetricSparkStageStatus(mbc.Metrics.SparkStageStatus),
metricSparkStageTaskActive: newMetricSparkStageTaskActive(mbc.Metrics.SparkStageTaskActive),
metricSparkStageTaskResult: newMetricSparkStageTaskResult(mbc.Metrics.SparkStageTaskResult),
metricSparkStageTaskResultSize: newMetricSparkStageTaskResultSize(mbc.Metrics.SparkStageTaskResultSize),
resourceAttributeIncludeFilter: make(map[string]filter.Filter),
resourceAttributeExcludeFilter: make(map[string]filter.Filter),
}
if mbc.ResourceAttributes.SparkApplicationID.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.application.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkApplicationID.MetricsInclude)
}
if mbc.ResourceAttributes.SparkApplicationID.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.application.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkApplicationID.MetricsExclude)
}
if mbc.ResourceAttributes.SparkApplicationName.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.application.name"] = filter.CreateFilter(mbc.ResourceAttributes.SparkApplicationName.MetricsInclude)
}
if mbc.ResourceAttributes.SparkApplicationName.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.application.name"] = filter.CreateFilter(mbc.ResourceAttributes.SparkApplicationName.MetricsExclude)
}
if mbc.ResourceAttributes.SparkExecutorID.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.executor.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkExecutorID.MetricsInclude)
}
if mbc.ResourceAttributes.SparkExecutorID.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.executor.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkExecutorID.MetricsExclude)
}
if mbc.ResourceAttributes.SparkJobID.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.job.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkJobID.MetricsInclude)
}
if mbc.ResourceAttributes.SparkJobID.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.job.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkJobID.MetricsExclude)
}
if mbc.ResourceAttributes.SparkStageAttemptID.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.stage.attempt.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkStageAttemptID.MetricsInclude)
}
if mbc.ResourceAttributes.SparkStageAttemptID.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.stage.attempt.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkStageAttemptID.MetricsExclude)
}
if mbc.ResourceAttributes.SparkStageID.MetricsInclude != nil {
mb.resourceAttributeIncludeFilter["spark.stage.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkStageID.MetricsInclude)
}
if mbc.ResourceAttributes.SparkStageID.MetricsExclude != nil {
mb.resourceAttributeExcludeFilter["spark.stage.id"] = filter.CreateFilter(mbc.ResourceAttributes.SparkStageID.MetricsExclude)
}
for _, op := range options {
op.apply(mb)
}
return mb
}
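// Typical construction, as a hedged sketch: the receiver obtains a
// MetricsBuilderConfig (DefaultMetricsBuilderConfig is assumed to come from the
// generated config file that accompanies this one) and the receiver.Settings
// passed to its factory, then optionally pins the start time:
//
//	mb := NewMetricsBuilder(
//		DefaultMetricsBuilderConfig(),
//		settings, // receiver.Settings provided by the collector
//		WithStartTime(pcommon.NewTimestampFromTime(processStart)), // processStart: placeholder time.Time
//	)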
// NewResourceBuilder returns a new resource builder that should be used to build a resource associated with the emitted metrics.
func (mb *MetricsBuilder) NewResourceBuilder() *ResourceBuilder {
return NewResourceBuilder(mb.config.ResourceAttributes)
}
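// A hedged usage sketch, assuming the generated ResourceBuilder (in
// generated_resource.go) exposes per-attribute setters such as
// SetSparkApplicationID and SetSparkApplicationName plus an Emit method, as
// mdatagen conventionally produces; the ID and name below are made-up examples:
//
//	rb := mb.NewResourceBuilder()
//	rb.SetSparkApplicationID("app-20240101120000-0001")
//	rb.SetSparkApplicationName("example-spark-app")
//	mb.EmitForResource(WithResource(rb.Emit()))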
// updateCapacity updates the maximum observed number of metrics per resource, which is used as the metrics slice capacity.
func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) {
if mb.metricsCapacity < rm.ScopeMetrics().At(0).Metrics().Len() {
mb.metricsCapacity = rm.ScopeMetrics().At(0).Metrics().Len()
}
}
// ResourceMetricsOption applies changes to provided resource metrics.
type ResourceMetricsOption interface {
apply(pmetric.ResourceMetrics)
}
type resourceMetricsOptionFunc func(pmetric.ResourceMetrics)
func (rmof resourceMetricsOptionFunc) apply(rm pmetric.ResourceMetrics) {
rmof(rm)
}
// WithResource sets the provided resource on the emitted ResourceMetrics.
// It's recommended to use ResourceBuilder to create the resource.
func WithResource(res pcommon.Resource) ResourceMetricsOption {
return resourceMetricsOptionFunc(func(rm pmetric.ResourceMetrics) {
res.CopyTo(rm.Resource())
})
}
// WithStartTimeOverride overrides the start time for all the resource metrics data points.
// This option should only be used if a different start time has to be set on metrics coming from different resources.
func WithStartTimeOverride(start pcommon.Timestamp) ResourceMetricsOption {
return resourceMetricsOptionFunc(func(rm pmetric.ResourceMetrics) {
var dps pmetric.NumberDataPointSlice
metrics := rm.ScopeMetrics().At(0).Metrics()
for i := 0; i < metrics.Len(); i++ {
switch metrics.At(i).Type() {
case pmetric.MetricTypeGauge:
dps = metrics.At(i).Gauge().DataPoints()
case pmetric.MetricTypeSum:
dps = metrics.At(i).Sum().DataPoints()
}
for j := 0; j < dps.Len(); j++ {
dps.At(j).SetStartTimestamp(start)
}
}
})
}
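// For example, to stamp every data point recorded for one resource with that
// resource's own start time (a sketch; res and resourceStart are placeholders
// supplied by the scraper):
//
//	mb.EmitForResource(WithResource(res), WithStartTimeOverride(resourceStart))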
// EmitForResource saves all the generated metrics under a new resource and updates the internal state to be ready for
// recording another set of data points as part of another resource. This function can be helpful when one scraper
// needs to emit metrics from several resources. Otherwise, calling this function is not required;
// the `Emit` function can be called instead.
// Resource attributes should be provided as ResourceMetricsOption arguments.
func (mb *MetricsBuilder) EmitForResource(options ...ResourceMetricsOption) {
rm := pmetric.NewResourceMetrics()
ils := rm.ScopeMetrics().AppendEmpty()
ils.Scope().SetName(ScopeName)
ils.Scope().SetVersion(mb.buildInfo.Version)
ils.Metrics().EnsureCapacity(mb.metricsCapacity)
mb.metricSparkDriverBlockManagerDiskUsage.emit(ils.Metrics())
mb.metricSparkDriverBlockManagerMemoryUsage.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorCompilationAverageTime.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorCompilationCount.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorGeneratedClassAverageSize.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorGeneratedClassCount.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorGeneratedMethodAverageSize.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorGeneratedMethodCount.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorSourceCodeAverageSize.emit(ils.Metrics())
mb.metricSparkDriverCodeGeneratorSourceCodeOperations.emit(ils.Metrics())
mb.metricSparkDriverDagSchedulerJobActive.emit(ils.Metrics())
mb.metricSparkDriverDagSchedulerJobCount.emit(ils.Metrics())
mb.metricSparkDriverDagSchedulerStageCount.emit(ils.Metrics())
mb.metricSparkDriverDagSchedulerStageFailed.emit(ils.Metrics())
mb.metricSparkDriverExecutorGcOperations.emit(ils.Metrics())
mb.metricSparkDriverExecutorGcTime.emit(ils.Metrics())
mb.metricSparkDriverExecutorMemoryExecution.emit(ils.Metrics())
mb.metricSparkDriverExecutorMemoryJvm.emit(ils.Metrics())
mb.metricSparkDriverExecutorMemoryPool.emit(ils.Metrics())
mb.metricSparkDriverExecutorMemoryStorage.emit(ils.Metrics())
mb.metricSparkDriverHiveExternalCatalogFileCacheHits.emit(ils.Metrics())
mb.metricSparkDriverHiveExternalCatalogFilesDiscovered.emit(ils.Metrics())
mb.metricSparkDriverHiveExternalCatalogHiveClientCalls.emit(ils.Metrics())
mb.metricSparkDriverHiveExternalCatalogParallelListingJobs.emit(ils.Metrics())
mb.metricSparkDriverHiveExternalCatalogPartitionsFetched.emit(ils.Metrics())
mb.metricSparkDriverJvmCPUTime.emit(ils.Metrics())
mb.metricSparkDriverLiveListenerBusDropped.emit(ils.Metrics())
mb.metricSparkDriverLiveListenerBusPosted.emit(ils.Metrics())
mb.metricSparkDriverLiveListenerBusProcessingTimeAverage.emit(ils.Metrics())
mb.metricSparkDriverLiveListenerBusQueueSize.emit(ils.Metrics())
mb.metricSparkExecutorDiskUsage.emit(ils.Metrics())
mb.metricSparkExecutorGcTime.emit(ils.Metrics())
mb.metricSparkExecutorInputSize.emit(ils.Metrics())
mb.metricSparkExecutorMemoryUsage.emit(ils.Metrics())
mb.metricSparkExecutorShuffleIoSize.emit(ils.Metrics())
mb.metricSparkExecutorStorageMemoryUsage.emit(ils.Metrics())
mb.metricSparkExecutorTaskActive.emit(ils.Metrics())
mb.metricSparkExecutorTaskLimit.emit(ils.Metrics())
mb.metricSparkExecutorTaskResult.emit(ils.Metrics())
mb.metricSparkExecutorTime.emit(ils.Metrics())
mb.metricSparkJobStageActive.emit(ils.Metrics())
mb.metricSparkJobStageResult.emit(ils.Metrics())
mb.metricSparkJobTaskActive.emit(ils.Metrics())
mb.metricSparkJobTaskResult.emit(ils.Metrics())
mb.metricSparkStageDiskSpilled.emit(ils.Metrics())
mb.metricSparkStageExecutorCPUTime.emit(ils.Metrics())
mb.metricSparkStageExecutorRunTime.emit(ils.Metrics())
mb.metricSparkStageIoRecords.emit(ils.Metrics())
mb.metricSparkStageIoSize.emit(ils.Metrics())
mb.metricSparkStageJvmGcTime.emit(ils.Metrics())
mb.metricSparkStageMemoryPeak.emit(ils.Metrics())
mb.metricSparkStageMemorySpilled.emit(ils.Metrics())
mb.metricSparkStageShuffleBlocksFetched.emit(ils.Metrics())
mb.metricSparkStageShuffleFetchWaitTime.emit(ils.Metrics())
mb.metricSparkStageShuffleIoDisk.emit(ils.Metrics())
mb.metricSparkStageShuffleIoReadSize.emit(ils.Metrics())
mb.metricSparkStageShuffleIoRecords.emit(ils.Metrics())
mb.metricSparkStageShuffleIoWriteSize.emit(ils.Metrics())
mb.metricSparkStageShuffleWriteTime.emit(ils.Metrics())
mb.metricSparkStageStatus.emit(ils.Metrics())
mb.metricSparkStageTaskActive.emit(ils.Metrics())
mb.metricSparkStageTaskResult.emit(ils.Metrics())
mb.metricSparkStageTaskResultSize.emit(ils.Metrics())
for _, op := range options {
op.apply(rm)
}
for attr, filter := range mb.resourceAttributeIncludeFilter {
if val, ok := rm.Resource().Attributes().Get(attr); ok && !filter.Matches(val.AsString()) {
return
}
}
for attr, filter := range mb.resourceAttributeExcludeFilter {
if val, ok := rm.Resource().Attributes().Get(attr); ok && filter.Matches(val.AsString()) {
return
}
}
if ils.Metrics().Len() > 0 {
mb.updateCapacity(rm)
rm.MoveTo(mb.metricsBuffer.ResourceMetrics().AppendEmpty())
}
}
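// A sketch of per-resource emission for a scraper that records stage metrics
// for several Spark stages; stages, stage.ActiveTasks, stage.DiskBytesSpilled
// and buildResource are placeholders for the scraper's own data and helpers:
//
//	now := pcommon.NewTimestampFromTime(time.Now())
//	for _, stage := range stages {
//		mb.RecordSparkStageTaskActiveDataPoint(now, stage.ActiveTasks)
//		mb.RecordSparkStageDiskSpilledDataPoint(now, stage.DiskBytesSpilled)
//		mb.EmitForResource(WithResource(buildResource(stage)))
//	}
//	metrics := mb.Emit() // drain everything buffered so far into one pmetric.Metrics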
// Emit returns all the metrics accumulated by the metrics builder and updates the internal state to be ready for
// recording another set of metrics. This function is responsible for applying all the transformations required to
// produce the metric representation defined in the metadata and user config, e.g. delta or cumulative.
func (mb *MetricsBuilder) Emit(options ...ResourceMetricsOption) pmetric.Metrics {
mb.EmitForResource(options...)
metrics := mb.metricsBuffer
mb.metricsBuffer = pmetric.NewMetrics()
return metrics
}
// RecordSparkDriverBlockManagerDiskUsageDataPoint adds a data point to spark.driver.block_manager.disk.usage metric.
func (mb *MetricsBuilder) RecordSparkDriverBlockManagerDiskUsageDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverBlockManagerDiskUsage.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverBlockManagerMemoryUsageDataPoint adds a data point to spark.driver.block_manager.memory.usage metric.
func (mb *MetricsBuilder) RecordSparkDriverBlockManagerMemoryUsageDataPoint(ts pcommon.Timestamp, val int64, locationAttributeValue AttributeLocation, stateAttributeValue AttributeState) {
mb.metricSparkDriverBlockManagerMemoryUsage.recordDataPoint(mb.startTime, ts, val, locationAttributeValue.String(), stateAttributeValue.String())
}
// RecordSparkDriverCodeGeneratorCompilationAverageTimeDataPoint adds a data point to spark.driver.code_generator.compilation.average_time metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorCompilationAverageTimeDataPoint(ts pcommon.Timestamp, val float64) {
mb.metricSparkDriverCodeGeneratorCompilationAverageTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorCompilationCountDataPoint adds a data point to spark.driver.code_generator.compilation.count metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorCompilationCountDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverCodeGeneratorCompilationCount.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorGeneratedClassAverageSizeDataPoint adds a data point to spark.driver.code_generator.generated_class.average_size metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorGeneratedClassAverageSizeDataPoint(ts pcommon.Timestamp, val float64) {
mb.metricSparkDriverCodeGeneratorGeneratedClassAverageSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorGeneratedClassCountDataPoint adds a data point to spark.driver.code_generator.generated_class.count metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorGeneratedClassCountDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverCodeGeneratorGeneratedClassCount.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorGeneratedMethodAverageSizeDataPoint adds a data point to spark.driver.code_generator.generated_method.average_size metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorGeneratedMethodAverageSizeDataPoint(ts pcommon.Timestamp, val float64) {
mb.metricSparkDriverCodeGeneratorGeneratedMethodAverageSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorGeneratedMethodCountDataPoint adds a data point to spark.driver.code_generator.generated_method.count metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorGeneratedMethodCountDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverCodeGeneratorGeneratedMethodCount.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorSourceCodeAverageSizeDataPoint adds a data point to spark.driver.code_generator.source_code.average_size metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorSourceCodeAverageSizeDataPoint(ts pcommon.Timestamp, val float64) {
mb.metricSparkDriverCodeGeneratorSourceCodeAverageSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverCodeGeneratorSourceCodeOperationsDataPoint adds a data point to spark.driver.code_generator.source_code.operations metric.
func (mb *MetricsBuilder) RecordSparkDriverCodeGeneratorSourceCodeOperationsDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverCodeGeneratorSourceCodeOperations.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverDagSchedulerJobActiveDataPoint adds a data point to spark.driver.dag_scheduler.job.active metric.
func (mb *MetricsBuilder) RecordSparkDriverDagSchedulerJobActiveDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverDagSchedulerJobActive.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverDagSchedulerJobCountDataPoint adds a data point to spark.driver.dag_scheduler.job.count metric.
func (mb *MetricsBuilder) RecordSparkDriverDagSchedulerJobCountDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverDagSchedulerJobCount.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverDagSchedulerStageCountDataPoint adds a data point to spark.driver.dag_scheduler.stage.count metric.
func (mb *MetricsBuilder) RecordSparkDriverDagSchedulerStageCountDataPoint(ts pcommon.Timestamp, val int64, schedulerStatusAttributeValue AttributeSchedulerStatus) {
mb.metricSparkDriverDagSchedulerStageCount.recordDataPoint(mb.startTime, ts, val, schedulerStatusAttributeValue.String())
}
// RecordSparkDriverDagSchedulerStageFailedDataPoint adds a data point to spark.driver.dag_scheduler.stage.failed metric.
func (mb *MetricsBuilder) RecordSparkDriverDagSchedulerStageFailedDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverDagSchedulerStageFailed.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverExecutorGcOperationsDataPoint adds a data point to spark.driver.executor.gc.operations metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorGcOperationsDataPoint(ts pcommon.Timestamp, val int64, gcTypeAttributeValue AttributeGcType) {
mb.metricSparkDriverExecutorGcOperations.recordDataPoint(mb.startTime, ts, val, gcTypeAttributeValue.String())
}
// RecordSparkDriverExecutorGcTimeDataPoint adds a data point to spark.driver.executor.gc.time metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorGcTimeDataPoint(ts pcommon.Timestamp, val int64, gcTypeAttributeValue AttributeGcType) {
mb.metricSparkDriverExecutorGcTime.recordDataPoint(mb.startTime, ts, val, gcTypeAttributeValue.String())
}
// RecordSparkDriverExecutorMemoryExecutionDataPoint adds a data point to spark.driver.executor.memory.execution metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorMemoryExecutionDataPoint(ts pcommon.Timestamp, val int64, locationAttributeValue AttributeLocation) {
mb.metricSparkDriverExecutorMemoryExecution.recordDataPoint(mb.startTime, ts, val, locationAttributeValue.String())
}
// RecordSparkDriverExecutorMemoryJvmDataPoint adds a data point to spark.driver.executor.memory.jvm metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorMemoryJvmDataPoint(ts pcommon.Timestamp, val int64, locationAttributeValue AttributeLocation) {
mb.metricSparkDriverExecutorMemoryJvm.recordDataPoint(mb.startTime, ts, val, locationAttributeValue.String())
}
// RecordSparkDriverExecutorMemoryPoolDataPoint adds a data point to spark.driver.executor.memory.pool metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorMemoryPoolDataPoint(ts pcommon.Timestamp, val int64, poolMemoryTypeAttributeValue AttributePoolMemoryType) {
mb.metricSparkDriverExecutorMemoryPool.recordDataPoint(mb.startTime, ts, val, poolMemoryTypeAttributeValue.String())
}
// RecordSparkDriverExecutorMemoryStorageDataPoint adds a data point to spark.driver.executor.memory.storage metric.
func (mb *MetricsBuilder) RecordSparkDriverExecutorMemoryStorageDataPoint(ts pcommon.Timestamp, val int64, locationAttributeValue AttributeLocation) {
mb.metricSparkDriverExecutorMemoryStorage.recordDataPoint(mb.startTime, ts, val, locationAttributeValue.String())
}
// RecordSparkDriverHiveExternalCatalogFileCacheHitsDataPoint adds a data point to spark.driver.hive_external_catalog.file_cache_hits metric.
func (mb *MetricsBuilder) RecordSparkDriverHiveExternalCatalogFileCacheHitsDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverHiveExternalCatalogFileCacheHits.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverHiveExternalCatalogFilesDiscoveredDataPoint adds a data point to spark.driver.hive_external_catalog.files_discovered metric.
func (mb *MetricsBuilder) RecordSparkDriverHiveExternalCatalogFilesDiscoveredDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverHiveExternalCatalogFilesDiscovered.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverHiveExternalCatalogHiveClientCallsDataPoint adds a data point to spark.driver.hive_external_catalog.hive_client_calls metric.
func (mb *MetricsBuilder) RecordSparkDriverHiveExternalCatalogHiveClientCallsDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverHiveExternalCatalogHiveClientCalls.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverHiveExternalCatalogParallelListingJobsDataPoint adds a data point to spark.driver.hive_external_catalog.parallel_listing_jobs metric.
func (mb *MetricsBuilder) RecordSparkDriverHiveExternalCatalogParallelListingJobsDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverHiveExternalCatalogParallelListingJobs.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverHiveExternalCatalogPartitionsFetchedDataPoint adds a data point to spark.driver.hive_external_catalog.partitions_fetched metric.
func (mb *MetricsBuilder) RecordSparkDriverHiveExternalCatalogPartitionsFetchedDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverHiveExternalCatalogPartitionsFetched.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverJvmCPUTimeDataPoint adds a data point to spark.driver.jvm_cpu_time metric.
func (mb *MetricsBuilder) RecordSparkDriverJvmCPUTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverJvmCPUTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverLiveListenerBusDroppedDataPoint adds a data point to spark.driver.live_listener_bus.dropped metric.
func (mb *MetricsBuilder) RecordSparkDriverLiveListenerBusDroppedDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverLiveListenerBusDropped.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverLiveListenerBusPostedDataPoint adds a data point to spark.driver.live_listener_bus.posted metric.
func (mb *MetricsBuilder) RecordSparkDriverLiveListenerBusPostedDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverLiveListenerBusPosted.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverLiveListenerBusProcessingTimeAverageDataPoint adds a data point to spark.driver.live_listener_bus.processing_time.average metric.
func (mb *MetricsBuilder) RecordSparkDriverLiveListenerBusProcessingTimeAverageDataPoint(ts pcommon.Timestamp, val float64) {
mb.metricSparkDriverLiveListenerBusProcessingTimeAverage.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkDriverLiveListenerBusQueueSizeDataPoint adds a data point to spark.driver.live_listener_bus.queue_size metric.
func (mb *MetricsBuilder) RecordSparkDriverLiveListenerBusQueueSizeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkDriverLiveListenerBusQueueSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorDiskUsageDataPoint adds a data point to spark.executor.disk.usage metric.
func (mb *MetricsBuilder) RecordSparkExecutorDiskUsageDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorDiskUsage.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorGcTimeDataPoint adds a data point to spark.executor.gc_time metric.
func (mb *MetricsBuilder) RecordSparkExecutorGcTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorGcTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorInputSizeDataPoint adds a data point to spark.executor.input_size metric.
func (mb *MetricsBuilder) RecordSparkExecutorInputSizeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorInputSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorMemoryUsageDataPoint adds a data point to spark.executor.memory.usage metric.
func (mb *MetricsBuilder) RecordSparkExecutorMemoryUsageDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorMemoryUsage.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorShuffleIoSizeDataPoint adds a data point to spark.executor.shuffle.io.size metric.
func (mb *MetricsBuilder) RecordSparkExecutorShuffleIoSizeDataPoint(ts pcommon.Timestamp, val int64, directionAttributeValue AttributeDirection) {
mb.metricSparkExecutorShuffleIoSize.recordDataPoint(mb.startTime, ts, val, directionAttributeValue.String())
}
// RecordSparkExecutorStorageMemoryUsageDataPoint adds a data point to spark.executor.storage_memory.usage metric.
func (mb *MetricsBuilder) RecordSparkExecutorStorageMemoryUsageDataPoint(ts pcommon.Timestamp, val int64, locationAttributeValue AttributeLocation, stateAttributeValue AttributeState) {
mb.metricSparkExecutorStorageMemoryUsage.recordDataPoint(mb.startTime, ts, val, locationAttributeValue.String(), stateAttributeValue.String())
}
// RecordSparkExecutorTaskActiveDataPoint adds a data point to spark.executor.task.active metric.
func (mb *MetricsBuilder) RecordSparkExecutorTaskActiveDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorTaskActive.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorTaskLimitDataPoint adds a data point to spark.executor.task.limit metric.
func (mb *MetricsBuilder) RecordSparkExecutorTaskLimitDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorTaskLimit.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkExecutorTaskResultDataPoint adds a data point to spark.executor.task.result metric.
func (mb *MetricsBuilder) RecordSparkExecutorTaskResultDataPoint(ts pcommon.Timestamp, val int64, executorTaskResultAttributeValue AttributeExecutorTaskResult) {
mb.metricSparkExecutorTaskResult.recordDataPoint(mb.startTime, ts, val, executorTaskResultAttributeValue.String())
}
// RecordSparkExecutorTimeDataPoint adds a data point to spark.executor.time metric.
func (mb *MetricsBuilder) RecordSparkExecutorTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkExecutorTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkJobStageActiveDataPoint adds a data point to spark.job.stage.active metric.
func (mb *MetricsBuilder) RecordSparkJobStageActiveDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkJobStageActive.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkJobStageResultDataPoint adds a data point to spark.job.stage.result metric.
func (mb *MetricsBuilder) RecordSparkJobStageResultDataPoint(ts pcommon.Timestamp, val int64, jobResultAttributeValue AttributeJobResult) {
mb.metricSparkJobStageResult.recordDataPoint(mb.startTime, ts, val, jobResultAttributeValue.String())
}
// RecordSparkJobTaskActiveDataPoint adds a data point to spark.job.task.active metric.
func (mb *MetricsBuilder) RecordSparkJobTaskActiveDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkJobTaskActive.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkJobTaskResultDataPoint adds a data point to spark.job.task.result metric.
func (mb *MetricsBuilder) RecordSparkJobTaskResultDataPoint(ts pcommon.Timestamp, val int64, jobResultAttributeValue AttributeJobResult) {
mb.metricSparkJobTaskResult.recordDataPoint(mb.startTime, ts, val, jobResultAttributeValue.String())
}
// RecordSparkStageDiskSpilledDataPoint adds a data point to spark.stage.disk.spilled metric.
func (mb *MetricsBuilder) RecordSparkStageDiskSpilledDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageDiskSpilled.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageExecutorCPUTimeDataPoint adds a data point to spark.stage.executor.cpu_time metric.
func (mb *MetricsBuilder) RecordSparkStageExecutorCPUTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageExecutorCPUTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageExecutorRunTimeDataPoint adds a data point to spark.stage.executor.run_time metric.
func (mb *MetricsBuilder) RecordSparkStageExecutorRunTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageExecutorRunTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageIoRecordsDataPoint adds a data point to spark.stage.io.records metric.
func (mb *MetricsBuilder) RecordSparkStageIoRecordsDataPoint(ts pcommon.Timestamp, val int64, directionAttributeValue AttributeDirection) {
mb.metricSparkStageIoRecords.recordDataPoint(mb.startTime, ts, val, directionAttributeValue.String())
}
// RecordSparkStageIoSizeDataPoint adds a data point to spark.stage.io.size metric.
func (mb *MetricsBuilder) RecordSparkStageIoSizeDataPoint(ts pcommon.Timestamp, val int64, directionAttributeValue AttributeDirection) {
mb.metricSparkStageIoSize.recordDataPoint(mb.startTime, ts, val, directionAttributeValue.String())
}
// RecordSparkStageJvmGcTimeDataPoint adds a data point to spark.stage.jvm_gc_time metric.
func (mb *MetricsBuilder) RecordSparkStageJvmGcTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageJvmGcTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageMemoryPeakDataPoint adds a data point to spark.stage.memory.peak metric.
func (mb *MetricsBuilder) RecordSparkStageMemoryPeakDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageMemoryPeak.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageMemorySpilledDataPoint adds a data point to spark.stage.memory.spilled metric.
func (mb *MetricsBuilder) RecordSparkStageMemorySpilledDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageMemorySpilled.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageShuffleBlocksFetchedDataPoint adds a data point to spark.stage.shuffle.blocks_fetched metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleBlocksFetchedDataPoint(ts pcommon.Timestamp, val int64, sourceAttributeValue AttributeSource) {
mb.metricSparkStageShuffleBlocksFetched.recordDataPoint(mb.startTime, ts, val, sourceAttributeValue.String())
}
// RecordSparkStageShuffleFetchWaitTimeDataPoint adds a data point to spark.stage.shuffle.fetch_wait_time metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleFetchWaitTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageShuffleFetchWaitTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageShuffleIoDiskDataPoint adds a data point to spark.stage.shuffle.io.disk metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleIoDiskDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageShuffleIoDisk.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageShuffleIoReadSizeDataPoint adds a data point to spark.stage.shuffle.io.read.size metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleIoReadSizeDataPoint(ts pcommon.Timestamp, val int64, sourceAttributeValue AttributeSource) {
mb.metricSparkStageShuffleIoReadSize.recordDataPoint(mb.startTime, ts, val, sourceAttributeValue.String())
}
// RecordSparkStageShuffleIoRecordsDataPoint adds a data point to spark.stage.shuffle.io.records metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleIoRecordsDataPoint(ts pcommon.Timestamp, val int64, directionAttributeValue AttributeDirection) {
mb.metricSparkStageShuffleIoRecords.recordDataPoint(mb.startTime, ts, val, directionAttributeValue.String())
}
// RecordSparkStageShuffleIoWriteSizeDataPoint adds a data point to spark.stage.shuffle.io.write.size metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleIoWriteSizeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageShuffleIoWriteSize.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageShuffleWriteTimeDataPoint adds a data point to spark.stage.shuffle.write_time metric.
func (mb *MetricsBuilder) RecordSparkStageShuffleWriteTimeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageShuffleWriteTime.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageStatusDataPoint adds a data point to spark.stage.status metric.
func (mb *MetricsBuilder) RecordSparkStageStatusDataPoint(ts pcommon.Timestamp, val int64, stageActiveAttributeValue bool, stageCompleteAttributeValue bool, stagePendingAttributeValue bool, stageFailedAttributeValue bool) {
mb.metricSparkStageStatus.recordDataPoint(mb.startTime, ts, val, stageActiveAttributeValue, stageCompleteAttributeValue, stagePendingAttributeValue, stageFailedAttributeValue)
}
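// For example, a stage reported as active by the Spark API could be recorded as
// a single point with value 1 and only the matching boolean attribute set (a
// sketch; mapping Spark's stage status onto these booleans is the scraper's
// responsibility):
//
//	mb.RecordSparkStageStatusDataPoint(now, 1,
//		true,  // active
//		false, // complete
//		false, // pending
//		false, // failed
//	)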
// RecordSparkStageTaskActiveDataPoint adds a data point to spark.stage.task.active metric.
func (mb *MetricsBuilder) RecordSparkStageTaskActiveDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageTaskActive.recordDataPoint(mb.startTime, ts, val)
}
// RecordSparkStageTaskResultDataPoint adds a data point to spark.stage.task.result metric.
func (mb *MetricsBuilder) RecordSparkStageTaskResultDataPoint(ts pcommon.Timestamp, val int64, stageTaskResultAttributeValue AttributeStageTaskResult) {
mb.metricSparkStageTaskResult.recordDataPoint(mb.startTime, ts, val, stageTaskResultAttributeValue.String())
}
// RecordSparkStageTaskResultSizeDataPoint adds a data point to spark.stage.task.result_size metric.
func (mb *MetricsBuilder) RecordSparkStageTaskResultSizeDataPoint(ts pcommon.Timestamp, val int64) {
mb.metricSparkStageTaskResultSize.recordDataPoint(mb.startTime, ts, val)
}
// Reset resets the metrics builder to its initial state. It should be used when the external metrics source is restarted,
// so that the metrics builder updates its startTime and resets its internal state accordingly.
func (mb *MetricsBuilder) Reset(options ...MetricBuilderOption) {
mb.startTime = pcommon.NewTimestampFromTime(time.Now())
for _, op := range options {
op.apply(mb)
}
}
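// A hedged example: a scraper that detects that the monitored Spark application
// was restarted can reset the builder and pin the new start time in one call
// (appRestartTime is a placeholder time.Time):
//
//	mb.Reset(WithStartTime(pcommon.NewTimestampFromTime(appRestartTime)))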