pkg/telemetry/telemetry.go (276 lines of code) (raw):
// Copyright 2018-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License"). You may not use this file except in compliance with the License. A copy of the License is located at
//
// http://aws.amazon.com/apache2.0/
//
// or in the "license" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and limitations under the License.
package telemetry
import (
"os"
"sync/atomic"
"time"
"unsafe"
"github.com/aws/aws-xray-daemon/pkg/conn"
"github.com/aws/aws-xray-daemon/pkg/util/timer"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/xray"
log "github.com/cihub/seelog"
)
const (
	// dataCutoffIntervalSecs is the delay, in seconds, that getDataCutoffDelay
	// requests from the timer between two telemetry flushes.
	dataCutoffIntervalSecs = 60
	// bufferSize is the capacity of the record channel and the maximum number
	// of records gathered by one collectAllRecords call.
	bufferSize = 30
	// requestSize is the maximum number of records placed in a single
	// PutTelemetryRecords request.
	requestSize = 10
)
// T is the package-level Telemetry instance. It is assigned by Init and used
// by EvaluateConnectionError to record connection failures.
var T *Telemetry
// Telemetry is used to record X-Ray daemon health. It accumulates counters in
// a current record, rotates that record on every timer tick, and sends the
// rotated records to the X-Ray service.
type Telemetry struct {
	// client is the X-Ray client used to call PutTelemetryRecords.
	client conn.XRay
	// timer produces the channel that triggers the next telemetry flush.
	timer timer.Timer
	// Amazon Resource Name (ARN) of the AWS resource running the daemon.
	resourceARN string
	// Instance id of the EC2 instance running X-Ray daemon.
	instanceID string
	// Host name of the EC2 instance running X-Ray daemon.
	hostname string
	// currentRecord is the record whose counters are currently being
	// incremented. pushData replaces it with a fresh empty record on each flush.
	currentRecord *xray.TelemetryRecord
	// timerChan fires when the next flush is due; pushData re-arms it after
	// every flush.
	timerChan <-chan time.Time
	// Done receives true from pushData after it has performed its final flush
	// in response to Quit.
	Done chan bool
	// Quit tells pushData to flush one last time and stop
	// (presumably signalled when the daemon is closed - confirm with callers).
	Quit chan bool
	// recordChan buffers the TelemetryRecords waiting to be sent to the X-Ray
	// service.
	recordChan chan *xray.TelemetryRecord
	// postTelemetry is set to true by SegmentReceived; while it is false, add
	// skips queuing records so no telemetry is sent.
	// NOTE(review): written by SegmentReceived and read by add without any
	// synchronization - confirm whether these run on different goroutines.
	postTelemetry bool
}
// Init creates the Telemetry instance via newT and publishes it as the
// package-level T.
func Init(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) {
	instance := newT(awsConfig, s, resourceARN, noMetadata)
	T = instance
	log.Debug("Telemetry initiated")
}
// EvaluateConnectionError classifies err and increments the matching
// backend-connection counter on the package-level T:
//
//   - awserr.RequestFailure: 5xx, 4xx, or "other" depending on the status code;
//   - timeout errors (per conn.IsTimeoutError): timeout counter;
//   - awserr.Error with code "RequestError": unknown-host counter
//     (any other awserr.Error code is not counted);
//   - everything else: "other" counter.
func EvaluateConnectionError(err error) {
	// RequestFailure is checked first so HTTP status codes take precedence.
	if failure, isFailure := err.(awserr.RequestFailure); isFailure {
		switch code := failure.StatusCode(); {
		case code >= 500 && code < 600:
			T.Connection5xx(1)
		case code >= 400 && code < 500:
			T.Connection4xx(1)
		default:
			T.ConnectionOther(1)
		}
		return
	}

	if conn.IsTimeoutError(err) {
		T.ConnectionTimeout(1)
		return
	}

	awsErr, isAWSErr := err.(awserr.Error)
	if !isAWSErr {
		T.ConnectionOther(1)
		return
	}
	if awsErr.Code() == "RequestError" {
		T.ConnectionUnknownHost(1)
	}
}
// GetTestTelemetry returns a Telemetry whose only populated field is an empty
// current record; no client, timer or channels are set up.
func GetTestTelemetry() *Telemetry {
	testTelemetry := new(Telemetry)
	testTelemetry.currentRecord = getEmptyTelemetryRecord()
	return testTelemetry
}
// SegmentReceived atomically adds count to the current record's
// SegmentsReceivedCount and enables posting of telemetry.
func (t *Telemetry) SegmentReceived(count int64) {
	received := t.currentRecord.SegmentsReceivedCount
	atomic.AddInt64(received, count)
	// Telemetry is only posted once at least one segment has been received.
	t.postTelemetry = true
}
// SegmentSent atomically adds count to the current record's SegmentsSentCount.
func (t *Telemetry) SegmentSent(count int64) {
	sent := t.currentRecord.SegmentsSentCount
	atomic.AddInt64(sent, count)
}
// SegmentSpillover atomically adds count to the current record's
// SegmentsSpilloverCount.
func (t *Telemetry) SegmentSpillover(count int64) {
	spillover := t.currentRecord.SegmentsSpilloverCount
	atomic.AddInt64(spillover, count)
}
// SegmentRejected atomically adds count to the current record's
// SegmentsRejectedCount.
func (t *Telemetry) SegmentRejected(count int64) {
	rejected := t.currentRecord.SegmentsRejectedCount
	atomic.AddInt64(rejected, count)
}
// ConnectionTimeout atomically adds count to the current record's backend
// connection TimeoutCount.
func (t *Telemetry) ConnectionTimeout(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.TimeoutCount, count)
}
// ConnectionRefusal atomically adds count to the current record's backend
// connection ConnectionRefusedCount.
func (t *Telemetry) ConnectionRefusal(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.ConnectionRefusedCount, count)
}
// Connection4xx atomically adds count to the current record's backend
// connection HTTPCode4XXCount.
func (t *Telemetry) Connection4xx(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.HTTPCode4XXCount, count)
}
// Connection5xx atomically adds count to the current record's backend
// connection HTTPCode5XXCount.
func (t *Telemetry) Connection5xx(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.HTTPCode5XXCount, count)
}
// ConnectionUnknownHost atomically adds count to the current record's backend
// connection UnknownHostCount.
func (t *Telemetry) ConnectionUnknownHost(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.UnknownHostCount, count)
}
// ConnectionOther atomically adds count to the current record's backend
// connection OtherCount.
func (t *Telemetry) ConnectionOther(count int64) {
	connErrors := t.currentRecord.BackendConnectionErrors
	atomic.AddInt64(connErrors.OtherCount, count)
}
// newT builds a Telemetry, resolves the hostname and instance ID to report,
// creates the X-Ray client, and starts the pushData goroutine.
//
// Hostname and instance ID each come from an environment variable
// (AWS_HOSTNAME / AWS_INSTANCE_ID) when set; otherwise from EC2 metadata,
// unless noMetadata is true; otherwise they stay empty.
func newT(awsConfig *aws.Config, s *session.Session, resourceARN string, noMetadata bool) *Telemetry {
	// The metadata client stays nil when metadata lookups are disabled.
	var metadataClient *ec2metadata.EC2Metadata
	if !noMetadata {
		metadataClient = ec2metadata.New(s)
	}

	var hostname string
	switch hostnameEnv := os.Getenv("AWS_HOSTNAME"); {
	case hostnameEnv != "":
		hostname = hostnameEnv
		log.Debugf("Fetch hostname %v from environment variables", hostnameEnv)
	case metadataClient != nil:
		if hn, err := metadataClient.GetMetadata("hostname"); err != nil {
			log.Debugf("Get hostname metadata failed: %s", err)
		} else {
			hostname = hn
			log.Debugf("Using %v hostname for telemetry records", hostname)
		}
	default:
		log.Debug("No hostname set for telemetry records")
	}

	var instanceID string
	switch instanceIDEnv := os.Getenv("AWS_INSTANCE_ID"); {
	case instanceIDEnv != "":
		instanceID = instanceIDEnv
		log.Debugf("Fetch instance ID %v from environment variables", instanceIDEnv)
	case metadataClient != nil:
		if instID, err := metadataClient.GetMetadata("instance-id"); err != nil {
			log.Errorf("Get instance id metadata failed: %s", err)
		} else {
			instanceID = instID
			log.Debugf("Using %v Instance Id for Telemetry records", instanceID)
		}
	default:
		log.Debug("No Instance Id set for telemetry records")
	}

	flushTimer := &timer.Client{}
	telemetry := &Telemetry{
		timer:         flushTimer,
		resourceARN:   resourceARN,
		instanceID:    instanceID,
		hostname:      hostname,
		currentRecord: getEmptyTelemetryRecord(),
		timerChan:     getDataCutoffDelay(flushTimer),
		Done:          make(chan bool),
		Quit:          make(chan bool),
		recordChan:    make(chan *xray.TelemetryRecord, bufferSize),
		postTelemetry: false,
	}
	// The client is attached before the flush goroutine starts using it.
	telemetry.client = conn.NewXRay(awsConfig, s)

	go telemetry.pushData()
	return telemetry
}
// getZeroInt64 returns a pointer to a newly allocated int64 holding zero.
// Every call yields a distinct pointer.
func getZeroInt64() *int64 {
	return new(int64)
}
// getEmptyTelemetryRecord returns a TelemetryRecord in which every segment
// counter and every backend-connection error counter points to its own zeroed
// int64, so the counters can be incremented without nil checks.
func getEmptyTelemetryRecord() *xray.TelemetryRecord {
	connErrors := &xray.BackendConnectionErrors{
		HTTPCode4XXCount:       getZeroInt64(),
		HTTPCode5XXCount:       getZeroInt64(),
		ConnectionRefusedCount: getZeroInt64(),
		OtherCount:             getZeroInt64(),
		TimeoutCount:           getZeroInt64(),
		UnknownHostCount:       getZeroInt64(),
	}

	emptyRecord := new(xray.TelemetryRecord)
	emptyRecord.SegmentsReceivedCount = getZeroInt64()
	emptyRecord.SegmentsRejectedCount = getZeroInt64()
	emptyRecord.SegmentsSentCount = getZeroInt64()
	emptyRecord.SegmentsSpilloverCount = getZeroInt64()
	emptyRecord.BackendConnectionErrors = connErrors
	return emptyRecord
}
func (t *Telemetry) pushData() {
for {
quit := false
select {
case <-t.Quit:
quit = true
break
case <-t.timerChan:
}
emptyRecord := getEmptyTelemetryRecord()
recordToReport := unsafe.Pointer(emptyRecord)
recordToPushPointer := unsafe.Pointer(t.currentRecord)
// Rotation Logic:
// Swap current record to record to report.
// Record to report is set to empty record which is set to current record
t.currentRecord = (*xray.TelemetryRecord)(atomic.SwapPointer(&recordToReport,
recordToPushPointer))
currentTime := time.Now()
record := (*xray.TelemetryRecord)(recordToReport)
record.Timestamp = ¤tTime
t.add(record)
t.sendAll()
if quit {
close(t.recordChan)
log.Debug("telemetry: done!")
t.Done <- true
break
} else {
t.timerChan = getDataCutoffDelay(t.timer)
}
}
}
// add queues record on recordChan without blocking. If postTelemetry is still
// false the record is skipped. When the channel is full, the oldest queued
// record is discarded and the send is retried.
func (t *Telemetry) add(record *xray.TelemetryRecord) {
	// Nothing is queued until postTelemetry has been set.
	if !t.postTelemetry {
		log.Debug("Skipped telemetry data as no segments found")
		return
	}

	for {
		select {
		case t.recordChan <- record:
			return
		default:
		}

		// The buffer is full: drop one queued record to make room, then retry.
		select {
		case <-t.recordChan:
			log.Debug("Telemetry Buffers truncated")
		default:
			log.Debug("Telemetry Buffers dequeued")
			return
		}
	}
}
// sendAll drains the queued records and sends them. If sending fails, the
// records returned as unsent by sendRecords are put back on the queue.
func (t *Telemetry) sendAll() {
	pending := t.collectAllRecords()
	unsent, err := t.sendRecords(pending)
	if err == nil {
		return
	}

	log.Debugf("Failed to send telemetry %v record(s). Re-queue records. %v", len(pending), err)
	// The flush timer is only re-armed after sending completes, so no new
	// record is rotated in while these are being re-queued.
	for i := range unsent {
		t.add(unsent[i])
	}
}
// collectAllRecords drains recordChan without blocking and returns up to
// bufferSize of the drained records, in channel order. Records received after
// that limit is reached are discarded.
func (t *Telemetry) collectAllRecords() []*xray.TelemetryRecord {
	collected := make([]*xray.TelemetryRecord, 0, bufferSize)
	for {
		select {
		case next := <-t.recordChan:
			if len(collected) < bufferSize {
				collected = append(collected, next)
			}
		default:
			// The channel is empty: everything queued has been drained.
			return collected
		}
	}
}
// sendRecords sends records to the X-Ray service in batches of at most
// requestSize records per PutTelemetryRecords call.
//
// On the first failing batch it passes the error to EvaluateConnectionError
// and returns the records from that batch onward together with the error.
// On success, or when records is empty, it returns (nil, nil).
func (t *Telemetry) sendRecords(records []*xray.TelemetryRecord) ([]*xray.TelemetryRecord, error) {
	total := len(records)
	if total == 0 {
		return nil, nil
	}

	for start := 0; start < total; start += requestSize {
		end := start + requestSize
		if end > total {
			end = total
		}

		_, err := t.client.PutTelemetryRecords(&xray.PutTelemetryRecordsInput{
			EC2InstanceId:    &t.instanceID,
			Hostname:         &t.hostname,
			ResourceARN:      &t.resourceARN,
			TelemetryRecords: records[start:end],
		})
		if err != nil {
			EvaluateConnectionError(err)
			// Everything from the failed batch onward is still unsent.
			return records[start:], err
		}
	}

	log.Debugf("Send %v telemetry record(s)", total)
	return nil, nil
}
// getDataCutoffDelay asks timer for a channel covering a delay of
// dataCutoffIntervalSecs seconds and returns that channel.
func getDataCutoffDelay(timer timer.Timer) <-chan time.Time {
	delay := dataCutoffIntervalSecs * time.Second
	return timer.After(delay)
}