aitelemetry/telemetrywrapper.go

package aitelemetry

import (
	"fmt"
	"os"
	"runtime"
	"time"

	"github.com/Azure/azure-container-networking/common"
	"github.com/Azure/azure-container-networking/log"
	"github.com/Azure/azure-container-networking/processlock"
	"github.com/Azure/azure-container-networking/store"
	"github.com/microsoft/ApplicationInsights-Go/appinsights"
	"github.com/microsoft/ApplicationInsights-Go/appinsights/contracts"
)

const (
	resourceGroupStr                 = "ResourceGroup"
	vmSizeStr                        = "VMSize"
	osVersionStr                     = "OSVersion"
	osStr                            = "OS"
	locationStr                      = "Region"
	appNameStr                       = "AppName"
	subscriptionIDStr                = "SubscriptionID"
	vmNameStr                        = "VMName"
	vmIDStr                          = "VMID"
	versionStr                       = "AppVersion"
	azurePublicCloudStr              = "AzurePublicCloud"
	hostNameKey                      = "hostname"
	defaultTimeout                   = 10
	maxCloseTimeoutInSeconds         = 30
	defaultBatchIntervalInSecs       = 15
	defaultBatchSizeInBytes          = 32768
	defaultGetEnvRetryCount          = 5
	defaultGetEnvRetryWaitTimeInSecs = 3
	defaultRefreshTimeoutInSecs      = 10
)

type Level = contracts.SeverityLevel

const (
	DebugLevel Level = contracts.Verbose
	InfoLevel  Level = contracts.Information
	WarnLevel  Level = contracts.Warning
	ErrorLevel Level = contracts.Error
	PanicLevel Level = contracts.Critical
	FatalLevel Level = contracts.Critical
)

var debugMode bool

func setAIConfigDefaults(config *AIConfig) {
	if config.RefreshTimeout == 0 {
		config.RefreshTimeout = defaultRefreshTimeoutInSecs
	}

	if config.BatchInterval == 0 {
		config.BatchInterval = defaultBatchIntervalInSecs
	}

	if config.BatchSize == 0 {
		config.BatchSize = defaultBatchSizeInBytes
	}

	if config.GetEnvRetryCount == 0 {
		config.GetEnvRetryCount = defaultGetEnvRetryCount
	}

	if config.GetEnvRetryWaitTimeInSecs == 0 {
		config.GetEnvRetryWaitTimeInSecs = defaultGetEnvRetryWaitTimeInSecs
	}
}

func messageListener() appinsights.DiagnosticsMessageListener {
	if debugMode {
		return appinsights.NewDiagnosticsMessageListener(func(msg string) error {
			debugLog("[AppInsights] [%s] %s\n", time.Now().Format(time.UnixDate), msg)
			return nil
		})
	}

	return nil
}

func debugLog(format string, args ...interface{}) {
	if debugMode {
		log.Printf(format, args...)
	}
}

func getMetadata(th *telemetryHandle) {
	var metadata common.Metadata
	var err error

	if th.refreshTimeout < 4 {
		th.refreshTimeout = defaultTimeout
	}

	// check if metadata in memory otherwise initiate wireserver request
	for {
		metadata, err = common.GetHostMetadata(metadataFile)
		if err == nil || th.disableMetadataRefreshThread {
			break
		}

		debugLog("[AppInsights] Error getting metadata %v. Sleep for %d", err, th.refreshTimeout)
		time.Sleep(time.Duration(th.refreshTimeout) * time.Second)
	}

	if err != nil {
		debugLog("[AppInsights] Error getting metadata %v", err)
		return
	}

	// acquire write lock before writing metadata to telemetry handle
	th.rwmutex.Lock()
	th.metadata = metadata
	th.rwmutex.Unlock()

	lockclient, err := processlock.NewFileLock(metadataFile + store.LockExtension)
	if err != nil {
		log.Printf("Error initializing file lock:%v", err)
		return
	}

	// Save metadata retrieved from wireserver to a file
	kvs, err := store.NewJsonFileStore(metadataFile, lockclient, nil)
	if err != nil {
		debugLog("[AppInsights] Error initializing kvs store: %v", err)
		return
	}

	// Acquire store lock.
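	// The save below goes through the JSON file store's cross-process file lock:
	// acquire the lock, write the metadata, then release it as soon as the write
	// completes, logging (but not failing on) unlock errors.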
	if err = kvs.Lock(store.DefaultLockTimeout); err != nil {
		log.Errorf("getMetadata: Not able to acquire lock:%v", err)
		return
	}

	metadataErr := common.SaveHostMetadata(th.metadata, metadataFile)
	err = kvs.Unlock()
	if err != nil {
		log.Errorf("getMetadata: Not able to release lock:%v", err)
	}

	if metadataErr != nil {
		debugLog("[AppInsights] saving host metadata failed with: %v", metadataErr)
	}
}

func isPublicEnvironment(url string, retryCount, waitTimeInSecs int) (bool, error) {
	var (
		cloudName string
		err       error
	)

	for i := 0; i < retryCount; i++ {
		cloudName, err = common.GetAzureCloud(url)
		if cloudName == azurePublicCloudStr {
			debugLog("[AppInsights] CloudName: %s\n", cloudName)
			return true, nil
		} else if err == nil {
			debugLog("[AppInsights] This is not azure public cloud:%s", cloudName)
			return false, fmt.Errorf("Not an azure public cloud: %s", cloudName)
		}

		debugLog("GetAzureCloud returned err :%v", err)
		time.Sleep(time.Duration(waitTimeInSecs) * time.Second)
	}

	return false, err
}

// NewAITelemetry creates a telemetry handle with the user-specified appinsights id.
func NewAITelemetry(
	azEnvUrl string,
	id string,
	aiConfig AIConfig,
) (TelemetryHandle, error) {
	debugMode = aiConfig.DebugMode

	if id == "" {
		debugLog("Empty AI key")
		return nil, fmt.Errorf("AI key is empty")
	}

	setAIConfigDefaults(&aiConfig)

	// check if the azure instance is in public cloud
	isPublic, err := isPublicEnvironment(azEnvUrl, aiConfig.GetEnvRetryCount, aiConfig.GetEnvRetryWaitTimeInSecs)
	if !isPublic {
		return nil, err
	}

	telemetryConfig := appinsights.NewTelemetryConfiguration(id)
	telemetryConfig.MaxBatchSize = aiConfig.BatchSize
	telemetryConfig.MaxBatchInterval = time.Duration(aiConfig.BatchInterval) * time.Second

	th := &telemetryHandle{
		client:                       appinsights.NewTelemetryClientFromConfig(telemetryConfig),
		appName:                      aiConfig.AppName,
		appVersion:                   aiConfig.AppVersion,
		diagListener:                 messageListener(),
		disableMetadataRefreshThread: aiConfig.DisableMetadataRefreshThread,
		refreshTimeout:               aiConfig.RefreshTimeout,
	}

	if th.disableMetadataRefreshThread {
		getMetadata(th)
	} else {
		go getMetadata(th)
	}

	return th, nil
}

// TrackLog function sends a report (trace) to the appinsights resource. It overrides
// a few of the existing columns with app information and uses custom dimensions for the rest.
func (th *telemetryHandle) TrackLog(report Report) {
	// Initialize new trace message
	trace := appinsights.NewTraceTelemetry(report.Message, report.Level)

	// will be empty if cns is used as the telemetry service for cni
	if th.appVersion == "" {
		th.appVersion = report.AppVersion
	}

	// Override a few of the existing columns with metadata
	trace.Tags.User().SetAuthUserId(runtime.GOOS)
	trace.Tags.Operation().SetId(report.Context)
	trace.Tags.Operation().SetParentId(th.appVersion)
	trace.Tags.Application().SetVer(th.appVersion)
	trace.Properties[hostNameKey], _ = os.Hostname()

	// copy app-specified custom dimensions
	for key, value := range report.CustomDimensions {
		trace.Properties[key] = value
	}

	trace.Properties[appNameStr] = th.appName

	// Acquire read lock to read metadata
	th.rwmutex.RLock()
	metadata := th.metadata
	th.rwmutex.RUnlock()

	// Check if metadata is populated
	if metadata.SubscriptionID != "" {
		// copy metadata from wireserver to trace
		trace.Tags.User().SetAccountId(metadata.SubscriptionID)
		trace.Tags.User().SetId(metadata.VMName)
		trace.Properties[locationStr] = metadata.Location
		trace.Properties[resourceGroupStr] = metadata.ResourceGroupName
		trace.Properties[vmSizeStr] = metadata.VMSize
		trace.Properties[osVersionStr] = metadata.OSVersion
		trace.Properties[vmIDStr] = metadata.VMID
		trace.Tags.Session().SetId(metadata.VMID)
	}

	// send to appinsights resource
	th.client.Track(trace)
}

// TrackEvent function sends events to the appinsights resource. It overrides a few of the
// existing columns with app information.
func (th *telemetryHandle) TrackEvent(event Event) {
	// Initialize new event message
	aiEvent := appinsights.NewEventTelemetry(event.EventName)

	// OperationId => resourceID (e.g.: NCID)
	aiEvent.Tags.Operation().SetId(event.ResourceID)

	// Copy the properties, if supplied
	if event.Properties != nil {
		for key, value := range event.Properties {
			aiEvent.Properties[key] = value
		}
	}

	// Acquire read lock to read metadata
	th.rwmutex.RLock()
	metadata := th.metadata
	th.rwmutex.RUnlock()

	// Add metadata
	if metadata.SubscriptionID != "" {
		aiEvent.Tags.User().SetAccountId(metadata.SubscriptionID)
		// AnonId => VMName
		aiEvent.Tags.User().SetId(metadata.VMName)
		// SessionId => VMID
		aiEvent.Tags.Session().SetId(metadata.VMID)
		aiEvent.Properties[locationStr] = metadata.Location
		aiEvent.Properties[resourceGroupStr] = metadata.ResourceGroupName
		aiEvent.Properties[vmSizeStr] = metadata.VMSize
		aiEvent.Properties[osVersionStr] = metadata.OSVersion
		aiEvent.Properties[vmIDStr] = metadata.VMID
		aiEvent.Properties[vmNameStr] = metadata.VMName
	}

	aiEvent.Tags.Operation().SetParentId(th.appVersion)
	aiEvent.Tags.User().SetAuthUserId(runtime.GOOS)
	aiEvent.Properties[osStr] = runtime.GOOS
	aiEvent.Properties[appNameStr] = th.appName
	aiEvent.Properties[versionStr] = th.appVersion

	th.client.Track(aiEvent)
}

// TrackMetric function sends a metric to the appinsights resource. It overrides
// a few of the existing columns with app information and uses custom dimensions for the rest.
func (th *telemetryHandle) TrackMetric(metric Metric) {
	// Initialize new metric
	aimetric := appinsights.NewMetricTelemetry(metric.Name, metric.Value)

	// Acquire read lock to read metadata
	th.rwmutex.RLock()
	metadata := th.metadata
	th.rwmutex.RUnlock()

	if th.appVersion == "" {
		th.appVersion = metric.AppVersion
	}

	// Check if metadata is populated
	if metadata.SubscriptionID != "" {
		aimetric.Properties[locationStr] = metadata.Location
		aimetric.Properties[subscriptionIDStr] = metadata.SubscriptionID
		aimetric.Properties[vmNameStr] = metadata.VMName
		aimetric.Properties[versionStr] = th.appVersion
		aimetric.Properties[resourceGroupStr] = metadata.ResourceGroupName
		aimetric.Properties[vmIDStr] = metadata.VMID
		aimetric.Properties[osStr] = runtime.GOOS
		aimetric.Tags.Session().SetId(metadata.VMID)
	}

	// copy custom dimensions
	for key, value := range metric.CustomDimensions {
		aimetric.Properties[key] = value
	}

	// send metric to appinsights
	th.client.Track(aimetric)
}

// Close - should be called for each NewAITelemetry call. It releases the resources acquired.
func (th *telemetryHandle) Close(timeout int) {
	if timeout <= 0 {
		timeout = defaultTimeout
	}

	// raise the absolute wait to maxCloseTimeoutInSeconds if the requested timeout is smaller
	maxWaitTimeInSeconds := timeout
	if maxWaitTimeInSeconds < maxCloseTimeoutInSeconds {
		maxWaitTimeInSeconds = maxCloseTimeoutInSeconds
	}

	// wait for items to be sent, otherwise time out
	// similar to the example in the appinsights-go repo: https://github.com/microsoft/ApplicationInsights-Go#shutdown
	timer := time.NewTimer(time.Duration(maxWaitTimeInSeconds) * time.Second)
	defer timer.Stop()

	select {
	case <-th.client.Channel().Close(time.Duration(timeout) * time.Second):
		// timeout specified for retries.
		// If we got here, then all telemetry was submitted
		// successfully, and we can proceed to exiting.

	case <-timer.C:
		// absolute timeout. This covers any
		// previous telemetry submission that may not have
		// completed before Close was called.
		// There are a number of reasons we could have
		// reached here. We gave it a go, but telemetry
		// submission failed somewhere. Perhaps old events
		// were still retrying, or perhaps we're throttled.
		// Either way, we don't want to wait around for it
		// to complete, so let's just exit.
	}

	// Remove the diagnostic message listener
	if th.diagListener != nil {
		th.diagListener.Remove()
		th.diagListener = nil
	}
}

// Flush - forces the current queue to be sent
func (th *telemetryHandle) Flush() {
	th.client.Channel().Flush()
}
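
// exampleUsage is an illustrative sketch of the intended call pattern, not code
// exercised by the package itself. The endpoint URL, instrumentation key, app name
// and custom dimension below are placeholders, and CustomDimensions is assumed to
// be a map[string]string, matching how TrackLog copies it into the trace properties above.
func exampleUsage() {
	aiConfig := AIConfig{
		AppName:                      "sample-app",
		AppVersion:                   "v0.0.1",
		BatchSize:                    defaultBatchSizeInBytes,
		BatchInterval:                defaultBatchIntervalInSecs,
		RefreshTimeout:               defaultRefreshTimeoutInSecs,
		DisableMetadataRefreshThread: true,
	}

	// NewAITelemetry validates the key, applies config defaults, checks that the
	// VM runs in Azure public cloud and, with the refresh thread disabled, fetches
	// host metadata synchronously before returning the handle.
	th, err := NewAITelemetry("<azure-environment-url>", "<instrumentation-key>", aiConfig)
	if err != nil {
		return
	}
	defer th.Close(defaultTimeout)

	th.TrackLog(Report{
		Message:          "sample trace",
		Level:            InfoLevel,
		Context:          "sample-context",
		CustomDimensions: map[string]string{"SampleDimension": "value"},
	})

	// Flush forces any queued items to be sent immediately.
	th.Flush()
}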