func()

in pkg/helper/container_discover_controller.go [162:270]


// Init probes which container runtimes are usable, applies environment-based
// tuning to the package-level discovery settings, and performs an initial
// fetch for every enabled discovery mode.
//
// It returns true when at least one of docker, CRI, or static discovery is
// still enabled after the initial fetch, and false otherwise. A docker or CRI
// mode whose initial fetch fails is switched off before the result is computed.
func (c *ContainerDiscoverManager) Init() bool {
	// NOTE(review): presumably recovers panics raised below, in which case Init
	// would return false (the zero value) — confirm against dockerCenterRecover.
	defer dockerCenterRecover()

	ctx := context.Background()

	// discover which runtime is valid
	if IsCRIRuntimeValid(containerdUnixSocket) {
		var err error
		criRuntimeWrapper, err = NewCRIRuntimeWrapper(dockerCenterInstance)
		if err != nil {
			logger.Errorf(ctx, "DOCKER_CENTER_ALARM", "[CRIRuntime] creare cri-runtime client error: %v", err)
			// Make sure a failed construction never leaves a usable-looking wrapper behind.
			criRuntimeWrapper = nil
		} else {
			logger.Infof(ctx, "[CRIRuntime] create cri-runtime client successfully")
		}
	}
	if ok, err := util.PathExists(DefaultLogtailMountPath); err == nil {
		if !ok {
			logger.Info(ctx, "no docker mount path", "set empty")
			DefaultLogtailMountPath = ""
		}
	} else {
		logger.Warning(ctx, "check docker mount path error", err.Error())
	}
	c.enableCRIDiscover = criRuntimeWrapper != nil
	c.enableDockerDiscover = dockerCenterInstance.initClient() == nil
	c.enableStaticDiscover = isStaticContainerInfoEnabled()
	if !c.enableCRIDiscover && !c.enableDockerDiscover && !c.enableStaticDiscover {
		// Nothing to discover from; skip env tuning and the initial fetch.
		return false
	}

	// try to connect to runtime
	logger.Info(ctx, "input", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)

	// Get env in the same order as in C Logtail: later variables take precedence
	// over earlier ones. A value that is not a valid integer is reported and
	// skipped, so it cannot wipe out a valid value from an earlier variable
	// (strconv.Atoi returns 0 on failure).
	listenLoopIntervalSec := 0
	for _, name := range []string{
		"docker_config_update_interval",
		"ALIYUN_LOGTAIL_DOCKER_CONFIG_UPDATE_INTERVAL",
		// Keep this env var for compatibility
		"CONTAINERD_LISTEN_LOOP_INTERVAL",
	} {
		raw := os.Getenv(name)
		if len(raw) == 0 {
			continue
		}
		sec, err := strconv.Atoi(raw)
		if err != nil {
			c.LogAlarm(err, "initialize env "+name+" error")
			continue
		}
		listenLoopIntervalSec = sec
	}
	// Non-positive values keep the current DefaultSyncContainersPeriod.
	if listenLoopIntervalSec > 0 {
		DefaultSyncContainersPeriod = time.Second * time.Duration(listenLoopIntervalSec)
	}

	// @note config for Fetch All Interval
	fetchAllSec := int(FetchAllInterval.Seconds())
	if err := util.InitFromEnvInt("DOCKER_FETCH_ALL_INTERVAL", &fetchAllSec, fetchAllSec); err != nil {
		c.LogAlarm(err, "initialize env DOCKER_FETCH_ALL_INTERVAL error")
	}
	// Only accept intervals strictly between 0 and one day.
	if fetchAllSec > 0 && fetchAllSec < 3600*24 {
		FetchAllInterval = time.Duration(fetchAllSec) * time.Second
	}
	logger.Info(ctx, "init docker center, fetch all seconds", FetchAllInterval.String())
	{
		timeoutSec := int(fetchAllSuccessTimeout.Seconds())
		if err := util.InitFromEnvInt("DOCKER_FETCH_ALL_SUCCESS_TIMEOUT", &timeoutSec, timeoutSec); err != nil {
			c.LogAlarm(err, "initialize env DOCKER_FETCH_ALL_SUCCESS_TIMEOUT error")
		}
		// The success timeout must exceed the fetch-all interval and is capped at one day.
		if timeoutSec > int(FetchAllInterval.Seconds()) && timeoutSec <= 3600*24 {
			fetchAllSuccessTimeout = time.Duration(timeoutSec) * time.Second
		}
	}
	logger.Info(ctx, "init docker center, fecth all success timeout", fetchAllSuccessTimeout.String())
	{
		timeoutSec := int(DockerCenterTimeout.Seconds())
		if err := util.InitFromEnvInt("DOCKER_CLIENT_REQUEST_TIMEOUT", &timeoutSec, timeoutSec); err != nil {
			c.LogAlarm(err, "initialize env DOCKER_CLIENT_REQUEST_TIMEOUT error")
		}
		if timeoutSec > 0 {
			DockerCenterTimeout = time.Duration(timeoutSec) * time.Second
		}
	}
	logger.Info(ctx, "init docker center, client request timeout", DockerCenterTimeout.String())
	{
		count := int(MaxFetchOneTriggerPerSecond)
		if err := util.InitFromEnvInt("CONTAINER_FETCH_ONE_MAX_COUNT_PER_SECOND", &count, count); err != nil {
			c.LogAlarm(err, "initialize env CONTAINER_FETCH_ONE_MAX_COUNT_PER_SECOND error")
		}
		// Reject values outside the positive int32 range: the int32 conversion
		// would otherwise silently truncate them.
		if count > 0 && count <= 1<<31-1 {
			MaxFetchOneTriggerPerSecond = int32(count)
		}
	}
	logger.Info(ctx, "init docker center, max fetchOne count per second", MaxFetchOneTriggerPerSecond)

	// Initial fetch: a runtime that cannot be listed right now is switched off
	// for the result of Init, and the cause is included in the alarm.
	if c.enableDockerDiscover {
		if err := c.fetchDocker(); err != nil {
			c.enableDockerDiscover = false
			logger.Errorf(ctx, "DOCKER_CENTER_ALARM", "fetch docker containers error, close docker discover, will retry: %v", err)
		}
	}
	if c.enableCRIDiscover {
		if err := c.fetchCRI(); err != nil {
			c.enableCRIDiscover = false
			logger.Errorf(ctx, "DOCKER_CENTER_ALARM", "fetch cri containers error, close cri discover, will retry: %v", err)
		}
	}
	if c.enableStaticDiscover {
		c.fetchStatic()
	}
	logger.Info(ctx, "final", "param", "docker discover", c.enableDockerDiscover, "cri discover", c.enableCRIDiscover, "static discover", c.enableStaticDiscover)
	return c.enableCRIDiscover || c.enableDockerDiscover || c.enableStaticDiscover
}