// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package splunkenterprisereceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver"
import (
"context"
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/scraper/scrapererror"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkenterprisereceiver/internal/metadata"
)
var errMaxSearchWaitTimeExceeded = errors.New("maximum search wait time exceeded for metric")
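// splunkScraper bundles everything a scrape cycle needs: the Splunk Enterprise HTTP client,
// the receiver's telemetry settings, the resolved config, and the generated metrics builder.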
type splunkScraper struct {
splunkClient *splunkEntClient
settings component.TelemetrySettings
conf *Config
mb *metadata.MetricsBuilder
}
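// newSplunkMetricsScraper builds a scraper from the receiver settings and config; the HTTP
// client itself is created later in start, once the component.Host is available to newSplunkEntClient.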
func newSplunkMetricsScraper(params receiver.Settings, cfg *Config) splunkScraper {
return splunkScraper{
settings: params.TelemetrySettings,
conf: cfg,
mb: metadata.NewMetricsBuilder(cfg.MetricsBuilderConfig, params),
}
}
// Create a client instance and add it to the splunkScraper
func (s *splunkScraper) start(ctx context.Context, h component.Host) (err error) {
client, err := newSplunkEntClient(ctx, s.conf, h, s.settings)
if err != nil {
return err
}
s.splunkClient = client
return nil
}
// errorListener listens on the error channel and combines errors sent from the different metric
// scrape functions, emitting the combined error list once the context is canceled or the channel
// is closed, signaling the end of a scrape cycle
func errorListener(ctx context.Context, eQueue <-chan error, eOut chan<- *scrapererror.ScrapeErrors) {
errs := &scrapererror.ScrapeErrors{}
for {
select {
case <-ctx.Done():
eOut <- errs
return
case err, ok := <-eQueue:
if !ok {
eOut <- errs
return
}
errs.Add(err)
}
}
}
// The big one: Describes how all scraping tasks should be performed. Part of the scraper interface
func (s *splunkScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
var wg sync.WaitGroup
var errs *scrapererror.ScrapeErrors
now := pcommon.NewTimestampFromTime(time.Now())
errOut := make(chan *scrapererror.ScrapeErrors)
metricScrapes := []func(context.Context, pcommon.Timestamp, infoDict, chan error){
s.scrapeLicenseUsageByIndex,
s.scrapeIndexThroughput,
s.scrapeIndexesTotalSize,
s.scrapeIndexesEventCount,
s.scrapeIndexesBucketCount,
s.scrapeIndexesRawSize,
s.scrapeIndexesBucketEventCount,
s.scrapeIndexesBucketHotWarmCount,
s.scrapeIntrospectionQueues,
s.scrapeIntrospectionQueuesBytes,
s.scrapeAvgExecLatencyByHost,
s.scrapeIndexerPipelineQueues,
s.scrapeBucketsSearchableStatus,
s.scrapeIndexesBucketCountAdHoc,
s.scrapeSchedulerCompletionRatioByHost,
s.scrapeIndexerRawWriteSecondsByHost,
s.scrapeIndexerCPUSecondsByHost,
s.scrapeAvgIopsByHost,
s.scrapeSchedulerRunTimeByHost,
s.scrapeIndexerAvgRate,
s.scrapeKVStoreStatus,
s.scrapeSearchArtifacts,
s.scrapeHealth,
}
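// errors from the scrape goroutines are funneled through errChan; errorListener drains the
// channel concurrently and aggregates everything into a single ScrapeErrors value on errOut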
errChan := make(chan error, len(metricScrapes))
go func() {
errorListener(ctx, errChan, errOut)
}()
// If version info collection is enabled, the build and version info is pulled here; otherwise
// placeholder entries are used so the per-type lookups in the scrape functions are always safe
var info infoDict
if s.conf.VersionInfo {
info = s.scrapeInfo(ctx, now, errChan)
} else {
info = make(infoDict)
nullInfo := Info{Host: "", Entries: make([]InfoEntry, 1)}
info[typeCm] = nullInfo
info[typeSh] = nullInfo
info[typeIdx] = nullInfo
}
for _, fn := range metricScrapes {
wg.Add(1)
go func(
fn func(ctx context.Context, now pcommon.Timestamp, info infoDict, errs chan error),
ctx context.Context,
now pcommon.Timestamp,
info infoDict,
errs chan error,
) {
// run the scrape; arguments are passed explicitly so each goroutine works on its own copies
defer wg.Done()
fn(ctx, now, info, errs)
}(fn, ctx, now, info, errChan)
}
wg.Wait()
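// closing errChan signals errorListener to stop and flush the aggregated errors to errOut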
close(errChan)
errs = <-errOut
return s.mb.Emit(), errs.Combine()
}
// Each metric has its own scrape function associated with it
func (s *splunkScraper) scrapeLicenseUsageByIndex(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkLicenseIndexUsage.Enabled || !s.splunkClient.isConfigured(typeCm) {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkLicenseIndexUsageSearch`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
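// Run the search and poll until a 200 with a jobId is returned: a 204 means the results are not
// ready yet (sleep and retry), and the loop is bounded by the configured scrape timeout. The
// other search-based scrape functions below follow this same dispatch/poll pattern.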
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
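// the search emits an "indexname" field followed by its "By" value, so the most recently seen
// index name is the one attached to each recorded data point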
var indexName string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "indexname":
indexName = f.Value
continue
case "By":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkLicenseIndexUsageDataPoint(now, int64(v), indexName, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeAvgExecLatencyByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkSchedulerAvgExecutionLatency.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkSchedulerAvgExecLatencySearch`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "latency_avg_exec":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkSchedulerAvgExecutionLatencyDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeIndexerAvgRate(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkIndexerAvgRate.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkIndexerAvgRate`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 200 {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "indexer_avg_kbps":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexerAvgRateDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeIndexerPipelineQueues(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkAggregationQueueRatio.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkPipelineQueues`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 200 {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
var ps int64
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "agg_queue_ratio":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkAggregationQueueRatioDataPoint(now, v, host, i.Build, i.Version)
case "index_queue_ratio":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexerQueueRatioDataPoint(now, v, host, i.Build, i.Version)
case "parse_queue_ratio":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkParseQueueRatioDataPoint(now, v, host, i.Build, i.Version)
case "pipeline_sets":
v, err := strconv.ParseInt(f.Value, 10, 64)
ps = v
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkPipelineSetCountDataPoint(now, ps, host, i.Build, i.Version)
case "typing_queue_ratio":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkTypingQueueRatioDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeBucketsSearchableStatus(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkBucketsSearchableStatus.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkBucketsSearchableStatus`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 200 {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
var searchable string
var bc int64
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "is_searchable":
searchable = f.Value
continue
case "bucket_count":
v, err := strconv.ParseInt(f.Value, 10, 64)
bc = v
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkBucketsSearchableStatusDataPoint(now, bc, host, searchable, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeIndexesBucketCountAdHoc(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkIndexesSize.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkIndexesData`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 200 {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var indexer string
var bc int64
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "title":
indexer = f.Value
continue
case "total_size_gb":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexesSizeDataPoint(now, v, indexer, i.Build, i.Version)
case "average_size_gb":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexesAvgSizeDataPoint(now, v, indexer, i.Build, i.Version)
case "average_usage_perc":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexesAvgUsageDataPoint(now, v, indexer, i.Build, i.Version)
case "median_data_age":
v, err := strconv.ParseInt(f.Value, 10, 64)
bc = v
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexesMedianDataAgeDataPoint(now, bc, indexer, i.Build, i.Version)
case "bucket_count":
v, err := strconv.ParseInt(f.Value, 10, 64)
bc = v
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexesBucketCountDataPoint(now, bc, indexer, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeSchedulerCompletionRatioByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkSchedulerCompletionRatio.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkSchedulerCompletionRatio`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "completion_ratio":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkSchedulerCompletionRatioDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeIndexerRawWriteSecondsByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkIndexerRawWriteTime.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkIndexerRawWriteSeconds`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "raw_data_write_seconds":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexerRawWriteTimeDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeIndexerCPUSecondsByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkIndexerCPUTime.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkIndexerCpuSeconds`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "service_cpu_seconds":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIndexerCPUTimeDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeAvgIopsByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkIoAvgIops.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkIoAvgIops`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "iops":
v, err := strconv.ParseInt(f.Value, 10, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkIoAvgIopsDataPoint(now, v, host, i.Build, i.Version)
}
}
}
func (s *splunkScraper) scrapeSchedulerRunTimeByHost(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// Because we have to utilize network resources for each KPI, we check that the metric
// is enabled before proceeding
if !s.conf.Metrics.SplunkSchedulerAvgRunTime.Enabled {
return
}
i := info[typeCm].Entries[0].Content
sr := searchResponse{
search: searchDict[`SplunkSchedulerAvgRunTime`],
}
var (
req *http.Request
res *http.Response
err error
)
start := time.Now()
for {
req, err = s.splunkClient.createRequest(typeCm, &sr)
if err != nil {
errs <- err
return
}
res, err = s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
// if it's a 204 the body will be empty because we are still waiting on search results
err = unmarshallSearchReq(res, &sr)
if err != nil {
errs <- err
}
res.Body.Close()
// if no errors occurred and a 200 was returned the scrape was successful, so break. Note we must make
// sure the 200 comes after the first request, which provides the jobId used to retrieve the results
if sr.Return == 200 && sr.Jobid != nil {
break
}
if sr.Return == 204 {
time.Sleep(2 * time.Second)
}
if sr.Return == 400 {
break
}
if time.Since(start) > s.conf.Timeout {
errs <- errMaxSearchWaitTimeExceeded
return
}
}
// Record the results
var host string
for _, f := range sr.Fields {
switch fieldName := f.FieldName; fieldName {
case "host":
host = f.Value
continue
case "run_time_avg":
v, err := strconv.ParseFloat(f.Value, 64)
if err != nil {
errs <- err
continue
}
s.mb.RecordSplunkSchedulerAvgRunTimeDataPoint(now, v, host, i.Build, i.Version)
}
}
}
// Helper function for unmarshaling search endpoint responses
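// The HTTP status code is stored on the searchResponse so callers can drive their dispatch/poll
// loops, and an empty body (e.g. a 204 while the search job is still running) is not an error.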
func unmarshallSearchReq(res *http.Response, sr *searchResponse) error {
sr.Return = res.StatusCode
if res.ContentLength == 0 {
return nil
}
body, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("failed to read response: %w", err)
}
err = xml.Unmarshal(body, &sr)
if err != nil {
return fmt.Errorf("failed to unmarshall response: %w", err)
}
return nil
}
// Scrape index throughput introspection endpoint
func (s *splunkScraper) scrapeIndexThroughput(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkIndexerThroughput.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexThroughput
ept := apiDict[`SplunkIndexerThroughput`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
for _, entry := range it.Entries {
s.mb.RecordSplunkIndexerThroughputDataPoint(now, 1000*entry.Content.AvgKb, entry.Content.Status, i.Build, i.Version)
}
}
// Scrape indexes extended total size
func (s *splunkScraper) scrapeIndexesTotalSize(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedTotalSize.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
var totalSize int64
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
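// TotalSize appears to be reported in megabytes; it is converted to bytes before being recorded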
if f.Content.TotalSize != "" {
mb, err := strconv.ParseFloat(f.Content.TotalSize, 64)
totalSize = int64(mb * 1024 * 1024)
if err != nil {
errs <- err
}
}
s.mb.RecordSplunkDataIndexesExtendedTotalSizeDataPoint(now, totalSize, name, i.Build, i.Version)
}
}
// Scrape indexes extended total event count
func (s *splunkScraper) scrapeIndexesEventCount(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedEventCount.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
totalEventCount := int64(f.Content.TotalEventCount)
s.mb.RecordSplunkDataIndexesExtendedEventCountDataPoint(now, totalEventCount, name, i.Build, i.Version)
}
}
// Scrape indexes extended total bucket count
func (s *splunkScraper) scrapeIndexesBucketCount(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedBucketCount.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
var totalBucketCount int64
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
if f.Content.TotalBucketCount != "" {
totalBucketCount, err = strconv.ParseInt(f.Content.TotalBucketCount, 10, 64)
if err != nil {
errs <- err
}
}
s.mb.RecordSplunkDataIndexesExtendedBucketCountDataPoint(now, totalBucketCount, name, i.Build, i.Version)
}
}
// Scrape indexes extended raw size
func (s *splunkScraper) scrapeIndexesRawSize(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedRawSize.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
var totalRawSize int64
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
if f.Content.TotalRawSize != "" {
mb, err := strconv.ParseFloat(f.Content.TotalRawSize, 64)
totalRawSize = int64(mb * 1024 * 1024)
if err != nil {
errs <- err
}
}
s.mb.RecordSplunkDataIndexesExtendedRawSizeDataPoint(now, totalRawSize, name, i.Build, i.Version)
}
}
// Scrape indexes extended bucket event count
func (s *splunkScraper) scrapeIndexesBucketEventCount(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedBucketEventCount.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
var bucketDir string
var bucketEventCount int64
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
if f.Content.BucketDirs.Cold.EventCount != "" {
bucketDir = "cold"
bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Cold.EventCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir, i.Build, i.Version)
}
if f.Content.BucketDirs.Home.EventCount != "" {
bucketDir = "home"
bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.EventCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir, i.Build, i.Version)
}
if f.Content.BucketDirs.Thawed.EventCount != "" {
bucketDir = "thawed"
bucketEventCount, err = strconv.ParseInt(f.Content.BucketDirs.Thawed.EventCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkDataIndexesExtendedBucketEventCountDataPoint(now, bucketEventCount, name, bucketDir, i.Build, i.Version)
}
}
}
// Scrape indexes extended bucket hot/warm count
func (s *splunkScraper) scrapeIndexesBucketHotWarmCount(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkDataIndexesExtendedBucketHotCount.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it indexesExtended
ept := apiDict[`SplunkDataIndexesExtended`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
var bucketDir string
var bucketHotCount int64
var bucketWarmCount int64
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
if f.Content.BucketDirs.Home.HotBucketCount != "" {
bucketHotCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.HotBucketCount, 10, 64)
bucketDir = "hot"
if err != nil {
errs <- err
}
s.mb.RecordSplunkDataIndexesExtendedBucketHotCountDataPoint(now, bucketHotCount, name, bucketDir, i.Build, i.Version)
}
if f.Content.BucketDirs.Home.WarmBucketCount != "" {
bucketWarmCount, err = strconv.ParseInt(f.Content.BucketDirs.Home.WarmBucketCount, 10, 64)
bucketDir = "warm"
if err != nil {
errs <- err
}
s.mb.RecordSplunkDataIndexesExtendedBucketWarmCountDataPoint(now, bucketWarmCount, name, bucketDir, i.Build, i.Version)
}
}
}
// Scrape introspection queues
func (s *splunkScraper) scrapeIntrospectionQueues(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkServerIntrospectionQueuesCurrent.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it introspectionQueues
ept := apiDict[`SplunkIntrospectionQueues`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
currentQueuesSize := int64(f.Content.CurrentSize)
s.mb.RecordSplunkServerIntrospectionQueuesCurrentDataPoint(now, currentQueuesSize, name, i.Build, i.Version)
}
}
// Scrape introspection queues bytes
func (s *splunkScraper) scrapeIntrospectionQueuesBytes(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkServerIntrospectionQueuesCurrentBytes.Enabled || !s.splunkClient.isConfigured(typeIdx) {
return
}
i := info[typeIdx].Entries[0].Content
var it introspectionQueues
ept := apiDict[`SplunkIntrospectionQueues`]
req, err := s.splunkClient.createAPIRequest(typeIdx, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &it)
if err != nil {
errs <- err
return
}
var name string
for _, f := range it.Entries {
if f.Name != "" {
name = f.Name
}
currentQueueSizeBytes := int64(f.Content.CurrentSizeBytes)
s.mb.RecordSplunkServerIntrospectionQueuesCurrentBytesDataPoint(now, currentQueueSizeBytes, name, i.Build, i.Version)
}
}
// Scrape introspection kv store status
func (s *splunkScraper) scrapeKVStoreStatus(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkKvstoreStatus.Enabled ||
!s.conf.Metrics.SplunkKvstoreReplicationStatus.Enabled ||
!s.conf.Metrics.SplunkKvstoreBackupStatus.Enabled ||
!s.splunkClient.isConfigured(typeCm) {
return
}
i := info[typeCm].Entries[0].Content
var kvs kvStoreStatus
ept := apiDict[`SplunkKVStoreStatus`]
req, err := s.splunkClient.createAPIRequest(typeCm, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&kvs); err != nil {
errs <- err
return
}
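// shorthand for the KV store fields recorded below: overall status (st), backup/restore status (brs),
// replication status (rs), storage engine (se), and the KV service status (ext)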
var st, brs, rs, se, ext string
for _, kv := range kvs.Entries {
st = kv.Content.Current.Status // overall status
brs = kv.Content.Current.BackupRestoreStatus
rs = kv.Content.Current.ReplicationStatus
se = kv.Content.Current.StorageEngine
ext = kv.Content.KVService.Status
// a 0 gauge value means that the metric was not reported in the api call
// to the introspection endpoint.
if st == "" {
st = kvStatusUnknown
// set to 0 to indicate no status being reported
s.mb.RecordSplunkKvstoreStatusDataPoint(now, 0, se, ext, st, i.Build, i.Version)
} else {
s.mb.RecordSplunkKvstoreStatusDataPoint(now, 1, se, ext, st, i.Build, i.Version)
}
if rs == "" {
rs = kvRestoreStatusUnknown
s.mb.RecordSplunkKvstoreReplicationStatusDataPoint(now, 0, rs, i.Build, i.Version)
} else {
s.mb.RecordSplunkKvstoreReplicationStatusDataPoint(now, 1, rs, i.Build, i.Version)
}
if brs == "" {
brs = kvBackupStatusFailed
s.mb.RecordSplunkKvstoreBackupStatusDataPoint(now, 0, brs, i.Build, i.Version)
} else {
s.mb.RecordSplunkKvstoreBackupStatusDataPoint(now, 1, brs, i.Build, i.Version)
}
}
}
// Scrape dispatch artifacts
func (s *splunkScraper) scrapeSearchArtifacts(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
// if none of the metrics in this scrape are enabled we return early
if !s.conf.Metrics.SplunkServerSearchartifactsAdhoc.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsScheduled.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsCompleted.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsIncomplete.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsInvalid.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsSavedsearches.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsJobCacheSize.Enabled &&
	!s.conf.Metrics.SplunkServerSearchartifactsJobCacheCount.Enabled {
return
}
if !s.splunkClient.isConfigured(typeSh) {
return
}
i := info[typeSh].Entries[0].Content
var da dispatchArtifacts
ept := apiDict[`SplunkDispatchArtifacts`]
req, err := s.splunkClient.createAPIRequest(typeSh, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return
}
err = json.Unmarshal(body, &da)
if err != nil {
errs <- err
return
}
for _, f := range da.Entries {
if s.conf.Metrics.SplunkServerSearchartifactsAdhoc.Enabled {
adhocCount, err := strconv.ParseInt(f.Content.AdhocCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsAdhocDataPoint(now, adhocCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsScheduled.Enabled {
scheduledCount, err := strconv.ParseInt(f.Content.ScheduledCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsScheduledDataPoint(now, scheduledCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsCompleted.Enabled {
completedCount, err := strconv.ParseInt(f.Content.CompletedCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsCompletedDataPoint(now, completedCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsIncomplete.Enabled {
incompleteCount, err := strconv.ParseInt(f.Content.IncompleteCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsIncompleteDataPoint(now, incompleteCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsInvalid.Enabled {
invalidCount, err := strconv.ParseInt(f.Content.InvalidCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsInvalidDataPoint(now, invalidCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsSavedsearches.Enabled {
savedSearchesCount, err := strconv.ParseInt(f.Content.SavedSearchesCount, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsSavedsearchesDataPoint(now, savedSearchesCount, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsJobCacheSize.Enabled {
infoCacheSize, err := strconv.ParseInt(f.Content.InfoCacheSize, 10, 64)
if err != nil {
errs <- err
}
statusCacheSize, err := strconv.ParseInt(f.Content.StatusCacheSize, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsJobCacheSizeDataPoint(now, infoCacheSize, s.conf.SHEndpoint.Endpoint, "info", i.Build, i.Version)
s.mb.RecordSplunkServerSearchartifactsJobCacheSizeDataPoint(now, statusCacheSize, s.conf.SHEndpoint.Endpoint, "status", i.Build, i.Version)
}
if s.conf.Metrics.SplunkServerSearchartifactsJobCacheCount.Enabled {
cacheTotalEntries, err := strconv.ParseInt(f.Content.CacheTotalEntries, 10, 64)
if err != nil {
errs <- err
}
s.mb.RecordSplunkServerSearchartifactsJobCacheCountDataPoint(now, cacheTotalEntries, s.conf.SHEndpoint.Endpoint, i.Build, i.Version)
}
}
}
// Scrape Health Introspection Endpoint
func (s *splunkScraper) scrapeHealth(_ context.Context, now pcommon.Timestamp, info infoDict, errs chan error) {
if !s.conf.Metrics.SplunkHealth.Enabled {
return
}
i := info[typeCm].Entries[0].Content
ept := apiDict[`SplunkHealth`]
var ha healthArtifacts
req, err := s.splunkClient.createAPIRequest(typeCm, ept)
if err != nil {
errs <- err
return
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&ha); err != nil {
errs <- err
return
}
s.settings.Logger.Debug(fmt.Sprintf("Features: %s", ha.Entries))
for _, details := range ha.Entries {
s.traverseHealthDetailFeatures(details.Content, now, i)
}
}
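// traverseHealthDetailFeatures walks the nested feature tree returned by the health endpoint,
// recording 1 for every feature whose health is not "red" and 0 for those that are, then
// recursing into each feature's children.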
func (s *splunkScraper) traverseHealthDetailFeatures(details healthDetails, now pcommon.Timestamp, i InfoContent) {
if details.Features == nil {
return
}
for k, feature := range details.Features {
if feature.Health != "red" {
s.settings.Logger.Debug(feature.Health)
s.mb.RecordSplunkHealthDataPoint(now, 1, k, feature.Health, i.Build, i.Version)
} else {
s.settings.Logger.Debug(feature.Health)
s.mb.RecordSplunkHealthDataPoint(now, 0, k, feature.Health, i.Build, i.Version)
}
s.traverseHealthDetailFeatures(feature, now, i)
}
}
// scrapeInfo is a somewhat unique scrape function: rather than recording metrics it gathers the
// build/version info attributes consumed by the other scrapes
func (s *splunkScraper) scrapeInfo(_ context.Context, _ pcommon.Timestamp, errs chan error) map[any]Info {
// there could be an endpoint configured for each type (never more than 3)
info := make(infoDict)
nullInfo := Info{Host: "", Entries: make([]InfoEntry, 1)}
info[typeCm] = nullInfo
info[typeSh] = nullInfo
info[typeIdx] = nullInfo
for cliType := range s.splunkClient.clients {
var i Info
ept := apiDict[`SplunkInfo`]
req, err := s.splunkClient.createAPIRequest(cliType, ept)
if err != nil {
errs <- err
return info
}
res, err := s.splunkClient.makeRequest(req)
if err != nil {
errs <- err
return info
}
defer res.Body.Close()
body, err := io.ReadAll(res.Body)
if err != nil {
errs <- err
return info
}
err = json.Unmarshal(body, &i)
if err != nil {
errs <- err
return info
}
info[cliType] = i
}
return info
}