func()

in plugins/inputs/influxdb/influxdb.go [133:258]


func (i *InfluxDB) gatherURL(
	acc telegraf.Accumulator,
	url string,
) error {
	shardCounter := 0
	now := time.Now()

	resp, err := i.client.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// It would be nice to be able to decode into a map[string]point, but
	// we'll get a decoder error like:
	// `json: cannot unmarshal array into Go value of type influxdb.point`
	// if any of the values aren't objects.
	// To avoid that error, we decode by hand.
	dec := json.NewDecoder(resp.Body)

	// Parse beginning of object
	if t, err := dec.Token(); err != nil {
		return err
	} else if t != json.Delim('{') {
		return errors.New("document root must be a JSON object")
	}

	// Loop through rest of object
	for {
		// Nothing left in this object, we're done
		if !dec.More() {
			break
		}

		// Read in a string key. We don't do anything with the top-level keys,
		// so it's discarded.
		key, err := dec.Token()
		if err != nil {
			return err
		}

		if keyStr, ok := key.(string); ok {
			if keyStr == "memstats" {
				var m memstats
				if err := dec.Decode(&m); err != nil {
					continue
				}
				acc.AddFields("influxdb_memstats",
					map[string]interface{}{
						"alloc":           m.Alloc,
						"total_alloc":     m.TotalAlloc,
						"sys":             m.Sys,
						"lookups":         m.Lookups,
						"mallocs":         m.Mallocs,
						"frees":           m.Frees,
						"heap_alloc":      m.HeapAlloc,
						"heap_sys":        m.HeapSys,
						"heap_idle":       m.HeapIdle,
						"heap_inuse":      m.HeapInuse,
						"heap_released":   m.HeapReleased,
						"heap_objects":    m.HeapObjects,
						"stack_inuse":     m.StackInuse,
						"stack_sys":       m.StackSys,
						"mspan_inuse":     m.MSpanInuse,
						"mspan_sys":       m.MSpanSys,
						"mcache_inuse":    m.MCacheInuse,
						"mcache_sys":      m.MCacheSys,
						"buck_hash_sys":   m.BuckHashSys,
						"gc_sys":          m.GCSys,
						"other_sys":       m.OtherSys,
						"next_gc":         m.NextGC,
						"last_gc":         m.LastGC,
						"pause_total_ns":  m.PauseTotalNs,
						"pause_ns":        m.PauseNs[(m.NumGC+255)%256],
						"num_gc":          m.NumGC,
						"gcc_pu_fraction": m.GCCPUFraction,
					},
					map[string]string{
						"url": url,
					})
			}
		}

		// Attempt to parse a whole object into a point.
		// It might be a non-object, like a string or array.
		// If we fail to decode it into a point, ignore it and move on.
		var p point
		if err := dec.Decode(&p); err != nil {
			continue
		}

		if p.Tags == nil {
			p.Tags = make(map[string]string)
		}

		// If the object was a point, but was not fully initialized,
		// ignore it and move on.
		if p.Name == "" || p.Values == nil || len(p.Values) == 0 {
			continue
		}

		if p.Name == "shard" {
			shardCounter++
		}

		// Add a tag to indicate the source of the data.
		p.Tags["url"] = url

		acc.AddFields(
			"influxdb_"+p.Name,
			p.Values,
			p.Tags,
			now,
		)
	}

	acc.AddFields("influxdb",
		map[string]interface{}{
			"n_shards": shardCounter,
		},
		nil,
		now,
	)

	return nil
}