internal/elasticsearch/ingest/nodestats.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

package ingest

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"

	"github.com/elastic/elastic-package/internal/elasticsearch"
)

// StatsRecord contains stats for a measurable entity (pipeline, processor, etc.).
type StatsRecord struct {
Count, Current, Failed int64
TimeInMillis int64 `json:"time_in_millis"`
}

// ProcessorStats contains a processor's stats and some metadata.
type ProcessorStats struct {
Type string
Extra string
Conditional bool
Stats StatsRecord
}

// PipelineStats contains stats for a pipeline.
type PipelineStats struct {
StatsRecord
Processors []ProcessorStats
}

// PipelineStatsMap holds the stats for a set of pipelines, keyed by pipeline name.
type PipelineStatsMap map[string]PipelineStats

// wrappedProcessor wraps a single processor's stats under its identifier key,
// as reported by the Node Stats API.
type wrappedProcessor map[string]ProcessorStats

// extract returns the ProcessorStats from a single-key object of the form:
// `{ "processor_type": { ...ProcessorStats... } }`
func (p wrappedProcessor) extract() (stats ProcessorStats, err error) {
if len(p) != 1 {
keys := make([]string, 0, len(p))
for k := range p {
keys = append(keys, k)
}
return stats, fmt.Errorf("can't extract processor stats. Need a single key in the processor identifier, got %d: %v", len(p), keys)
}
	// Read the single entry in the map (the loop body is intentionally empty).
	var processorType string
	for processorType, stats = range p {
	}
	// Handle compound processor identifiers in the form `type:extra`
	// (the extra part is typically the processor's tag).
if off := strings.Index(processorType, ":"); off != -1 {
stats.Extra = processorType[off+1:]
processorType = processorType[:off]
}
	// The reported inner type must either match the outer identifier, or be
	// "conditional", which marks a processor guarded by an `if` condition.
	switch stats.Type {
	case processorType:
	case "conditional":
		stats.Conditional = true
	default:
		return stats, fmt.Errorf("can't understand processor identifier '%s' in %+v", processorType, p)
	}
stats.Type = processorType
return stats, nil
}
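
// Illustrative examples of extract's behavior (hypothetical inputs, not taken
// from a real response):
//
//	// Tagged processor, keyed as "type:tag":
//	wrappedProcessor{"set:copy-id": {Type: "set"}}.extract()
//	// -> ProcessorStats{Type: "set", Extra: "copy-id"}
//
//	// Processor guarded by an `if` condition:
//	wrappedProcessor{"grok": {Type: "conditional"}}.extract()
//	// -> ProcessorStats{Type: "grok", Conditional: true}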

// pipelineStatsRecord is the raw, wire-format form of a pipeline's stats.
type pipelineStatsRecord struct {
StatsRecord
Processors []wrappedProcessor
}

// extract converts a raw pipeline stats record into a PipelineStats,
// unwrapping each processor's stats along the way.
func (r pipelineStatsRecord) extract() (stats PipelineStats, err error) {
stats = PipelineStats{
StatsRecord: r.StatsRecord,
Processors: make([]ProcessorStats, len(r.Processors)),
}
for idx, wrapped := range r.Processors {
if stats.Processors[idx], err = wrapped.extract(); err != nil {
return stats, fmt.Errorf("extracting processor %d: %w", idx, err)
}
}
return stats, nil
}

type pipelineStatsRecordMap map[string]pipelineStatsRecord

type pipelinesStatsNode struct {
	Ingest struct {
		Pipelines pipelineStatsRecordMap
	}
}

type pipelinesStatsResponse struct {
	Nodes map[string]pipelinesStatsNode
}
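
// The types above decode only the relevant slice of a Node Stats response. A
// trimmed, hypothetical payload (names and numbers invented) looks like:
//
//	{
//	  "nodes": {
//	    "EXAMPLE_NODE_ID": {
//	      "ingest": {
//	        "pipelines": {
//	          "logs-example.access-1.0.0": {
//	            "count": 10, "current": 0, "failed": 1, "time_in_millis": 5,
//	            "processors": [
//	              {"grok": {"type": "grok", "stats": {"count": 10, "current": 0, "failed": 1, "time_in_millis": 5}}}
//	            ]
//	          }
//	        }
//	      }
//	    }
//	  }
//	}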

// GetPipelineStats retrieves the ingest stats for the given pipelines from a
// single-node Elasticsearch cluster.
func GetPipelineStats(esClient *elasticsearch.API, pipelines []Pipeline) (stats PipelineStatsMap, err error) {
statsResponse, err := requestPipelineStats(esClient)
if err != nil {
return nil, err
}
return getPipelineStats(statsResponse, pipelines)
}
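
// A minimal usage sketch (assuming an initialized client and a list of
// installed pipelines):
//
//	stats, err := GetPipelineStats(esClient, pipelines)
//	if err != nil {
//		return err
//	}
//	for name, pStats := range stats {
//		fmt.Printf("%s: %d docs in %d ms\n", name, pStats.Count, pStats.TimeInMillis)
//	}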

// GetPipelineStatsByPrefix retrieves, for each node in the cluster, the ingest
// stats of all pipelines whose name starts with the given prefix.
func GetPipelineStatsByPrefix(esClient *elasticsearch.API, pipelinePrefix string) (map[string]PipelineStatsMap, error) {
statsResponse, err := requestPipelineStats(esClient)
if err != nil {
return nil, err
}
return getPipelineStatsByPrefix(statsResponse, pipelinePrefix)
}

// requestPipelineStats queries the Node Stats API, filtering the response down
// to the ingest pipelines section, and returns the raw response body.
func requestPipelineStats(esClient *elasticsearch.API) ([]byte, error) {
filterPathReq := esClient.Nodes.Stats.WithFilterPath("nodes.*.ingest.pipelines")
includeUnloadedSegmentReq := esClient.Nodes.Stats.WithIncludeUnloadedSegments(true)
resp, err := esClient.Nodes.Stats(filterPathReq, includeUnloadedSegmentReq)
if err != nil {
return nil, fmt.Errorf("node stats API call failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read Stats API response body: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("unexpected response status for Node Stats (%d): %s: %w", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
}
return body, nil
}
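
// For reference, the request above is roughly equivalent to:
//
//	GET /_nodes/stats?filter_path=nodes.*.ingest.pipelines&include_unloaded_segments=true
//
// filter_path keeps the response small by dropping everything but the ingest
// pipelines section.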

// getPipelineStatsByPrefix parses a Node Stats response body and extracts the
// stats of pipelines matching the prefix, grouped by node ID.
func getPipelineStatsByPrefix(body []byte, pipelinePrefix string) (stats map[string]PipelineStatsMap, err error) {
var statsResponse pipelinesStatsResponse
if err = json.Unmarshal(body, &statsResponse); err != nil {
return nil, fmt.Errorf("error decoding Node Stats response: %w", err)
}
stats = make(map[string]PipelineStatsMap, len(statsResponse.Nodes))
for nid, node := range statsResponse.Nodes {
nodePStats := make(PipelineStatsMap)
for name, pStats := range node.Ingest.Pipelines {
if !strings.HasPrefix(name, pipelinePrefix) {
continue
}
if nodePStats[name], err = pStats.extract(); err != nil {
return stats, fmt.Errorf("converting pipeline %s: %w", name, err)
}
}
stats[nid] = nodePStats
}
return stats, nil
}
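
// The result is keyed first by node ID and then by pipeline name, e.g.
// (hypothetical identifiers):
//
//	stats["EXAMPLE_NODE_ID"]["logs-example.access-1.0.0"].Count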

// getPipelineStats parses a Node Stats response body and extracts the stats of
// the requested pipelines, expecting a single-node cluster.
func getPipelineStats(body []byte, pipelines []Pipeline) (stats PipelineStatsMap, err error) {
var statsResponse pipelinesStatsResponse
if err = json.Unmarshal(body, &statsResponse); err != nil {
return nil, fmt.Errorf("error decoding Node Stats response: %w", err)
}
	// Stats from exactly one Elasticsearch node are expected.
	if nodeCount := len(statsResponse.Nodes); nodeCount != 1 {
		return nil, fmt.Errorf("need exactly one ES node in stats response (got %d)", nodeCount)
	}
	// Read the pipelines of the single node in the map.
	var nodePipelines map[string]pipelineStatsRecord
	for _, node := range statsResponse.Nodes {
		nodePipelines = node.Ingest.Pipelines
	}
stats = make(PipelineStatsMap, len(pipelines))
var missing []string
for _, requested := range pipelines {
if pStats, found := nodePipelines[requested.Name]; found {
if stats[requested.Name], err = pStats.extract(); err != nil {
return stats, fmt.Errorf("converting pipeline %s: %w", requested.Name, err)
}
} else {
missing = append(missing, requested.Name)
}
}
if len(missing) != 0 {
return stats, fmt.Errorf("node stats response is missing expected pipelines: %s", strings.Join(missing, ", "))
}
return stats, nil
}

// NodesStats contains a subset of the cluster-wide Node Stats API response,
// keyed by node ID.
type NodesStats struct {
	ClusterName string               `json:"cluster_name"`
	Nodes       map[string]NodeStats `json:"nodes"`
}

// NodeStats contains the per-node stats subset requested by GetNodesStats:
// circuit breakers, indices, JVM, OS memory, process CPU, thread pools, and
// transport.
type NodeStats struct {
Breakers map[string]struct {
LimitSizeInBytes int `json:"limit_size_in_bytes"`
EstimatedSizeInBytes int `json:"estimated_size_in_bytes"`
Overhead float64 `json:"overhead"`
Tripped int `json:"tripped"`
}
Indices struct {
Docs struct {
Count int `json:"count"`
Deleted int `json:"deleted"`
} `json:"docs"`
ShardStats struct {
TotalCount int `json:"total_count"`
} `json:"shard_stats"`
Store struct {
SizeInBytes int `json:"size_in_bytes"`
TotalDataSetSizeInBytes int `json:"total_data_set_size_in_bytes"`
ReservedInBytes int `json:"reserved_in_bytes"`
} `json:"store"`
Indexing struct {
IndexTotal int `json:"index_total"`
IndexTimeInMillis int `json:"index_time_in_millis"`
IndexCurrent int `json:"index_current"`
IndexFailed int `json:"index_failed"`
DeleteTotal int `json:"delete_total"`
DeleteTimeInMillis int `json:"delete_time_in_millis"`
DeleteCurrent int `json:"delete_current"`
NoopUpdateTotal int `json:"noop_update_total"`
IsThrottled bool `json:"is_throttled"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
WriteLoad float64 `json:"write_load"`
} `json:"indexing"`
Get struct {
Total int `json:"total"`
TimeInMillis int `json:"time_in_millis"`
ExistsTotal int `json:"exists_total"`
ExistsTimeInMillis int `json:"exists_time_in_millis"`
MissingTotal int `json:"missing_total"`
MissingTimeInMillis int `json:"missing_time_in_millis"`
Current int `json:"current"`
} `json:"get"`
Search struct {
OpenContexts int `json:"open_contexts"`
QueryTotal int `json:"query_total"`
QueryTimeInMillis int `json:"query_time_in_millis"`
QueryCurrent int `json:"query_current"`
FetchTotal int `json:"fetch_total"`
FetchTimeInMillis int `json:"fetch_time_in_millis"`
FetchCurrent int `json:"fetch_current"`
ScrollTotal int `json:"scroll_total"`
ScrollTimeInMillis int `json:"scroll_time_in_millis"`
ScrollCurrent int `json:"scroll_current"`
SuggestTotal int `json:"suggest_total"`
SuggestTimeInMillis int `json:"suggest_time_in_millis"`
SuggestCurrent int `json:"suggest_current"`
} `json:"search"`
Merges struct {
Current int `json:"current"`
CurrentDocs int `json:"current_docs"`
CurrentSizeInBytes int `json:"current_size_in_bytes"`
Total int `json:"total"`
TotalTimeInMillis int `json:"total_time_in_millis"`
TotalDocs int `json:"total_docs"`
TotalSizeInBytes int `json:"total_size_in_bytes"`
TotalStoppedTimeInMillis int `json:"total_stopped_time_in_millis"`
TotalThrottledTimeInMillis int `json:"total_throttled_time_in_millis"`
TotalAutoThrottleInBytes int64 `json:"total_auto_throttle_in_bytes"`
} `json:"merges"`
Refresh struct {
Total int `json:"total"`
TotalTimeInMillis int `json:"total_time_in_millis"`
ExternalTotal int `json:"external_total"`
ExternalTotalTimeInMillis int `json:"external_total_time_in_millis"`
Listeners int `json:"listeners"`
} `json:"refresh"`
Flush struct {
Total int `json:"total"`
Periodic int `json:"periodic"`
TotalTimeInMillis int `json:"total_time_in_millis"`
} `json:"flush"`
Warmer struct {
Current int `json:"current"`
Total int `json:"total"`
TotalTimeInMillis int `json:"total_time_in_millis"`
} `json:"warmer"`
QueryCache struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
TotalCount int `json:"total_count"`
HitCount int `json:"hit_count"`
MissCount int `json:"miss_count"`
CacheSize int `json:"cache_size"`
CacheCount int `json:"cache_count"`
Evictions int `json:"evictions"`
} `json:"query_cache"`
Fielddata struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
} `json:"fielddata"`
Completion struct {
SizeInBytes int `json:"size_in_bytes"`
} `json:"completion"`
Segments struct {
Count int `json:"count"`
MemoryInBytes int `json:"memory_in_bytes"`
TermsMemoryInBytes int `json:"terms_memory_in_bytes"`
StoredFieldsMemoryInBytes int `json:"stored_fields_memory_in_bytes"`
TermVectorsMemoryInBytes int `json:"term_vectors_memory_in_bytes"`
NormsMemoryInBytes int `json:"norms_memory_in_bytes"`
PointsMemoryInBytes int `json:"points_memory_in_bytes"`
DocValuesMemoryInBytes int `json:"doc_values_memory_in_bytes"`
IndexWriterMemoryInBytes int `json:"index_writer_memory_in_bytes"`
VersionMapMemoryInBytes int `json:"version_map_memory_in_bytes"`
FixedBitSetMemoryInBytes int `json:"fixed_bit_set_memory_in_bytes"`
MaxUnsafeAutoIDTimestamp int `json:"max_unsafe_auto_id_timestamp"`
FileSizes map[string]int `json:"file_sizes"`
} `json:"segments"`
Translog struct {
Operations int `json:"operations"`
SizeInBytes int `json:"size_in_bytes"`
UncommittedOperations int `json:"uncommitted_operations"`
UncommittedSizeInBytes int `json:"uncommitted_size_in_bytes"`
EarliestLastModifiedAge int `json:"earliest_last_modified_age"`
} `json:"translog"`
RequestCache struct {
MemorySizeInBytes int `json:"memory_size_in_bytes"`
Evictions int `json:"evictions"`
HitCount int `json:"hit_count"`
MissCount int `json:"miss_count"`
} `json:"request_cache"`
Recovery struct {
CurrentAsSource int `json:"current_as_source"`
CurrentAsTarget int `json:"current_as_target"`
ThrottleTimeInMillis int `json:"throttle_time_in_millis"`
} `json:"recovery"`
Bulk struct {
TotalOperations int64 `json:"total_operations"`
TotalTimeInMillis int64 `json:"total_time_in_millis"`
TotalSizeInBytes int64 `json:"total_size_in_bytes"`
AvgTimeInMillis int64 `json:"avg_time_in_millis"`
AvgSizeInBytes int64 `json:"avg_size_in_bytes"`
} `json:"bulk"`
Mappings struct {
TotalCount int64 `json:"total_count"`
TotalEstimatedOverheadInBytes int64 `json:"total_estimated_overhead_in_bytes"`
} `json:"mappings"`
} `json:"indices"`
JVM struct {
Mem struct {
HeapUsedInBytes int `json:"heap_used_in_bytes"`
HeapUsedPercent int `json:"heap_used_percent"`
HeapCommittedInBytes int `json:"heap_committed_in_bytes"`
HeapMaxInBytes int `json:"heap_max_in_bytes"`
NonHeapUsedInBytes int `json:"non_heap_used_in_bytes"`
NonHeapCommittedInBytes int `json:"non_heap_committed_in_bytes"`
Pools map[string]struct {
UsedInBytes int `json:"used_in_bytes"`
MaxInBytes int `json:"max_in_bytes"`
PeakUsedInBytes int `json:"peak_used_in_bytes"`
PeakMaxInBytes int `json:"peak_max_in_bytes"`
} `json:"pools"`
} `json:"mem"`
Gc struct {
Collectors map[string]struct {
CollectionCount int `json:"collection_count"`
CollectionTimeInMillis int `json:"collection_time_in_millis"`
} `json:"collectors"`
} `json:"gc"`
BufferPools map[string]struct {
Count int `json:"count"`
UsedInBytes int `json:"used_in_bytes"`
TotalCapacityInBytes int `json:"total_capacity_in_bytes"`
} `json:"buffer_pools"`
} `json:"jvm"`
OS struct {
Mem struct {
TotalInBytes int64 `json:"total_in_bytes"`
AdjustedTotalInBytes int64 `json:"adjusted_total_in_bytes"`
FreeInBytes int64 `json:"free_in_bytes"`
UsedInBytes int64 `json:"used_in_bytes"`
FreePercent int `json:"free_percent"`
UsedPercent int `json:"used_percent"`
} `json:"mem"`
} `json:"os"`
Process struct {
CPU struct {
Percent int `json:"percent"`
TotalInMillis int64 `json:"total_in_millis"`
} `json:"cpu"`
} `json:"process"`
ThreadPool map[string]struct {
Threads int `json:"threads"`
Queue int `json:"queue"`
Active int `json:"active"`
Rejected int `json:"rejected"`
Largest int `json:"largest"`
Completed int `json:"completed"`
} `json:"thread_pool"`
Transport struct {
ServerOpen int `json:"server_open"`
TotalOutboundConnections int `json:"total_outbound_connections"`
RxCount int `json:"rx_count"`
RxSizeInBytes int `json:"rx_size_in_bytes"`
TxCount int `json:"tx_count"`
TxSizeInBytes int `json:"tx_size_in_bytes"`
InboundHandlingTimeHistogram []struct {
GEMillis int `json:"ge_millis"`
LTMillis int `json:"lt_millis"`
Count int `json:"count"`
} `json:"inbound_handling_time_histogram"`
OutboundHandlingTimeHistogram []struct {
GEMillis int `json:"ge_millis"`
LTMillis int `json:"lt_millis"`
Count int `json:"count"`
} `json:"outbound_handling_time_histogram"`
} `json:"transport"`
}

// GetNodesStats retrieves a filtered subset of the Node Stats API response for
// every node in the cluster (see the filter path below for the sections kept).
func GetNodesStats(esClient *elasticsearch.API) (*NodesStats, error) {
req := esClient.Nodes.Stats.WithFilterPath("cluster_name," +
"nodes.*.breakers," +
"nodes.*.indices," +
"nodes.*.jvm.mem," +
"nodes.*.jvm.gc," +
"nodes.*.jvm.buffer_pools," +
"nodes.*.os.mem," +
"nodes.*.process.cpu," +
"nodes.*.thread_pool," +
"nodes.*.transport",
)
resp, err := esClient.Nodes.Stats(req)
if err != nil {
return nil, fmt.Errorf("node stats API call failed: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read Stats API response body: %w", err)
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("unexpected response status for Node Stats (%d): %s: %w", resp.StatusCode, resp.Status(), elasticsearch.NewError(body))
}
var statsResponse NodesStats
if err = json.Unmarshal(body, &statsResponse); err != nil {
return nil, fmt.Errorf("error decoding Node Stats response: %w", err)
}
return &statsResponse, nil
}
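
// A minimal usage sketch (assuming an initialized client):
//
//	nodesStats, err := GetNodesStats(esClient)
//	if err != nil {
//		return err
//	}
//	for id, node := range nodesStats.Nodes {
//		fmt.Printf("%s: heap used %d%%\n", id, node.JVM.Mem.HeapUsedPercent)
//	}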