in core/workerprovider/longrunningprovider/provider/provider.go [70:181]
// discoverWorkers reconciles w.workerPool with the supplied worker configs,
// the health ping results and the process list reported by w.exec.
//
// The steps run in this order:
//  1. pooled workers whose name is no longer a key of configs are dropped;
//  2. a pool entry is created for every config that is not pooled yet;
//  3. every tracked process is marked model.Unknown;
//  4. each GetWorkerHealthResult ping for a pooled worker marks the reported
//     pid as model.Active, registering the process if it was not tracked;
//  5. each listed process whose executable equals a worker's
//     Config.BinaryName is marked model.Active in the same way;
//  6. every process still model.Unknown is killed (best effort) and removed.
//
// Pings with an unexpected topic, an undecodable payload or an empty (JSON
// null) payload are logged and skipped. A panic raised anywhere in the
// function is recovered and logged instead of propagating to the caller.
func (w *WorkerProvider) discoverWorkers(configs map[string]*model.WorkerConfig, pingResults []*message.Message) {
	logger := w.context.Log()
	defer func() {
		if msg := recover(); msg != nil {
			logger.Errorf("worker provider run panic: %v", msg)
			logger.Errorf("%s: %s", msg, debug.Stack())
		}
	}()

	// get all running processes from the process tree
	// NOTE(review): on error the function carries on with an empty process
	// list, so only health pings can keep a process from being treated as
	// terminated below -- confirm this is the intended behaviour.
	allProcesses, err := w.exec.Processes()
	if err != nil {
		logger.Errorf("failed to retrieve processes tree, %s", err.Error())
	}

	// delete worker from worker pool if the config is no longer available (removed)
	for _, worker := range w.workerPool {
		if _, ok := configs[worker.Name]; !ok {
			delete(w.workerPool, worker.Name)
		}
	}

	// update worker pool with the new worker config (added)
	for _, config := range configs {
		if _, ok := w.workerPool[config.Name]; ok {
			continue
		}
		w.workerPool[config.Name] = &model.Worker{
			Name:      config.Name,
			Config:    config,
			Processes: make(map[int]*model.Process),
		}
	}

	// reset every tracked process to unknown; the ping results and the
	// process tree below promote the ones that are still alive
	for _, worker := range w.workerPool {
		for _, process := range worker.Processes {
			process.Status = model.Unknown
		}
	}

	// update worker pool status based on the health ping results
	logger.Debugf("Update worker pool base on the health ping results")
	for _, pingResult := range pingResults {
		if pingResult.Topic != message.GetWorkerHealthResult {
			logger.Warnf("unsupported message topic: %s, %s", pingResult.Topic, pingResult)
			continue
		}

		var payload *message.HealthResultPayload
		if err := json.Unmarshal(pingResult.Payload, &payload); err != nil {
			logger.Warnf("unable to unmarshal payload: %s", pingResult)
			continue
		}
		// A JSON "null" payload decodes without error but leaves the pointer
		// nil; skip it so one bad ping cannot panic and abort the whole pass.
		if payload == nil {
			logger.Warnf("empty health result payload: %s", pingResult)
			continue
		}
		logger.Debugf("unmarshal payload content: %v", payload)

		worker, ok := w.workerPool[payload.Name]
		if !ok {
			// ping from a worker that is not (or no longer) configured
			continue
		}
		if process, ok := worker.Processes[payload.Pid]; ok {
			logger.Tracef("%s process (pid:%v) exists the worker pool, update status", payload.Name, payload.Pid)
			process.Status = model.Active
		} else {
			logger.Tracef("Found running %s process (pid:%v), add to the worker pool", payload.Name, payload.Pid)
			worker.Processes[payload.Pid] = &model.Process{
				Pid:    payload.Pid,
				Status: model.Active,
			}
		}
	}

	// update worker pool status based on the process tree
	// health ping is less reliable when multiple instances of core agent are running
	logger.Debugf("Update worker pool base on the process tree")
	for _, worker := range w.workerPool {
		for _, process := range allProcesses {
			logger.Tracef("process id %v, executable %s", process.Pid, process.Executable)
			if process.Executable != worker.Config.BinaryName {
				continue
			}
			if tracked, ok := worker.Processes[process.Pid]; ok {
				logger.Tracef("%s process (pid:%v) exists the worker pool, update status", process.Executable, process.Pid)
				tracked.Status = model.Active
			} else {
				logger.Tracef("Found running %s process (pid:%v), add to the worker pool", process.Executable, process.Pid)
				worker.Processes[process.Pid] = &model.Process{
					Pid:    process.Pid,
					Status: model.Active,
				}
			}
		}
	}

	// remove the unknown process since it's most likely terminated
	// (deleting the current entry while ranging over a map is safe in Go)
	for _, worker := range w.workerPool {
		for _, process := range worker.Processes {
			if process.Status != model.Unknown {
				continue
			}
			logger.Infof(
				"Process %s (pid:%v) has been terminated, remove from worker pool",
				worker.Name,
				strconv.Itoa(process.Pid))
			// Clean up process in case it was not terminated correctly
			if err := w.exec.Kill(process.Pid); err != nil {
				logger.Debugf("Failed to clean up process %v for worker %s, %s", process.Pid, worker.Name, err)
			}
			delete(worker.Processes, process.Pid)
		}
	}
}