func InitializePlugin()

in source/plugins/go/src/oms.go [1525:1819]


// InitializePlugin configures the output plugin from its configuration file and
// the container environment. It sets the package-level state used later when
// records are flushed:
//   - workspace and ODS endpoint information;
//   - resource identity;
//   - container-log routing (ADX, MDSD "v2", or the HTTP client fallback);
//   - log schema version and the refresh/flush tickers;
//   - the ingestion clients.
//
// It also starts the background goroutines used by the daemonset.
//
// pluginConfPath is the file passed to ReadConfiguration. The keys read from
// the resulting map are "container_host_file_path", "omsproxy_secret_path",
// "container_inventory_refresh_interval" and the "adx_*_path" entries.
// agentVersion is forwarded to InitializeTelemetryClient.
//
// The process exits via log.Fatalln after sleeping 30 seconds, presumably so
// SendException has time to deliver the exception. This happens when the
// configuration cannot be read or, on non-Windows hosts, when WSID or DOMAIN is
// empty.
func InitializePlugin(pluginConfPath string, agentVersion string) {
	// With ISTEST=true, serve the default HTTP mux on localhost:6060.
	// NOTE(review): presumably for net/http/pprof handlers registered elsewhere — confirm.
	go func() {
		isTest := os.Getenv("ISTEST")
		if strings.ToLower(strings.TrimSpace(isTest)) == "true" {
			if e1 := http.ListenAndServe("localhost:6060", nil); e1 != nil {
				Log("HTTP Listen Error: %s \n", e1.Error())
			}
		}
	}()
	StdoutIgnoreNsSet = make(map[string]bool)
	StderrIgnoreNsSet = make(map[string]bool)
	ImageIDMap = make(map[string]string)
	NameIDMap = make(map[string]string)
	// Keeping the two error hashes separate since we need to keep the config error hash for the lifetime of the container
	// whereas the prometheus scrape error hash needs to be refreshed every hour
	ConfigErrorEvent = make(map[string]KubeMonAgentEventTags)
	PromScrapeErrorEvent = make(map[string]KubeMonAgentEventTags)
	// Initializing this to true to skip the first kubemonagentevent flush since the errors are not populated at this time
	skipKubeMonEventsFlush = true

	// Enrichment is enabled only by the exact (case-sensitive) value "true".
	enrichContainerLogsSetting := os.Getenv("AZMON_CLUSTER_CONTAINER_LOG_ENRICH")
	if enrichContainerLogsSetting == "true" {
		enrichContainerLogs = true
		Log("ContainerLogEnrichment=true \n")
	} else {
		enrichContainerLogs = false
		Log("ContainerLogEnrichment=false \n")
	}

	pluginConfig, err := ReadConfiguration(pluginConfPath)
	if err != nil {
		message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error())
		Log(message)
		SendException(message)
		time.Sleep(30 * time.Second)
		log.Fatalln(message)
	}

	ContainerType = os.Getenv(ContainerTypeEnv)
	Log("Container Type %s", ContainerType)

	osType := os.Getenv("OS_TYPE")
	IsWindows = strings.ToLower(osType) == "windows"
	if !IsWindows {
		// Linux
		Log("Reading configuration for Linux from %s", pluginConfPath)
		WorkspaceID = os.Getenv("WSID")
		if WorkspaceID == "" {
			message := "WorkspaceID shouldnt be empty"
			Log(message)
			SendException(message)
			time.Sleep(30 * time.Second)
			log.Fatalln(message)
		}
		LogAnalyticsWorkspaceDomain = os.Getenv("DOMAIN")
		if LogAnalyticsWorkspaceDomain == "" {
			message := "Workspace DOMAIN shouldnt be empty"
			Log(message)
			SendException(message)
			time.Sleep(30 * time.Second)
			log.Fatalln(message)
		}
		OMSEndpoint = "https://" + WorkspaceID + ".ods." + LogAnalyticsWorkspaceDomain + "/OperationalData.svc/PostJsonDataItems"
		// Populate Computer field
		containerHostName, err1 := ioutil.ReadFile(pluginConfig["container_host_file_path"])
		if err1 != nil {
			// It is ok to log here and continue, because only the Computer column will be missing,
			// which can be deduced from a combination of containerId, and docker logs on the node.
			// Report err1 (the read failure); err is nil here and calling err.Error() would panic.
			message := fmt.Sprintf("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err1.Error())
			Log(message)
			SendException(message)
		} else {
			Computer = strings.TrimSuffix(ToString(containerHostName), "\n")
		}
		// read proxyendpoint if proxy configured
		ProxyEndpoint = ""
		proxySecretPath := pluginConfig["omsproxy_secret_path"]
		if _, statErr := os.Stat(proxySecretPath); statErr == nil {
			Log("Reading proxy configuration for Linux from %s", proxySecretPath)
			proxyConfig, readErr := ioutil.ReadFile(proxySecretPath)
			if readErr != nil {
				message := fmt.Sprintf("Error Reading omsproxy configuration %s\n", readErr.Error())
				Log(message)
				// if we fail to read proxy secret, AI telemetry might not be working as well
				SendException(message)
			} else {
				ProxyEndpoint = strings.TrimSpace(string(proxyConfig))
			}
		}
	} else {
		// Windows: everything comes from environment variables; WSID/DOMAIN are not validated here.
		Computer = os.Getenv("HOSTNAME")
		WorkspaceID = os.Getenv("WSID")
		logAnalyticsDomain := os.Getenv("DOMAIN")
		ProxyEndpoint = os.Getenv("PROXY")
		OMSEndpoint = "https://" + WorkspaceID + ".ods." + logAnalyticsDomain + "/OperationalData.svc/PostJsonDataItems"
	}

	Log("OMSEndpoint %s", OMSEndpoint)
	IsAADMSIAuthMode = strings.ToLower(os.Getenv(AADMSIAuthMode)) == "true"
	if IsAADMSIAuthMode {
		Log("AAD MSI Auth Mode Configured")
	}
	ResourceID = os.Getenv(envAKSResourceID)

	if len(ResourceID) > 0 {
		// AKS scenario: the resource name is the last "/"-separated segment of ResourceID.
		ResourceCentric = true
		splitted := strings.Split(ResourceID, "/")
		ResourceName = splitted[len(splitted)-1]
		Log("ResourceCentric: True")
		Log("ResourceID=%s", ResourceID)
		Log("ResourceName=%s", ResourceName)
	}
	if !ResourceCentric {
		// AKS-Engine/hybrid scenario
		ResourceName = os.Getenv(ResourceNameEnv)
		ResourceID = ResourceName
		Log("ResourceCentric: False")
		Log("ResourceID=%s", ResourceID)
		Log("ResourceName=%s", ResourceName)
	}

	// log runtime info for debug purpose
	containerRuntime = os.Getenv(ContainerRuntimeEnv)
	Log("Container Runtime engine %s", containerRuntime)

	// set useragent to be used by ingestion; keep the existing dockerCimprovVersion unless the env var is non-blank
	if dockerCimprovVersionEnv := strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION")); len(dockerCimprovVersionEnv) > 0 {
		dockerCimprovVersion = dockerCimprovVersionEnv
	}

	userAgent = fmt.Sprintf("%s/%s", agentName, dockerCimprovVersion)

	Log("Usage-Agent = %s \n", userAgent)

	// Initialize image,name map refresh ticker (interval in seconds, falling back to the default on parse errors)
	containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"])
	if err != nil {
		message := fmt.Sprintf("Error Reading Container Inventory Refresh Interval %s", err.Error())
		Log(message)
		SendException(message)
		Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval)
		containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval
	}
	Log("containerInventoryRefreshInterval = %d \n", containerInventoryRefreshInterval)
	ContainerImageNameRefreshTicker = time.NewTicker(time.Second * time.Duration(containerInventoryRefreshInterval))

	Log("kubeMonAgentConfigEventFlushInterval = %d \n", kubeMonAgentConfigEventFlushInterval)
	KubeMonAgentConfigEventsSendTicker = time.NewTicker(time.Minute * time.Duration(kubeMonAgentConfigEventFlushInterval))

	Log("Computer == %s \n", Computer)

	ret, err := InitializeTelemetryClient(agentVersion)
	if ret != 0 || err != nil {
		// err may be nil when only ret signals failure; %v prints "<nil>" instead of panicking.
		message := fmt.Sprintf("Error During Telemetry Initialization :%v", err)
		// Print, not Printf: message is data and must not be interpreted as a format string.
		fmt.Print(message)
		Log(message)
	}

	// Initialize KubeAPI Client. Failure is non-fatal: logs just lose image/name enrichment.
	config, err := rest.InClusterConfig()
	if err != nil {
		message := fmt.Sprintf("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
		Log(message)
		SendException(message)
	} else {
		// Only build the clientset from a valid config; kubernetes.NewForConfig dereferences its argument.
		ClientSet, err = kubernetes.NewForConfig(config)
		if err != nil {
			message := fmt.Sprintf("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
			SendException(message)
			Log(message)
		}
	}

	PluginConfiguration = pluginConfig

	ContainerLogsRoute := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOGS_ROUTE")))
	Log("AZMON_CONTAINER_LOGS_ROUTE:%s", ContainerLogsRoute)

	ContainerLogsRouteV2 = false
	ContainerLogsRouteADX = false

	if ContainerLogsRoute == ContainerLogsADXRoute {
		// Try to read the ADX database name from environment variables. Default to DefaultAdxDatabaseName if not set.
		// This SHOULD be set by tomlparser.rb so it's a highly unexpected event if it isn't.
		// It should be set by the logic in tomlparser.rb EVEN if ADX logging isn't enabled
		// NOTE(review): ':=' declares a block-local variable; if a package-level AdxDatabaseName
		// exists it is shadowed here and never set — confirm whether '=' was intended.
		AdxDatabaseName := strings.TrimSpace(os.Getenv("AZMON_ADX_DATABASE_NAME"))

		// Check the len of the provided name for database and use default if 0, just to be sure
		if len(AdxDatabaseName) == 0 {
			Log("Adx database name unexpecedly empty (check config AND implementation, should have been set by tomlparser.rb?) - will default to '%s'", DefaultAdxDatabaseName)
			AdxDatabaseName = DefaultAdxDatabaseName
		}

		// check if adx clusteruri, clientid & secret are set; read errors are logged and leave the value as returned
		AdxClusterUri, err = ReadFileContents(PluginConfiguration["adx_cluster_uri_path"])
		if err != nil {
			Log("Error when reading AdxClusterUri %s", err)
		}
		if !isValidUrl(AdxClusterUri) {
			Log("Invalid AdxClusterUri %s", AdxClusterUri)
			AdxClusterUri = ""
		}

		AdxClientID, err = ReadFileContents(PluginConfiguration["adx_client_id_path"])
		if err != nil {
			Log("Error when reading AdxClientID %s", err)
		}

		AdxTenantID, err = ReadFileContents(PluginConfiguration["adx_tenant_id_path"])
		if err != nil {
			Log("Error when reading AdxTenantID %s", err)
		}

		AdxClientSecret, err = ReadFileContents(PluginConfiguration["adx_client_secret_path"])
		if err != nil {
			Log("Error when reading AdxClientSecret %s", err)
		}

		// AdxDatabaseName should never get in a state where its length is 0, but it doesn't hurt to add the check
		if len(AdxClusterUri) > 0 && len(AdxClientID) > 0 && len(AdxClientSecret) > 0 && len(AdxTenantID) > 0 && len(AdxDatabaseName) > 0 {
			ContainerLogsRouteADX = true
			Log("Routing container logs thru %s route...", ContainerLogsADXRoute)
			fmt.Fprintf(os.Stdout, "Routing container logs thru %s route...\n", ContainerLogsADXRoute)
		}
	} else if !IsWindows { // for linux, oneagent will be default route
		ContainerLogsRouteV2 = true // default is mdsd route
		Log("Routing container logs thru %s route...", ContainerLogsRoute)
		fmt.Fprintf(os.Stdout, "Routing container logs thru %s route... \n", ContainerLogsRoute)
	}

	switch {
	case ContainerLogsRouteV2:
		CreateMDSDClient(ContainerLogV2, ContainerType)
	case ContainerLogsRouteADX:
		CreateADXClient()
	default: // v1 or windows
		Log("Creating HTTP Client since either OS Platform is Windows or configmap configured with fallback option for ODS direct")
		CreateHTTPClient()
	}

	if !IsWindows { // mdsd linux specific
		Log("Creating MDSD clients for KubeMonAgentEvents & InsightsMetrics")
		CreateMDSDClient(KubeMonAgentEvents, ContainerType)
		CreateMDSDClient(InsightsMetrics, ContainerType)
	}

	ContainerLogSchemaVersion := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOG_SCHEMA_VERSION")))
	Log("AZMON_CONTAINER_LOG_SCHEMA_VERSION:%s", ContainerLogSchemaVersion)

	ContainerLogSchemaV2 = false // default is v1 schema

	// The v2 schema is only selected when logs are not routed to ADX.
	if ContainerLogSchemaVersion == ContainerLogV2SchemaVersion && !ContainerLogsRouteADX {
		ContainerLogSchemaV2 = true
		Log("Container logs schema=%s", ContainerLogV2SchemaVersion)
		fmt.Fprintf(os.Stdout, "Container logs schema=%s... \n", ContainerLogV2SchemaVersion)
	}

	if strings.ToLower(os.Getenv("CONTROLLER_TYPE")) == "daemonset" {
		populateExcludedStdoutNamespaces()
		populateExcludedStderrNamespaces()
		// enrichment not applicable for ADX and v2 schema
		if enrichContainerLogs && !ContainerLogsRouteADX && !ContainerLogSchemaV2 {
			Log("ContainerLogEnrichment=true; starting goroutine to update containerimagenamemaps \n")
			go updateContainerImageNameMaps()
		} else {
			Log("ContainerLogEnrichment=false \n")
		}

		// Flush config error records every hour
		go flushKubeMonAgentEventRecords()
	} else {
		Log("Running in replicaset. Disabling container enrichment caching & updates \n")
	}

	if ContainerLogSchemaV2 {
		MdsdContainerLogTagName = MdsdContainerLogV2SourceName
	} else {
		MdsdContainerLogTagName = MdsdContainerLogSourceName
	}

	MdsdInsightsMetricsTagName = MdsdInsightsMetricsSourceName
	MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName
	Log("ContainerLogsRouteADX: %v, IsWindows: %v, IsAADMSIAuthMode = %v \n", ContainerLogsRouteADX, IsWindows, IsAADMSIAuthMode)
	// The ingestion auth-token refresher only runs for non-ADX Windows in AAD MSI mode.
	if !ContainerLogsRouteADX && IsWindows && IsAADMSIAuthMode {
		Log("defaultIngestionAuthTokenRefreshIntervalSeconds = %d \n", defaultIngestionAuthTokenRefreshIntervalSeconds)
		IngestionAuthTokenRefreshTicker = time.NewTicker(time.Second * time.Duration(defaultIngestionAuthTokenRefreshIntervalSeconds))
		go refreshIngestionAuthToken()
	}
}