in agent/taskengine/scheduletask.go [82:185]
// Fetch fetches tasks either for agent startup or for a kick-off request.
//
// Until one startup fetch has succeeded, every call attempts
// fetchTasks(FetchOnStartup, ...). Once a startup fetch has succeeded, calls
// are served by fetchForKickOff instead. When the startup fetch fails and
// from_kick is false, a background goroutine keeps retrying the startup fetch
// every fetchRetryInterval until some startup fetch succeeds.
//
// Parameters:
//   - from_kick: false for the caller that owns startup retrying, true for
//     all other callers. It is also attached to the log entries.
//   - taskId, taskType: forwarded unchanged to fetchTasks/fetchForKickOff.
//
// Returns:
//   - 0 when fetching is not enabled yet, or when the startup fetch failed;
//   - ErrUpdatingProcedureRunning when FetchingTaskLock could not be acquired
//     within two seconds;
//   - otherwise the value reported by fetchForKickOff, or the first result of
//     the successful startup fetchTasks call.
func Fetch(from_kick bool, taskId string, taskType int) int {
	logger := log.GetLogger().WithField("from_kick", from_kick)

	// Fetching is refused until isEnabledFetchingTask() reports true.
	// NOTE(review): the original comment says this indicator is set at the end
	// of the program.run method — confirm against that code.
	if !isEnabledFetchingTask() {
		logger.Infoln("Fetching tasks is disabled due to network is not ready")
		return 0
	}

	// NOTE: a lock with try-lock-with-timeout semantics (per the original
	// comment, from github.com/viney-shih/go-lock) is used here instead of
	// sync.Mutex, so that fetching goroutines can wait briefly in a queue for a
	// running update/startup-retry procedure and then give up with a single
	// log line rather than blocking forever. The cost is some performance.
	if !FetchingTaskLock.TryLockWithTimeout(time.Duration(2) * time.Second) {
		logger.Infoln("Fetching tasks is canceled due to another running startupFetchRetrying or updating process.")
		return ErrUpdatingProcedureRunning
	}
	// Register this goroutine in FetchingTaskCounter BEFORE releasing the lock.
	// If the counter were bumped only after Unlock, another goroutine could
	// grab FetchingTaskLock in between, observe a zero counter and proceed as
	// if nobody were fetching.
	FetchingTaskCounter.Add(1)
	defer FetchingTaskCounter.Add(-1)
	// Release the lock right away so other fetching goroutines are not
	// serialized behind this one; the counter keeps this fetch visible.
	FetchingTaskLock.Unlock()

	// `isColdstart` only makes sense for FetchOnStartup. The error is
	// deliberately ignored, as in the original code.
	isColdstart, _ := flagging.IsColdstart()

	// _startupFetchedFinished is only ever read while holding
	// _startupFetchLock: it is written under that lock below, so an unlocked
	// read would be a data race.
	_startupFetchLock.Lock()
	if _startupFetchedFinished {
		// Startup tasks are already fetched: serve this call as a kick-off
		// fetch, outside the startup lock.
		_startupFetchLock.Unlock()
		return fetchForKickOff(taskId, taskType, isColdstart)
	}

	// Fetch tasks for startup while still holding the startup lock, so that
	// only one startup fetch is in flight at a time.
	taskSize, fetchErr := fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
	if fetchErr == nil {
		_startupFetchedFinished = true
		// Wake up any retry goroutine waiting on the done channel.
		close(_startupFetchedDone)
		_startupFetchLock.Unlock()
		return taskSize
	}
	_startupFetchLock.Unlock()

	// The from_kick=false Fetch() is responsible for retrying.
	// There should be only one Fetch() with from_kick=false.
	if !from_kick {
		// Retry in a separate goroutine so that Fetch(false, ...) itself is
		// not blocked. The goroutine ends as soon as any startup fetch
		// succeeds, whether its own or another caller's.
		// NOTE(review): this goroutine outlives the deferred
		// FetchingTaskCounter.Add(-1) above, so its fetches are not counted —
		// confirm that this is intended.
		go func() {
			ticker := time.NewTicker(fetchRetryInterval)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					_startupFetchLock.Lock()
					if _startupFetchedFinished {
						_startupFetchLock.Unlock()
						return
					}
					_, retryErr := fetchTasks(FetchOnStartup, taskId, taskType, isColdstart)
					if retryErr == nil {
						_startupFetchedFinished = true
						close(_startupFetchedDone)
						_startupFetchLock.Unlock()
						return
					}
					_startupFetchLock.Unlock()
					// Restart the interval after a failed attempt so the next
					// retry is a full fetchRetryInterval away.
					ticker.Reset(fetchRetryInterval)
				case <-_startupFetchedDone:
					// Another goroutine has finished fetching tasks for startup.
					return
				}
			}
		}()
	}
	return 0
}