func Fetch()

in agent/taskengine/scheduletask.go [82:185]


func Fetch(from_kick bool, taskId string, taskType int) int {
	logger := log.GetLogger().WithField("from_kick", from_kick)
	// Fetching task should be allowed before all core components of agent have
	// been correctly initialized. This critical indicator would be set at the
	// end of program.run method
	if !isEnabledFetchingTask() {
		logger.Infoln("Fetching tasks is disabled due to network is not ready")
		return 0
	}

	// NOTE: sync.Mutex from Go standard library does not support try-lock
	// operation like std::mutex in C++ STL, which makes it slightly hard for
	// goroutines of fetching tasks and checking updates to coopearate gracefully.
	// Futhermore, it does not support try-lock operation with specified timeout,
	// which makes it hard for goroutines of fetching tasks to wait in queue but
	// just throw many message about lock accquisition failure confusing others.
	// THUS heavy weight lock from github.com/viney-shih/go-lock library is used
	// to provide graceful locking mechanism for goroutine coopeartion. The cost
	// would be, some performance lost.
	if !FetchingTaskLock.TryLockWithTimeout(time.Duration(2) * time.Second) {
		logger.Infoln("Fetching tasks is canceled due to another running startupFetchRetrying or updating process.")
		return ErrUpdatingProcedureRunning
	}
	// Immediately release fetchingTaskLock to let other goroutine fetching
	// tasks go, but keep updating safe
	FetchingTaskLock.Unlock()

	// Increase fetchingTaskCounter to indicate there is a goroutine fetching
	// tasks, which the updating goroutine MUST notice and decrease it to let
	// updating goroutine go.
	FetchingTaskCounter.Add(1)
	defer FetchingTaskCounter.Add(-1)

	var task_size int
	var fetchErr error
	// `isColdstart` only make sense for FetchOnStartup
	isColdstart, _ := flagging.IsColdstart()

	// Fetch tasks for kickoff if startup tasks has been fetched.
	if _startupFetchedFinished {
		task_size = fetchForKickOff(taskId, taskType, isColdstart)
		return task_size
	}

	_startupFetchLock.Lock()

	// Check again whether the startup tasks has been fetched.
	if _startupFetchedFinished {
		_startupFetchLock.Unlock()

		task_size = fetchForKickOff(taskId, taskType, isColdstart)
		return task_size
	}

	// Fetch tasks for startup.
	task_size, fetchErr = fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
	if fetchErr == nil {
		_startupFetchedFinished = true
		close(_startupFetchedDone)

		_startupFetchLock.Unlock()
		return task_size
	}

	_startupFetchLock.Unlock()

	// The from_kick=false Fetch() is responsible for retrying.
	// There should be only one Fetch() with from_kick=false.
	if !from_kick {
		// In order not to block Fetch(false, ...) a new goroutine to be created
		// for continuous retrying.
		go func() {
			aontherFetchErr := fetchErr
			ticker := time.NewTicker(fetchRetryInterval)
			defer ticker.Stop()
			for aontherFetchErr != nil {
				select {
				case <-ticker.C:

					_startupFetchLock.Lock()
					if _startupFetchedFinished {
						_startupFetchLock.Unlock()
						return
					}
					_, aontherFetchErr = fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
					if aontherFetchErr == nil {
						_startupFetchedFinished = true
						close(_startupFetchedDone)
						_startupFetchLock.Unlock()
						return
					}
					_startupFetchLock.Unlock()

					ticker.Reset(fetchRetryInterval)
				case <-_startupFetchedDone:
					// Another goroutine has finished fetching tasks for startup.
					return
				}
			}
		}()
	}

	return 0
}