vermeer/apps/structure/task.go (463 lines of code) (raw):
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with this
work for additional information regarding copyright ownership. The ASF
licenses this file to You under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations
under the License.
*/
package structure
import (
"encoding/json"
"fmt"
"path"
"reflect"
"sort"
"strconv"
"sync"
"sync/atomic"
"time"
"vermeer/apps/common"
storage "vermeer/apps/storage"
"github.com/sirupsen/logrus"
)
// TaskWorker records one worker's participation in a task, identified by Name.
type TaskWorker struct {
// Name identifies the worker; TaskInfo.SetWorkerState looks workers up by it.
Name string `json:"name,omitempty"`
// State is this worker's state for the task. TaskInfo.SetState overwrites it
// for every worker at once; TaskInfo.SetWorkerState updates a single worker.
State TaskState `json:"state,omitempty"`
}
// TaskInfo describes one task: its identity and lifecycle state, timestamps,
// the space/graph it targets, its parameters and the workers executing it.
//
// Tasks are persisted with encoding/json (taskManager.doSaveTask) and decoded
// again by taskManager.readTask, so the exported fields below, in this exact
// order, form the stored record.
type TaskInfo struct {
	ID    int32
	State TaskState

	CreateUser, CreateType string

	// CreateTime is set by CreateTask; UpdateTime is refreshed by the
	// TaskInfo state and error setters.
	CreateTime, StartTime, UpdateTime time.Time

	GraphName, SpaceName, Type string
	Params                     map[string]string
	Workers                    []*TaskWorker
	ErrMessage                 string

	// wg is created by CreateTask. Being unexported it is not serialized, so a
	// task decoded from the store carries a nil wg.
	wg *sync.WaitGroup

	// Action holds one of the Action* constants; SetAction writes it with
	// sync/atomic.
	Action           int32
	StatisticsResult map[string]any
}
// SetState moves the task and every one of its workers to state, then
// refreshes UpdateTime.
func (ti *TaskInfo) SetState(state TaskState) {
	ti.State = state
	for i := range ti.Workers {
		ti.Workers[i].State = state
	}
	ti.UpdateTime = time.Now()
}
// SetErrMsg records msg as the task's error message and refreshes UpdateTime
// without changing the task state.
func (ti *TaskInfo) SetErrMsg(msg string) {
	ti.ErrMessage, ti.UpdateTime = msg, time.Now()
}
// SetErr marks the task as TaskStateError with msg as its error message and
// refreshes UpdateTime. Unlike SetState, worker states are left untouched.
func (ti *TaskInfo) SetErr(msg string) {
	ti.State, ti.ErrMessage = TaskStateError, msg
	ti.UpdateTime = time.Now()
}
// SetWorkerState sets the state of the first worker named workerName.
// UpdateTime is refreshed even when no worker has that name.
func (ti *TaskInfo) SetWorkerState(workerName string, state TaskState) {
	ti.UpdateTime = time.Now()
	for _, worker := range ti.Workers {
		if worker.Name != workerName {
			continue
		}
		worker.State = state
		return
	}
}
// CheckTaskState reports whether every worker is in state. A task with no
// workers trivially satisfies the check.
func (ti *TaskInfo) CheckTaskState(state TaskState) bool {
	for i := 0; i < len(ti.Workers); i++ {
		if ti.Workers[i].State != state {
			return false
		}
	}
	return true
}
// GetWg returns the task's WaitGroup. CreateTask allocates one; a task decoded
// from storage has none (wg is unexported and not serialized), so callers may
// get nil.
func (ti *TaskInfo) GetWg() *sync.WaitGroup {
	if ti.wg == nil {
		return nil
	}
	return ti.wg
}
// Equivalent reports whether other describes the same work as ti: same graph,
// space, type, creating user and parameters. IDs, states and timestamps are
// ignored. Params are compared with reflect.DeepEqual, so a nil map and an
// empty map are not equivalent.
func (ti *TaskInfo) Equivalent(other *TaskInfo) bool {
	switch {
	case other == nil:
		return false
	case ti.GraphName != other.GraphName,
		ti.SpaceName != other.SpaceName,
		ti.Type != other.Type,
		ti.CreateUser != other.CreateUser:
		return false
	default:
		return reflect.DeepEqual(ti.Params, other.Params)
	}
}
// -----------------------TaskManager----------------------------------------
// TaskManager is the package-wide task registry. Its maps are nil until Init
// runs, so Init must be called before tasks are added.
var TaskManager = new(taskManager)
// taskManager is the task registry: tasks are indexed by ID and, separately,
// by space name. Most methods hold the embedded MutexLocker's lock via
// `defer tm.Unlock(tm.Lock())` while touching these fields.
type taskManager struct {
MutexLocker
// tasks maps task ID to task for tasks registered through AddTask.
tasks map[int32]*TaskInfo
// spaceTasks mirrors tasks, grouped by TaskInfo.SpaceName and then by ID.
spaceTasks map[string]map[int32]*TaskInfo
// idSeed is the next ID CreateTask hands out when called with taskID == 0;
// Init sets it to 1.
idSeed int32
// store persists tasks. It is nil until InitStore assigns it; a note in
// GetTaskByID says a non-nil store means the lookup runs on the master.
store storage.Store
}
// Init resets the manager to an empty registry whose first auto-assigned
// task ID is 1.
func (tm *taskManager) Init() {
	tm.idSeed = 1
	tm.spaceTasks = map[string]map[int32]*TaskInfo{}
	tm.tasks = map[int32]*TaskInfo{}
}
// CreateTask builds a new TaskInfo in state TaskStateCreated for spaceName and
// taskType. It does not add the task to the registry (see AddTask).
//
// taskID selects how the ID is assigned:
//   - taskID < 0: used as-is; idSeed is untouched and no metrics are recorded.
//   - taskID == 0: the next ID is taken from idSeed.
//   - taskID > 0: used as-is; idSeed is advanced past it when necessary.
//
// An error is returned when spaceName or taskType is empty.
func (tm *taskManager) CreateTask(spaceName string, taskType string, taskID int32) (*TaskInfo, error) {
	if spaceName == "" {
		return nil, fmt.Errorf("the argument `spaceName` is empty")
	}
	if taskType == "" {
		return nil, fmt.Errorf("the argument `taskType` is empty")
	}
	defer tm.Unlock(tm.Lock())

	task := &TaskInfo{
		CreateTime: time.Now(),
		State:      TaskStateCreated,
		Type:       taskType,
		wg:         &sync.WaitGroup{},
		Workers:    make([]*TaskWorker, 0),
		SpaceName:  spaceName,
	}

	switch {
	case taskID < 0:
		task.ID = taskID
		return task, nil
	case taskID == 0:
		task.ID = tm.idSeed
		tm.idSeed++
	default:
		task.ID = taskID
		// Only ever move the seed forward (as AddTask does). An explicit ID
		// below the current seed must not rewind it, or later auto-assigned IDs
		// would collide with tasks that already exist.
		if tm.idSeed <= taskID {
			tm.idSeed = taskID + 1
		}
	}
	common.PrometheusMetrics.TaskCnt.WithLabelValues(taskType).Inc()
	common.PrometheusMetrics.TaskRunningCnt.WithLabelValues(taskType).Inc()
	return task, nil
}
// appendToSpace indexes task under its SpaceName, allocating the per-space map
// on first use. Its caller in this file (AddTask) holds tm's lock.
func (tm *taskManager) appendToSpace(task *TaskInfo) {
	if tm.spaceTasks[task.SpaceName] == nil {
		tm.spaceTasks[task.SpaceName] = make(map[int32]*TaskInfo)
	}
	tm.spaceTasks[task.SpaceName][task.ID] = task
}
// deleteToSpace removes task from its space index. Unknown spaces are ignored
// and no per-space map is created for them.
func (tm *taskManager) deleteToSpace(task *TaskInfo) {
	if byID := tm.spaceTasks[task.SpaceName]; byID != nil {
		delete(byID, task.ID)
	}
}
// AddTask registers task in memory and indexes it by space.
//
// idSeed is advanced past task.ID in every case where the task is new, even
// when the task is not stored. It returns (false, nil) when a task with the
// same ID is already registered (logged as an error) or when task is already
// finished (see IsFinished); finished tasks are not stored. A nil task yields
// an error.
func (tm *taskManager) AddTask(task *TaskInfo) (ok bool, err error) {
	if task == nil {
		return false, fmt.Errorf("the argument `task` is nil")
	}
	defer tm.Unlock(tm.Lock())

	if _, exists := tm.tasks[task.ID]; exists {
		logrus.Errorf("AddTask error, task exists: %d", task.ID)
		return false, nil
	}
	if task.ID >= tm.idSeed {
		tm.idSeed = task.ID + 1
	}
	if tm.IsFinished(task) {
		return false, nil
	}

	tm.tasks[task.ID] = task
	tm.appendToSpace(task)
	return true, nil
}
// GetAllTasks returns up to limit tasks across all spaces, newest (highest ID)
// first. IDs are walked downward from idSeed-1; a task missing from memory is
// read from the store when one is configured, and read failures are logged and
// that ID is skipped.
func (tm *taskManager) GetAllTasks(limit int) []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	result := make([]*TaskInfo, 0, len(tm.tasks))
	for id := tm.idSeed - 1; id > 0 && len(result) < limit; id-- {
		task, inMemory := tm.tasks[id]
		if !inMemory && tm.store != nil {
			var readErr error
			if task, readErr = tm.readTask(id); readErr != nil {
				logrus.Errorf("GetAllTask read task failed, taskID=%d, err=%v", id, readErr)
			}
		}
		if task != nil {
			result = append(result, task)
		}
	}
	return SortTaskDesc(result)
}
// GetTasks returns up to limit tasks that belong to spaceName, newest (highest
// ID) first. IDs are walked downward from idSeed-1. Each ID is looked up in
// the space index first, then in the in-memory registry, then in the store
// (when configured). Store read failures are logged and that ID is skipped.
func (tm *taskManager) GetTasks(spaceName string, limit int) []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	buf := tm.spaceTasks[spaceName]
	tasks := make([]*TaskInfo, 0, len(buf))
	for taskID := tm.idSeed - 1; taskID > 0 && len(tasks) < limit; taskID-- {
		if task, ok := buf[taskID]; ok {
			tasks = append(tasks, task)
			continue
		}
		task, ok := tm.tasks[taskID]
		if !ok && tm.store != nil {
			var err error
			task, err = tm.readTask(taskID)
			if err != nil {
				// Log under this method's own name so failures are attributable.
				logrus.Errorf("GetTasks read task failed, taskID=%d, err=%v", taskID, err)
			}
		}
		if task != nil && task.SpaceName == spaceName {
			tasks = append(tasks, task)
		}
	}
	return SortTaskDesc(tasks)
}
// GetTaskByID returns the task with taskID, or nil when it cannot be found.
// In-memory tasks win; otherwise, if a store is configured, the task is read
// from it. Misses and read failures are logged.
func (tm *taskManager) GetTaskByID(taskID int32) *TaskInfo {
	defer tm.Unlock(tm.Lock())
	if task, ok := tm.tasks[taskID]; ok {
		return task
	}
	if tm.store == nil {
		logrus.Errorf("get task ID:%v not exist", taskID)
		return nil
	}
	// A non-nil store means this lookup is running on the master.
	stored, err := tm.readTask(taskID)
	if err != nil {
		logrus.Errorf("GetTaskByID read task failed, taskID=%d, err=%v", taskID, err)
		return nil
	}
	return stored
}
// GetAllRunningTasks returns every in-memory task that isRunningTask accepts,
// sorted by descending ID.
func (tm *taskManager) GetAllRunningTasks() []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	running := make([]*TaskInfo, 0, len(tm.tasks))
	for _, task := range tm.tasks {
		if !tm.isRunningTask(task) {
			continue
		}
		running = append(running, task)
	}
	return SortTaskDesc(running)
}
// GetAllWaitingTasks returns every in-memory task in TaskStateWaiting, sorted
// by descending ID.
func (tm *taskManager) GetAllWaitingTasks() []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	waiting := make([]*TaskInfo, 0, len(tm.tasks))
	for _, task := range tm.tasks {
		if task.State != TaskStateWaiting {
			continue
		}
		waiting = append(waiting, task)
	}
	return SortTaskDesc(waiting)
}
// GetTaskRunning returns the in-memory tasks of spaceName that isRunningTask
// accepts, sorted by descending ID.
func (tm *taskManager) GetTaskRunning(spaceName string) []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	spaceIndex := tm.spaceTasks[spaceName]
	running := make([]*TaskInfo, 0, len(spaceIndex))
	for _, task := range spaceIndex {
		if !tm.isRunningTask(task) {
			continue
		}
		running = append(running, task)
	}
	return SortTaskDesc(running)
}
// isRunningTask reports whether task is actively in progress: any state other
// than the finished ones (complete, loaded, canceled, error) and the
// not-yet-started ones (created, waiting).
func (tm *taskManager) isRunningTask(task *TaskInfo) bool {
	notRunning := [...]TaskState{
		TaskStateComplete, TaskStateLoaded, TaskStateCanceled,
		TaskStateError, TaskStateCreated, TaskStateWaiting,
	}
	for _, s := range notRunning {
		if task.State == s {
			return false
		}
	}
	return true
}
// IsFinished reports whether task has reached a terminal state: complete,
// loaded, error or canceled.
func (tm *taskManager) IsFinished(task *TaskInfo) bool {
	terminal := [...]TaskState{
		TaskStateComplete, TaskStateLoaded, TaskStateError, TaskStateCanceled,
	}
	for _, s := range terminal {
		if task.State == s {
			return true
		}
	}
	return false
}
// FinishTask removes a finished task from the in-memory registry and its space
// index; any stored copy is left alone. It fails (and logs) when the task is
// unknown or not yet in a terminal state.
func (tm *taskManager) FinishTask(taskID int32) error {
	defer tm.Unlock(tm.Lock())
	task, ok := tm.tasks[taskID]
	switch {
	case !ok:
		logrus.Errorf("get task ID:%v not exist", taskID)
		return fmt.Errorf("get task ID:%v not exist", taskID)
	case !tm.IsFinished(task):
		logrus.Errorf("task not finished:%v", task.State)
		return fmt.Errorf("task not finished:%v", task.State)
	}
	tm.deleteToSpace(task)
	delete(tm.tasks, taskID)
	return nil
}
// GetTaskByGraph returns the in-memory tasks of spaceName whose GraphName is
// graphName, newest first. An unknown space yields an empty, non-nil slice.
func (tm *taskManager) GetTaskByGraph(spaceName, graphName string) []*TaskInfo {
	defer tm.Unlock(tm.Lock())
	matched := make([]*TaskInfo, 0)
	spaceIndex := tm.spaceTasks[spaceName]
	if spaceIndex == nil {
		return matched
	}
	for _, task := range spaceIndex {
		if task.GraphName != graphName {
			continue
		}
		matched = append(matched, task)
	}
	return SortTaskDesc(matched)
}
// DeleteTask drops the task from the in-memory registry and its space index,
// whatever its state. Unknown IDs are a no-op; the result is always nil.
func (tm *taskManager) DeleteTask(taskID int32) error {
	defer tm.Unlock(tm.Lock())
	if task := tm.tasks[taskID]; task != nil {
		tm.deleteToSpace(task)
		delete(tm.tasks, taskID)
	}
	return nil
}
// InitStore creates the Pebble-type task store at
// <current path>/vermeer_data/task_info with fsync enabled and assigns it to
// tm.store. Failures are logged and returned. tm.store receives StoreMaker's
// result even when StoreMaker also returns an error.
func (tm *taskManager) InitStore() error {
	currentPath, err := common.GetCurrentPath()
	if err != nil {
		logrus.Errorf("get current path error:%v", err)
		return err
	}
	options := storage.StoreOption{
		StoreName: storage.StoreTypePebble,
		Path:      path.Join(currentPath, "vermeer_data", "task_info"),
		Fsync:     true,
	}
	if tm.store, err = storage.StoreMaker(options); err != nil {
		logrus.Errorf("create store error:%v", err)
		return err
	}
	return nil
}
// SaveTask persists the in-memory task with taskID. An unknown ID is logged at
// info level and treated as success.
func (tm *taskManager) SaveTask(taskID int32) error {
	defer tm.Unlock(tm.Lock())
	if task, ok := tm.tasks[taskID]; ok {
		return tm.doSaveTask(task)
	}
	logrus.Infof("get task ID:%v not exist", taskID)
	return nil
}
// ForceState sets task (and all its workers) to state and persists it,
// reporting success as a bool. Unlike SetState, the in-memory change is kept
// even when saving fails. Invalid arguments and save failures are logged.
func (tm *taskManager) ForceState(task *TaskInfo, state TaskState) bool {
	switch {
	case task == nil:
		logrus.Error("TaskManager.ForceState: the argument `task` is nil")
		return false
	case state == "":
		logrus.Error("TaskManager.ForceState: the argument `state` is empty")
		return false
	}
	defer tm.Unlock(tm.Lock())

	task.SetState(state)
	err := tm.doSaveTask(task)
	if err != nil {
		logrus.Errorf("failed to save task '%d' state '%s', caused by: %v", task.ID, state, err)
	}
	return err == nil
}
// SetState sets task and all of its workers to state and persists the task.
// If persisting fails, the in-memory task is restored exactly as it was
// (task state, each worker's own state, and UpdateTime), the failure is
// logged, and the store error is returned. A nil task or empty state is
// rejected.
func (tm *taskManager) SetState(task *TaskInfo, state TaskState) error {
	if task == nil {
		return fmt.Errorf("the argument `task` is nil")
	}
	if state == "" {
		return fmt.Errorf("the argument `state` is empty")
	}
	defer tm.Unlock(tm.Lock())

	prevState, prevTime := task.State, task.UpdateTime
	// Snapshot each worker individually. TaskInfo.SetState overwrites them all,
	// and their states need not match the task-level state, so restoring via
	// task.SetState(prevState) would lose information.
	prevWorkerStates := make([]TaskState, len(task.Workers))
	for i, w := range task.Workers {
		prevWorkerStates[i] = w.State
	}

	task.SetState(state)
	if err := tm.doSaveTask(task); err != nil {
		task.State = prevState
		for i, w := range task.Workers {
			w.State = prevWorkerStates[i]
		}
		task.UpdateTime = prevTime
		logrus.Errorf("failed to save task '%d' state '%s', caused by: %v", task.ID, state, err)
		return err
	}
	return nil
}
// SetError moves task and its workers to TaskStateError, records msg as the
// error message and persists the task, reporting success as a bool. Unlike
// SetState, the in-memory change is not rolled back when saving fails.
func (tm *taskManager) SetError(task *TaskInfo, msg string) bool {
	if task == nil {
		logrus.Errorf("SaveError: the argument `task` is nil")
		return false
	}
	defer tm.Unlock(tm.Lock())

	task.SetState(TaskStateError)
	task.ErrMessage = msg
	err := tm.doSaveTask(task)
	if err != nil {
		logrus.Errorf("failed to save task '%d' error state , caused by: %v", task.ID, err)
	}
	return err == nil
}
// doSaveTask JSON-encodes task and writes it to the store under its decimal ID
// through a single-entry batch. Every caller in this file holds tm's lock.
//
// When no store has been initialized (see InitStore), it returns an error
// instead of dereferencing a nil store.
func (tm *taskManager) doSaveTask(task *TaskInfo) error {
	if tm.store == nil {
		return fmt.Errorf("save task %d: task store is not initialized", task.ID)
	}
	data, err := json.Marshal(task)
	if err != nil {
		return fmt.Errorf("marshal task %d: %w", task.ID, err)
	}
	key := []byte(strconv.Itoa(int(task.ID)))
	batch := tm.store.NewBatch()
	if err := batch.Set(key, data); err != nil {
		return fmt.Errorf("write task %d to batch: %w", task.ID, err)
	}
	if err := batch.Commit(); err != nil {
		return fmt.Errorf("commit task %d: %w", task.ID, err)
	}
	return nil
}
// ReadTask loads the task with taskID directly from the store, bypassing the
// in-memory registry, while holding tm's lock.
func (tm *taskManager) ReadTask(taskID int32) (*TaskInfo, error) {
	defer tm.Unlock(tm.Lock())
	task, err := tm.readTask(taskID)
	return task, err
}
// readTask loads the task stored under the decimal form of taskID and decodes
// it from JSON. Callers in this file hold tm's lock. The decoded task has a nil
// wg, since that field is not serialized.
//
// When no store has been initialized (see InitStore), it returns an error
// instead of dereferencing a nil store. Store errors are returned unwrapped so
// callers can still compare them directly.
func (tm *taskManager) readTask(taskID int32) (*TaskInfo, error) {
	if tm.store == nil {
		return nil, fmt.Errorf("read task %d: task store is not initialized", taskID)
	}
	data, err := tm.store.Get([]byte(strconv.Itoa(int(taskID))))
	if err != nil {
		return nil, err
	}
	task := &TaskInfo{}
	if err := json.Unmarshal(data, task); err != nil {
		return nil, fmt.Errorf("decode task %d: %w", taskID, err)
	}
	return task, nil
}
// ReadAllTask decodes every task held in the store while holding tm's lock.
func (tm *taskManager) ReadAllTask() ([]*TaskInfo, error) {
	defer tm.Unlock(tm.Lock())
	all, err := tm.readAllTask()
	return all, err
}
// readAllTask decodes every value produced by the store's Scan, in scan order.
// Callers in this file hold tm's lock.
//
// It returns an error when no store has been initialized (see InitStore) or
// when any stored value fails to decode.
func (tm *taskManager) readAllTask() ([]*TaskInfo, error) {
	if tm.store == nil {
		return nil, fmt.Errorf("read all tasks: task store is not initialized")
	}
	scan := tm.store.Scan()
	tasks := make([]*TaskInfo, 0)
	for kv := range scan {
		task := &TaskInfo{}
		if err := json.Unmarshal(kv.Value, task); err != nil {
			// Drain the remaining scan before bailing out. If Scan feeds the
			// channel from a goroutine (TODO confirm in storage), returning
			// early would leave that producer blocked on a send forever.
			for range scan {
			}
			return nil, fmt.Errorf("decode stored task: %w", err)
		}
		tasks = append(tasks, task)
	}
	return tasks, nil
}
// Task control actions stored in TaskInfo.Action and set through
// taskManager.SetAction. Values are spelled out so SetAction's accepted range
// (ActionCancelTask through ActionResumeTask) is easy to audit.
//
// ActionDoNoting ("do nothing"; the spelling is kept for compatibility) is the
// zero value, presumably meaning no action is pending; SetAction rejects it.
// Once ActionCancelTask is set, SetAction refuses further changes.
const (
	ActionDoNoting   int32 = 0
	ActionCancelTask int32 = 1
	ActionPauseTask  int32 = 2
	ActionResumeTask int32 = 3
)
// SetAction records a control action for task. action must be one of
// ActionCancelTask, ActionPauseTask or ActionResumeTask; anything else
// (including ActionDoNoting) is rejected. Once a task's action is
// ActionCancelTask it can no longer be changed.
//
// The cancel check and the update form a single atomic step: the check now
// happens under tm's lock with an atomic load, and the write is a
// compare-and-swap. A concurrent cancel therefore cannot be overwritten between
// the check and the store, and the field is never read non-atomically.
func (tm *taskManager) SetAction(task *TaskInfo, action int32) error {
	if task == nil {
		return fmt.Errorf("the argument `task` is nil")
	}
	if action < ActionCancelTask || action > ActionResumeTask {
		return fmt.Errorf("invalid action type")
	}
	defer tm.Unlock(tm.Lock())
	// CAS loop in case other code writes Action without taking tm's lock.
	for {
		current := atomic.LoadInt32(&task.Action)
		if current == ActionCancelTask {
			return fmt.Errorf("the action is `cancel`, cannot changed")
		}
		if atomic.CompareAndSwapInt32(&task.Action, current, action) {
			return nil
		}
	}
}
// SortTaskDesc sorts tasks in place by descending ID and returns the same
// slice. A nil slice is returned as nil.
func SortTaskDesc(tasks []*TaskInfo) []*TaskInfo {
	if tasks != nil {
		byIDDesc := func(a, b int) bool { return tasks[a].ID > tasks[b].ID }
		sort.Slice(tasks, byIDDesc)
	}
	return tasks
}