in source/plugins/go/src/oms.go [1525:1819]
// InitializePlugin reads the plugin configuration file and the process
// environment and populates the package-level state used by the plugin.
//
// pluginConfPath is handed to ReadConfiguration; agentVersion is handed to
// InitializeTelemetryClient.
//
// The process is terminated with log.Fatalln (after a 30 second sleep) when the
// configuration cannot be read or, on non-Windows, when WSID or DOMAIN is
// empty. Every other failure is logged and initialization continues.
func InitializePlugin(pluginConfPath string, agentVersion string) {
	// Serve the default HTTP mux on localhost:6060 only when ISTEST=true.
	go func() {
		isTest := os.Getenv("ISTEST")
		if strings.ToLower(strings.TrimSpace(isTest)) == "true" {
			if listenErr := http.ListenAndServe("localhost:6060", nil); listenErr != nil {
				Log("HTTP Listen Error: %s \n", listenErr.Error())
			}
		}
	}()

	StdoutIgnoreNsSet = make(map[string]bool)
	StderrIgnoreNsSet = make(map[string]bool)
	ImageIDMap = make(map[string]string)
	NameIDMap = make(map[string]string)
	// Keeping the two error hashes separate since we need to keep the config error hash for the lifetime of the container
	// whereas the prometheus scrape error hash needs to be refreshed every hour
	ConfigErrorEvent = make(map[string]KubeMonAgentEventTags)
	PromScrapeErrorEvent = make(map[string]KubeMonAgentEventTags)
	// Initializing this to true to skip the first kubemonagentevent flush since the errors are not populated at this time
	skipKubeMonEventsFlush = true

	enrichContainerLogs = os.Getenv("AZMON_CLUSTER_CONTAINER_LOG_ENRICH") == "true"
	if enrichContainerLogs {
		Log("ContainerLogEnrichment=true \n")
	} else {
		Log("ContainerLogEnrichment=false \n")
	}

	pluginConfig, err := ReadConfiguration(pluginConfPath)
	if err != nil {
		message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error())
		Log("%s", message)
		SendException(message)
		time.Sleep(30 * time.Second)
		log.Fatalln(message)
	}

	ContainerType = os.Getenv(ContainerTypeEnv)
	Log("Container Type %s", ContainerType)

	osType := os.Getenv("OS_TYPE")
	IsWindows = strings.ToLower(osType) == "windows"
	if !IsWindows {
		// Linux
		Log("Reading configuration for Linux from %s", pluginConfPath)
		WorkspaceID = os.Getenv("WSID")
		if WorkspaceID == "" {
			message := "WorkspaceID shouldnt be empty"
			Log("%s", message)
			SendException(message)
			time.Sleep(30 * time.Second)
			log.Fatalln(message)
		}
		LogAnalyticsWorkspaceDomain = os.Getenv("DOMAIN")
		if LogAnalyticsWorkspaceDomain == "" {
			message := "Workspace DOMAIN shouldnt be empty"
			Log("%s", message)
			SendException(message)
			time.Sleep(30 * time.Second)
			log.Fatalln(message)
		}
		OMSEndpoint = "https://" + WorkspaceID + ".ods." + LogAnalyticsWorkspaceDomain + "/OperationalData.svc/PostJsonDataItems"

		// Populate Computer field
		containerHostName, hostErr := ioutil.ReadFile(pluginConfig["container_host_file_path"])
		if hostErr != nil {
			// It is ok to log here and continue, because only the Computer column will be missing,
			// which can be deduced from a combination of containerId, and docker logs on the node.
			// The read error itself must be reported: the outer err is always nil at this point.
			message := fmt.Sprintf("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", hostErr.Error())
			Log("%s", message)
			SendException(message)
		} else {
			Computer = strings.TrimSuffix(ToString(containerHostName), "\n")
		}

		// read proxyendpoint if proxy configured
		ProxyEndpoint = ""
		proxySecretPath := pluginConfig["omsproxy_secret_path"]
		if _, statErr := os.Stat(proxySecretPath); statErr == nil {
			Log("Reading proxy configuration for Linux from %s", proxySecretPath)
			proxyConfig, readErr := ioutil.ReadFile(proxySecretPath)
			if readErr != nil {
				message := fmt.Sprintf("Error Reading omsproxy configuration %s\n", readErr.Error())
				Log("%s", message)
				// if we fail to read proxy secret, AI telemetry might not be working as well
				SendException(message)
			} else {
				ProxyEndpoint = strings.TrimSpace(string(proxyConfig))
			}
		}
	} else {
		// windows
		Computer = os.Getenv("HOSTNAME")
		WorkspaceID = os.Getenv("WSID")
		// NOTE(review): unlike the Linux branch, DOMAIN is kept in a local here and
		// LogAnalyticsWorkspaceDomain is not assigned — confirm this is intended.
		logAnalyticsDomain := os.Getenv("DOMAIN")
		ProxyEndpoint = os.Getenv("PROXY")
		OMSEndpoint = "https://" + WorkspaceID + ".ods." + logAnalyticsDomain + "/OperationalData.svc/PostJsonDataItems"
	}
	Log("OMSEndpoint %s", OMSEndpoint)

	IsAADMSIAuthMode = false
	if strings.ToLower(os.Getenv(AADMSIAuthMode)) == "true" {
		IsAADMSIAuthMode = true
		Log("AAD MSI Auth Mode Configured")
	}

	ResourceID = os.Getenv(envAKSResourceID)
	if len(ResourceID) > 0 {
		//AKS Scenario
		ResourceCentric = true
		// The resource name is the last "/"-separated segment of the resource ID.
		segments := strings.Split(ResourceID, "/")
		ResourceName = segments[len(segments)-1]
		Log("ResourceCentric: True")
		Log("ResourceID=%s", ResourceID)
		Log("ResourceName=%s", ResourceName)
	}
	if !ResourceCentric {
		//AKS-Engine/hybrid scenario
		ResourceName = os.Getenv(ResourceNameEnv)
		ResourceID = ResourceName
		Log("ResourceCentric: False")
		Log("ResourceID=%s", ResourceID)
		Log("ResourceName=%s", ResourceName)
	}

	// log runtime info for debug purpose
	containerRuntime = os.Getenv(ContainerRuntimeEnv)
	Log("Container Runtime engine %s", containerRuntime)

	// set useragent to be used by ingestion
	if versionFromEnv := strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION")); len(versionFromEnv) > 0 {
		dockerCimprovVersion = versionFromEnv
	}
	userAgent = fmt.Sprintf("%s/%s", agentName, dockerCimprovVersion)
	Log("Usage-Agent = %s \n", userAgent)

	// Initialize image,name map refresh ticker
	containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"])
	if err != nil {
		message := fmt.Sprintf("Error Reading Container Inventory Refresh Interval %s", err.Error())
		Log("%s", message)
		SendException(message)
		Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval)
		containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval
	} else if containerInventoryRefreshInterval <= 0 {
		// time.NewTicker panics on a non-positive duration, so reject such values here.
		Log("Invalid Container Inventory Refresh Interval %d \n", containerInventoryRefreshInterval)
		Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval)
		containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval
	}
	Log("containerInventoryRefreshInterval = %d \n", containerInventoryRefreshInterval)
	ContainerImageNameRefreshTicker = time.NewTicker(time.Second * time.Duration(containerInventoryRefreshInterval))

	Log("kubeMonAgentConfigEventFlushInterval = %d \n", kubeMonAgentConfigEventFlushInterval)
	KubeMonAgentConfigEventsSendTicker = time.NewTicker(time.Minute * time.Duration(kubeMonAgentConfigEventFlushInterval))

	Log("Computer == %s \n", Computer)

	ret, err := InitializeTelemetryClient(agentVersion)
	if ret != 0 || err != nil {
		// A non-zero return code may arrive without an error value, so never
		// dereference err unconditionally.
		detail := fmt.Sprintf("return code %v", ret)
		if err != nil {
			detail = err.Error()
		}
		message := fmt.Sprintf("Error During Telemetry Initialization :%s", detail)
		fmt.Print(message)
		Log("%s", message)
	}

	// Initialize KubeAPI Client
	config, err := rest.InClusterConfig()
	if err != nil {
		message := fmt.Sprintf("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
		Log("%s", message)
		SendException(message)
	} else {
		// Only build the clientset from a usable config; passing the nil config of
		// the failure path to NewForConfig would dereference a nil pointer.
		ClientSet, err = kubernetes.NewForConfig(config)
		if err != nil {
			message := fmt.Sprintf("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
			SendException(message)
			Log("%s", message)
		}
	}

	PluginConfiguration = pluginConfig

	logsRoute := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOGS_ROUTE")))
	Log("AZMON_CONTAINER_LOGS_ROUTE:%s", logsRoute)

	ContainerLogsRouteV2 = false
	ContainerLogsRouteADX = false

	if logsRoute == ContainerLogsADXRoute {
		// Try to read the ADX database name from environment variables. Default to DefaultAdxDatabaseName if not set.
		// This SHOULD be set by tomlparser.rb so it's a highly unexpected event if it isn't.
		// It should be set by the logic in tomlparser.rb EVEN if ADX logging isn't enabled
		// NOTE(review): declared with := so this is a function-local; if a package-level
		// AdxDatabaseName exists it is shadowed and never assigned here — confirm.
		AdxDatabaseName := strings.TrimSpace(os.Getenv("AZMON_ADX_DATABASE_NAME"))

		// Check the len of the provided name for database and use default if 0, just to be sure
		if len(AdxDatabaseName) == 0 {
			Log("Adx database name unexpecedly empty (check config AND implementation, should have been set by tomlparser.rb?) - will default to '%s'", DefaultAdxDatabaseName)
			AdxDatabaseName = DefaultAdxDatabaseName
		}

		//check if adx clusteruri, clientid & secret are set
		var err error
		AdxClusterUri, err = ReadFileContents(PluginConfiguration["adx_cluster_uri_path"])
		if err != nil {
			Log("Error when reading AdxClusterUri %s", err)
		}
		if !isValidUrl(AdxClusterUri) {
			Log("Invalid AdxClusterUri %s", AdxClusterUri)
			AdxClusterUri = ""
		}
		AdxClientID, err = ReadFileContents(PluginConfiguration["adx_client_id_path"])
		if err != nil {
			Log("Error when reading AdxClientID %s", err)
		}
		AdxTenantID, err = ReadFileContents(PluginConfiguration["adx_tenant_id_path"])
		if err != nil {
			Log("Error when reading AdxTenantID %s", err)
		}
		AdxClientSecret, err = ReadFileContents(PluginConfiguration["adx_client_secret_path"])
		if err != nil {
			Log("Error when reading AdxClientSecret %s", err)
		}

		// AdxDatabaseName should never get in a state where its length is 0, but it doesn't hurt to add the check
		if len(AdxClusterUri) > 0 && len(AdxClientID) > 0 && len(AdxClientSecret) > 0 && len(AdxTenantID) > 0 && len(AdxDatabaseName) > 0 {
			ContainerLogsRouteADX = true
			Log("Routing container logs thru %s route...", ContainerLogsADXRoute)
			fmt.Fprintf(os.Stdout, "Routing container logs thru %s route...\n", ContainerLogsADXRoute)
		}
	} else if !IsWindows { //for linux, oneagent will be default route
		ContainerLogsRouteV2 = true //default is mdsd route
		Log("Routing container logs thru %s route...", logsRoute)
		fmt.Fprintf(os.Stdout, "Routing container logs thru %s route... \n", logsRoute)
	}

	switch {
	case ContainerLogsRouteV2:
		CreateMDSDClient(ContainerLogV2, ContainerType)
	case ContainerLogsRouteADX:
		CreateADXClient()
	default: // v1 or windows
		Log("Creating HTTP Client since either OS Platform is Windows or configmap configured with fallback option for ODS direct")
		CreateHTTPClient()
	}

	if !IsWindows { // mdsd linux specific
		Log("Creating MDSD clients for KubeMonAgentEvents & InsightsMetrics")
		CreateMDSDClient(KubeMonAgentEvents, ContainerType)
		CreateMDSDClient(InsightsMetrics, ContainerType)
	}

	schemaVersion := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOG_SCHEMA_VERSION")))
	Log("AZMON_CONTAINER_LOG_SCHEMA_VERSION:%s", schemaVersion)

	ContainerLogSchemaV2 = false //default is v1 schema
	if schemaVersion == ContainerLogV2SchemaVersion && !ContainerLogsRouteADX {
		ContainerLogSchemaV2 = true
		Log("Container logs schema=%s", ContainerLogV2SchemaVersion)
		fmt.Fprintf(os.Stdout, "Container logs schema=%s... \n", ContainerLogV2SchemaVersion)
	}

	if strings.ToLower(os.Getenv("CONTROLLER_TYPE")) == "daemonset" {
		populateExcludedStdoutNamespaces()
		populateExcludedStderrNamespaces()
		//enrichment not applicable for ADX and v2 schema
		if enrichContainerLogs && !ContainerLogsRouteADX && !ContainerLogSchemaV2 {
			Log("ContainerLogEnrichment=true; starting goroutine to update containerimagenamemaps \n")
			go updateContainerImageNameMaps()
		} else {
			Log("ContainerLogEnrichment=false \n")
		}

		// Flush config error records every hour
		go flushKubeMonAgentEventRecords()
	} else {
		Log("Running in replicaset. Disabling container enrichment caching & updates \n")
	}

	if ContainerLogSchemaV2 {
		MdsdContainerLogTagName = MdsdContainerLogV2SourceName
	} else {
		MdsdContainerLogTagName = MdsdContainerLogSourceName
	}
	MdsdInsightsMetricsTagName = MdsdInsightsMetricsSourceName
	MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName

	Log("ContainerLogsRouteADX: %v, IsWindows: %v, IsAADMSIAuthMode = %v \n", ContainerLogsRouteADX, IsWindows, IsAADMSIAuthMode)
	// The ingestion auth token refresher is only started for Windows in AAD MSI
	// auth mode when logs are not routed to ADX.
	if !ContainerLogsRouteADX && IsWindows && IsAADMSIAuthMode {
		Log("defaultIngestionAuthTokenRefreshIntervalSeconds = %d \n", defaultIngestionAuthTokenRefreshIntervalSeconds)
		IngestionAuthTokenRefreshTicker = time.NewTicker(time.Second * time.Duration(defaultIngestionAuthTokenRefreshIntervalSeconds))
		go refreshIngestionAuthToken()
	}
}