// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cached
import (
"context"
"sync"
"time"
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
"github.com/uber/peloton/.gen/peloton/api/v0/peloton"
pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
pbupdate "github.com/uber/peloton/.gen/peloton/api/v0/update"
"github.com/uber/peloton/.gen/peloton/private/models"
"github.com/uber/peloton/pkg/common/concurrency"
"github.com/uber/peloton/pkg/common/taskconfig"
"github.com/uber/peloton/pkg/common/util"
jobmgrcommon "github.com/uber/peloton/pkg/jobmgr/common"
"github.com/uber/peloton/pkg/storage"
"github.com/uber/peloton/pkg/storage/objects"
log "github.com/sirupsen/logrus"
"go.uber.org/yarpc/yarpcerrors"
)
const (
// number of workers used to check whether task configs have
// changed when computing a job update diff
_defaultHasTaskConfigChangeWorkers = 25
)
// Update is the interface to a job update stored in the cache.
type Update interface {
WorkflowStrategy
// Identifier of the update
ID() *peloton.UpdateID
// Job identifier the update belongs to
JobID() *peloton.JobID
// Create creates the update in DB and cache
Create(
ctx context.Context,
jobID *peloton.JobID,
jobConfig jobmgrcommon.JobConfig,
prevJobConfig *pbjob.JobConfig,
configAddOn *models.ConfigAddOn,
instanceAdded []uint32,
instanceUpdated []uint32,
instanceRemoved []uint32,
workflowType models.WorkflowType,
updateConfig *pbupdate.UpdateConfig,
opaqueData *peloton.OpaqueData,
) error
// Modify modifies the update in DB and cache
Modify(
ctx context.Context,
instancesAdded []uint32,
instancesUpdated []uint32,
instancesRemoved []uint32,
) error
// WriteProgress updates the progress of the update in DB and cache
WriteProgress(ctx context.Context,
state pbupdate.State,
instancesDone []uint32,
instanceFailed []uint32,
instancesCurrent []uint32) error
// Pause pauses the current update progress
Pause(ctx context.Context, opaqueData *peloton.OpaqueData) error
// Resume resumes a paused update; the update returns to the
// state it was in before the pause
Resume(ctx context.Context, opaqueData *peloton.OpaqueData) error
// Recover recovers the update from DB into the cache
Recover(ctx context.Context) error
// Cancel is used to cancel the update
Cancel(ctx context.Context, opaqueData *peloton.OpaqueData) error
// Rollback is used to roll back the update.
Rollback(
ctx context.Context,
currentConfig *pbjob.JobConfig,
targetConfig *pbjob.JobConfig,
) error
// GetState returns the state of the update
GetState() *UpdateStateVector
// GetGoalState returns the goal state of the update
GetGoalState() *UpdateStateVector
// GetPrevState returns the previous state of the update
GetPrevState() pbupdate.State
// GetInstancesAdded returns the instances to be added with this update
GetInstancesAdded() []uint32
// GetInstancesUpdated returns the existing instances to be updated
// with this update
GetInstancesUpdated() []uint32
// GetInstancesRemoved returns the existing instances to be removed
// with this update
GetInstancesRemoved() []uint32
// GetInstancesCurrent returns the current set of instances being updated
GetInstancesCurrent() []uint32
// GetInstancesFailed returns the current set of instances marked as failed
GetInstancesFailed() []uint32
// GetInstancesDone returns the current set of instances that have been updated
GetInstancesDone() []uint32
// GetUpdateConfig returns the update configuration
GetUpdateConfig() *pbupdate.UpdateConfig
// GetWorkflowType returns the workflow type of the update
GetWorkflowType() models.WorkflowType
// GetJobVersion returns job configuration version
GetJobVersion() uint64
// GetJobPrevVersion returns previous job configuration version
GetJobPrevVersion() uint64
// IsTaskInUpdateProgress returns true if a given task is
// in progress for the given update, else returns false
IsTaskInUpdateProgress(instanceID uint32) bool
// IsTaskInFailed returns true if a given task is in the
// instancesFailed list for the given update, else returns false
IsTaskInFailed(instanceID uint32) bool
// GetLastUpdateTime returns the last update time of the update object
GetLastUpdateTime() time.Time
}
// UpdateStateVector is used to represent the state and goal state
// of an update to the goal state engine.
type UpdateStateVector struct {
// current update state
State pbupdate.State
// for state, it will be the old job config version
// for goal state, it will be the desired job config version
JobVersion uint64
// For state, it will store the instances which have already been updated,
// and for goal state, it will store all the instances which
// need to be updated.
Instances []uint32
}
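
// For example (an illustrative sketch only): for an update moving a job
// from config version 3 to version 4 across instances 0-4, where
// instances 0 and 1 are already done, the two vectors would look like:
//
//	state:      UpdateStateVector{State: pbupdate.State_ROLLING_FORWARD, JobVersion: 3, Instances: []uint32{0, 1}}
//	goal state: UpdateStateVector{JobVersion: 4, Instances: []uint32{0, 1, 2, 3, 4}}
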
// newUpdate creates a new cache update object
func newUpdate(
updateID *peloton.UpdateID,
jobFactory *jobFactory) *update {
update := &update{
id: updateID,
jobFactory: jobFactory,
}
return update
}
// IsUpdateStateTerminal returns true if the update has reached a terminal state
func IsUpdateStateTerminal(state pbupdate.State) bool {
switch state {
case pbupdate.State_SUCCEEDED, pbupdate.State_ABORTED,
pbupdate.State_FAILED, pbupdate.State_ROLLED_BACK:
return true
}
return false
}
// IsUpdateStateActive returns true if the update is in an active state
func IsUpdateStateActive(state pbupdate.State) bool {
switch state {
case pbupdate.State_ROLLING_FORWARD, pbupdate.State_ROLLING_BACKWARD:
return true
}
return false
}
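
// A minimal sketch of how the two predicates partition pbupdate.State
// (states that are neither terminal nor active, e.g. INITIALIZED or
// PAUSED, fall in between):
//
//	IsUpdateStateTerminal(pbupdate.State_SUCCEEDED)       // true
//	IsUpdateStateTerminal(pbupdate.State_ROLLING_FORWARD) // false
//	IsUpdateStateActive(pbupdate.State_ROLLING_FORWARD)   // true
//	IsUpdateStateActive(pbupdate.State_PAUSED)            // false
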
// update structure holds the information about a given job update in the cache
type update struct {
// Mutex to acquire before accessing any update information in cache
sync.RWMutex
WorkflowStrategy
jobID *peloton.JobID // Parent job identifier
id *peloton.UpdateID // update identifier
jobFactory *jobFactory // Pointer to the job factory object
state pbupdate.State // current update state
prevState pbupdate.State // previous update state
// the update configuration provided in the create request
updateConfig *pbupdate.UpdateConfig
// type of the update workflow
workflowType models.WorkflowType
// list of instances which will be updated with this update
instancesTotal []uint32
// list of instances which have already been updated successfully
instancesDone []uint32
// list of instances which have been marked as failed
instancesFailed []uint32
// list of instances which are currently being updated
instancesCurrent []uint32
// list of instances which have been added
instancesAdded []uint32
// list of existing instances which have been updated
instancesUpdated []uint32
// list of existing instances which have been removed;
// instancesTotal should be the union of instancesAdded,
// instancesUpdated and instancesRemoved
instancesRemoved []uint32
jobVersion uint64 // job configuration version
jobPrevVersion uint64 // previous job configuration version
lastUpdateTime time.Time // last update time of update object
}
func (u *update) ID() *peloton.UpdateID {
u.RLock()
defer u.RUnlock()
return u.id
}
func (u *update) JobID() *peloton.JobID {
u.RLock()
defer u.RUnlock()
return u.jobID
}
func (u *update) Create(
ctx context.Context,
jobID *peloton.JobID,
jobConfig jobmgrcommon.JobConfig,
prevJobConfig *pbjob.JobConfig,
configAddOn *models.ConfigAddOn,
instanceAdded []uint32,
instanceUpdated []uint32,
instanceRemoved []uint32,
workflowType models.WorkflowType,
updateConfig *pbupdate.UpdateConfig,
opaqueData *peloton.OpaqueData) error {
u.Lock()
defer u.Unlock()
var state pbupdate.State
var prevState pbupdate.State
if updateConfig.GetStartPaused() {
state = pbupdate.State_PAUSED
prevState = pbupdate.State_INITIALIZED
} else {
state = pbupdate.State_INITIALIZED
}
updateModel := &models.UpdateModel{
UpdateID: u.id,
JobID: jobID,
UpdateConfig: updateConfig,
JobConfigVersion: jobConfig.GetChangeLog().GetVersion(),
PrevJobConfigVersion: prevJobConfig.GetChangeLog().GetVersion(),
State: state,
PrevState: prevState,
InstancesAdded: instanceAdded,
InstancesUpdated: instanceUpdated,
InstancesRemoved: instanceRemoved,
InstancesTotal: uint32(len(instanceUpdated) + len(instanceAdded) + len(instanceRemoved)),
Type: workflowType,
OpaqueData: opaqueData,
CreationTime: time.Now().Format(time.RFC3339Nano),
UpdateTime: time.Now().Format(time.RFC3339Nano),
}
// write initialized workflow state for instances on create update
instancesInUpdate := append(instanceAdded, instanceUpdated...)
instancesInUpdate = append(instancesInUpdate, instanceRemoved...)
if err := u.writeWorkflowEvents(
ctx,
instancesInUpdate,
workflowType,
state,
); err != nil {
return err
}
// Store the new update in DB
if err := u.jobFactory.updateStore.CreateUpdate(ctx, updateModel); err != nil {
return err
}
u.populateCache(updateModel)
return nil
}
func (u *update) Modify(
ctx context.Context,
instancesAdded []uint32,
instancesUpdated []uint32,
instancesRemoved []uint32) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
now := time.Now()
updateModel := &models.UpdateModel{
UpdateID: u.id,
JobConfigVersion: u.jobVersion,
PrevJobConfigVersion: u.jobPrevVersion,
State: u.state,
PrevState: u.prevState,
InstancesAdded: instancesAdded,
InstancesUpdated: instancesUpdated,
InstancesRemoved: instancesRemoved,
InstancesDone: uint32(len(u.instancesDone)),
InstancesFailed: uint32(len(u.instancesFailed)),
InstancesCurrent: u.instancesCurrent,
InstancesTotal: uint32(len(instancesUpdated) + len(instancesAdded) + len(instancesRemoved)),
UpdateTime: now.Format(time.RFC3339Nano),
}
// write current workflow state for all instances on modify update
instancesInUpdate := append(instancesAdded, instancesUpdated...)
instancesInUpdate = append(instancesInUpdate, instancesRemoved...)
if err := u.writeWorkflowEvents(
ctx,
instancesInUpdate,
u.workflowType,
u.state,
); err != nil {
u.clearCache()
return err
}
// Store the new update in DB
if err := u.jobFactory.updateStore.ModifyUpdate(ctx, updateModel); err != nil {
u.clearCache()
return err
}
// populate in cache
u.instancesAdded = instancesAdded
u.instancesUpdated = instancesUpdated
u.instancesRemoved = instancesRemoved
u.instancesTotal = append(instancesUpdated, instancesAdded...)
u.instancesTotal = append(u.instancesTotal, instancesRemoved...)
u.lastUpdateTime = now
log.WithField("update_id", u.id.GetValue()).
WithField("instances_total", len(u.instancesTotal)).
WithField("instances_added", len(u.instancesAdded)).
WithField("instance_updated", len(u.instancesUpdated)).
WithField("instance_removed", len(u.instancesRemoved)).
Debug("update is modified")
return nil
}
func (u *update) WriteProgress(
ctx context.Context,
state pbupdate.State,
instancesDone []uint32,
instancesFailed []uint32,
instancesCurrent []uint32) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
// if the update is PAUSED, do not let WriteProgress
// overwrite the state, because it should only be
// overwritten by Resume and Cancel.
// This branch can be reached when the state is changed
// to PAUSED in the handler while the update is being
// processed in the goal state engine
if u.state == pbupdate.State_PAUSED {
state = pbupdate.State_PAUSED
}
return u.writeProgress(
ctx,
state,
instancesDone,
instancesFailed,
instancesCurrent,
nil,
)
}
func (u *update) Pause(ctx context.Context, opaqueData *peloton.OpaqueData) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
// already paused, do nothing
if u.state == pbupdate.State_PAUSED {
return nil
}
return u.writeProgress(
ctx,
pbupdate.State_PAUSED,
u.instancesDone,
u.instancesFailed,
u.instancesCurrent,
opaqueData,
)
}
func (u *update) Resume(ctx context.Context, opaqueData *peloton.OpaqueData) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
// already unpaused, do nothing
if u.state != pbupdate.State_PAUSED {
return nil
}
return u.writeProgress(
ctx,
u.prevState,
u.instancesDone,
u.instancesFailed,
u.instancesCurrent,
opaqueData,
)
}
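
// An illustrative pause/resume flow (sketch): prevState is what lets
// Resume restore the pre-pause state:
//
//	ROLLING_FORWARD --Pause-->  PAUSED          (prevState = ROLLING_FORWARD)
//	PAUSED          --Resume--> ROLLING_FORWARD (restored from prevState)
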
// writeProgress writes update progress into cache and DB.
// It is not concurrency safe and must be called with the lock held.
func (u *update) writeProgress(
ctx context.Context,
state pbupdate.State,
instancesDone []uint32,
instancesFailed []uint32,
instancesCurrent []uint32,
opaqueData *peloton.OpaqueData) error {
// once an update is in a terminal state, it should
// not have any more state changes
if IsUpdateStateTerminal(u.state) {
return nil
}
// only update the prevState if it has changed, otherwise
// we would lose the prevState if WriteProgress is called
// several times with the same state
prevState := u.prevState
if u.state != state {
prevState = u.state
// write job update event on update's state change
if err := u.jobFactory.jobUpdateEventsOps.Create(
ctx,
u.id,
u.workflowType,
state); err != nil {
u.clearCache()
return err
}
}
now := time.Now()
// If nothing has changed, only update UpdateTime.
// This is necessary because UpdateTime is used to check
// whether a workflow is stale (i.e. not updated for a long period of time).
// For example, if a workflow runs with a bad config that results in
// instances crash looping, the state would not change. However,
// the workflow should not be considered stale.
if !u.updateStateChanged(
state,
prevState,
instancesDone,
instancesFailed,
instancesCurrent,
) {
// ignore error here, since UpdateTime is informational only
u.jobFactory.updateStore.WriteUpdateProgress(
ctx,
&models.UpdateModel{UpdateID: u.id, UpdateTime: now.Format(time.RFC3339Nano)})
u.lastUpdateTime = now
return nil
}
updateModel := &models.UpdateModel{
UpdateID: u.id,
PrevState: prevState,
State: state,
InstancesDone: uint32(len(instancesDone)),
InstancesFailed: uint32(len(instancesFailed)),
InstancesCurrent: instancesCurrent,
OpaqueData: opaqueData,
UpdateTime: now.Format(time.RFC3339Nano),
}
if IsUpdateStateTerminal(state) {
updateModel.CompletionTime = now.Format(time.RFC3339Nano)
}
if err := u.jobFactory.updateStore.WriteUpdateProgress(ctx, updateModel); err != nil {
// clear the cache on DB error to avoid cache inconsistency
u.clearCache()
return err
}
// TODO: Add error handling, if accurate persistence of workflow events
// is required.
u.writeWorkflowProgressForInstances(
ctx,
u.id,
instancesCurrent,
instancesDone,
instancesFailed,
u.workflowType,
state)
u.prevState = prevState
u.instancesCurrent = instancesCurrent
u.instancesFailed = instancesFailed
u.state = state
u.instancesDone = instancesDone
u.lastUpdateTime = now
return nil
}
// updateStateChanged returns whether the new state differs from the
// in-memory state. It is used to decide whether the update progress
// in the DB needs to be written, so when in doubt it returns true.
func (u *update) updateStateChanged(
state pbupdate.State,
prevState pbupdate.State,
instancesDone []uint32,
instancesFailed []uint32,
instancesCurrent []uint32,
) bool {
// cache is invalidated, when in doubt assume the state changed.
if u.state == pbupdate.State_INVALID {
return true
}
return state != u.state ||
prevState != u.prevState ||
!equalInstances(instancesCurrent, u.instancesCurrent) ||
!equalInstances(instancesDone, u.instancesDone) ||
!equalInstances(instancesFailed, u.instancesFailed)
}
// equalInstances returns whether the two lists of instances are the same,
// ignoring order
func equalInstances(instances1 []uint32, instances2 []uint32) bool {
if len(instances1) != len(instances2) {
return false
}
// if the two instance lists are equal, the length of their
// intersection should be the same as the length of the original list
intersect := util.IntersectSlice(instances1, instances2)
return len(intersect) == len(instances1)
}
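
// For example (a sketch, assuming instance lists without duplicate IDs,
// which is how they are built in this package):
//
//	equalInstances([]uint32{1, 2, 3}, []uint32{3, 2, 1}) // true: order is ignored
//	equalInstances([]uint32{1, 2}, []uint32{1, 2, 3})    // false: lengths differ
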
func (u *update) Recover(ctx context.Context) error {
u.Lock()
defer u.Unlock()
return u.recover(ctx)
}
func (u *update) recover(ctx context.Context) error {
// update is already recovered
if u.state != pbupdate.State_INVALID {
return nil
}
updateModel, err := u.jobFactory.updateStore.GetUpdate(ctx, u.id)
if err != nil {
return err
}
u.populateCache(updateModel)
// Skip recovering terminated update for performance
if IsUpdateStateTerminal(updateModel.State) {
log.WithFields(log.Fields{
"update_id": u.id.GetValue(),
"job_id": u.jobID.GetValue(),
"state": u.state.String(),
}).Debug("skip recover update progress for terminated update")
return nil
}
// TODO: optimize the recover path, since it needs to read from task store
// for each task
// recover update progress
u.instancesCurrent, u.instancesDone, u.instancesFailed, err = getUpdateProgress(
ctx,
u.jobID,
u.WorkflowStrategy,
u.updateConfig.GetMaxInstanceAttempts(),
u.jobVersion,
u.instancesTotal,
u.instancesRemoved,
u.jobFactory.taskStore,
)
if err != nil {
u.clearCache()
return err
}
return nil
}
func (u *update) Cancel(ctx context.Context, opaqueData *peloton.OpaqueData) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
return u.writeProgress(
ctx,
pbupdate.State_ABORTED,
u.instancesDone,
u.instancesFailed,
u.instancesCurrent,
opaqueData,
)
}
// Rollback rolls back the current update.
func (u *update) Rollback(
ctx context.Context,
currentConfig *pbjob.JobConfig,
targetConfig *pbjob.JobConfig,
) error {
u.Lock()
defer u.Unlock()
// TODO: do recovery automatically when read state
if err := u.recover(ctx); err != nil {
return err
}
// Rollback should only happen when an update is in progress.
// If an update is in terminal state, a new update with
// the old config should be created.
if IsUpdateStateTerminal(u.state) {
return nil
}
// update is already rolling back, this can happen due to error retry
if u.state == pbupdate.State_ROLLING_BACKWARD {
return nil
}
instancesAdded, instancesUpdated, instancesRemoved, _, err := GetInstancesToProcessForUpdate(
ctx,
u.jobID,
currentConfig,
targetConfig,
u.jobFactory.taskStore,
u.jobFactory.taskConfigV2Ops,
)
if err != nil {
return err
}
updateModel := &models.UpdateModel{
UpdateID: u.id,
PrevState: u.state,
State: pbupdate.State_ROLLING_BACKWARD,
InstancesCurrent: []uint32{},
InstancesAdded: instancesAdded,
InstancesRemoved: instancesRemoved,
InstancesUpdated: instancesUpdated,
InstancesTotal: uint32(len(instancesAdded) + len(instancesRemoved) + len(instancesUpdated)),
JobConfigVersion: targetConfig.GetChangeLog().GetVersion(),
PrevJobConfigVersion: currentConfig.GetChangeLog().GetVersion(),
InstancesDone: 0,
InstancesFailed: 0,
UpdateTime: time.Now().Format(time.RFC3339Nano),
}
// Write the ROLLING_BACKWARD workflow state for all instances in the
// update to be rolled back.
// The event is added for all instances even though the instances
// have not gone through the rollback yet.
instancesInUpdate := append(instancesAdded, instancesUpdated...)
instancesInUpdate = append(instancesInUpdate, instancesRemoved...)
if err := u.writeWorkflowEvents(
ctx,
instancesInUpdate,
u.workflowType,
pbupdate.State_ROLLING_BACKWARD,
); err != nil {
u.clearCache()
return err
}
if err := u.jobFactory.updateStore.ModifyUpdate(ctx, updateModel); err != nil {
u.clearCache()
return err
}
u.instancesCurrent = []uint32{}
u.instancesDone = []uint32{}
u.instancesFailed = []uint32{}
u.populateCache(updateModel)
return nil
}
func (u *update) GetState() *UpdateStateVector {
u.RLock()
defer u.RUnlock()
instancesDone := make([]uint32, len(u.instancesDone))
copy(instancesDone, u.instancesDone)
instancesFailed := make([]uint32, len(u.instancesFailed))
copy(instancesFailed, u.instancesFailed)
return &UpdateStateVector{
State: u.state,
Instances: append(instancesDone, instancesFailed...),
JobVersion: u.jobPrevVersion,
}
}
func (u *update) GetGoalState() *UpdateStateVector {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesTotal))
copy(instances, u.instancesTotal)
return &UpdateStateVector{
Instances: instances,
JobVersion: u.jobVersion,
}
}
func (u *update) GetPrevState() pbupdate.State {
u.RLock()
defer u.RUnlock()
return u.prevState
}
func (u *update) GetInstancesAdded() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesAdded))
copy(instances, u.instancesAdded)
return instances
}
func (u *update) GetInstancesUpdated() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesUpdated))
copy(instances, u.instancesUpdated)
return instances
}
func (u *update) GetInstancesDone() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesDone))
copy(instances, u.instancesDone)
return instances
}
func (u *update) GetInstancesRemoved() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesRemoved))
copy(instances, u.instancesRemoved)
return instances
}
func (u *update) GetInstancesCurrent() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesCurrent))
copy(instances, u.instancesCurrent)
return instances
}
func (u *update) GetInstancesFailed() []uint32 {
u.RLock()
defer u.RUnlock()
instances := make([]uint32, len(u.instancesFailed))
copy(instances, u.instancesFailed)
return instances
}
func (u *update) GetUpdateConfig() *pbupdate.UpdateConfig {
u.RLock()
defer u.RUnlock()
if u.updateConfig == nil {
return nil
}
updateConfig := *u.updateConfig
return &updateConfig
}
func (u *update) GetWorkflowType() models.WorkflowType {
u.RLock()
defer u.RUnlock()
return u.workflowType
}
func (u *update) GetJobVersion() uint64 {
u.RLock()
defer u.RUnlock()
return u.jobVersion
}
func (u *update) GetJobPrevVersion() uint64 {
u.RLock()
defer u.RUnlock()
return u.jobPrevVersion
}
// IsTaskInUpdateProgress returns true if a given task is
// in progress for the given update, else returns false
func (u *update) IsTaskInUpdateProgress(instanceID uint32) bool {
for _, currentInstance := range u.GetInstancesCurrent() {
if instanceID == currentInstance {
return true
}
}
return false
}
func (u *update) IsTaskInFailed(instanceID uint32) bool {
for _, currentInstance := range u.GetInstancesFailed() {
if instanceID == currentInstance {
return true
}
}
return false
}
// populateCache populates the update cache from the info in updateModel
func (u *update) populateCache(updateModel *models.UpdateModel) {
if updateModel.GetUpdateConfig() != nil {
u.updateConfig = updateModel.GetUpdateConfig()
}
if updateModel.GetType() != models.WorkflowType_UNKNOWN {
u.workflowType = updateModel.GetType()
}
if updateModel.GetJobID() != nil {
u.jobID = updateModel.GetJobID()
}
u.state = updateModel.GetState()
u.prevState = updateModel.GetPrevState()
u.instancesCurrent = updateModel.GetInstancesCurrent()
u.instancesAdded = updateModel.GetInstancesAdded()
u.instancesRemoved = updateModel.GetInstancesRemoved()
u.instancesUpdated = updateModel.GetInstancesUpdated()
u.jobVersion = updateModel.GetJobConfigVersion()
u.jobPrevVersion = updateModel.GetPrevJobConfigVersion()
u.instancesTotal = append(updateModel.GetInstancesUpdated(), updateModel.GetInstancesAdded()...)
u.instancesTotal = append(u.instancesTotal, updateModel.GetInstancesRemoved()...)
u.WorkflowStrategy = getWorkflowStrategy(updateModel.GetState(), updateModel.GetType())
u.lastUpdateTime, _ = time.Parse(time.RFC3339Nano, updateModel.GetUpdateTime())
}
func (u *update) clearCache() {
u.state = pbupdate.State_INVALID
u.prevState = pbupdate.State_INVALID
u.instancesTotal = nil
u.instancesDone = nil
u.instancesFailed = nil
u.instancesCurrent = nil
u.instancesAdded = nil
u.instancesUpdated = nil
u.instancesRemoved = nil
}
// GetUpdateProgress iterates through instancesToCheck and checks if they
// are running and their current config version is the same as the desired
// config version.
// TODO: find the right place to put the func
func GetUpdateProgress(
ctx context.Context,
jobID *peloton.JobID,
cachedUpdate Update,
desiredConfigVersion uint64,
instancesToCheck []uint32,
taskStore storage.TaskStore,
) (instancesCurrent []uint32, instancesDone []uint32, instancesFailed []uint32, err error) {
// TODO: figure out if cache can be used to read task runtime
return getUpdateProgress(
ctx,
jobID,
cachedUpdate,
cachedUpdate.GetUpdateConfig().GetMaxInstanceAttempts(),
desiredConfigVersion,
instancesToCheck,
cachedUpdate.GetInstancesRemoved(),
taskStore,
)
}
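
// A hypothetical caller sketch (variable names invented for illustration):
// a goal state engine driver could poll progress and persist it via
// WriteProgress:
//
//	current, done, failed, err := GetUpdateProgress(
//		ctx, cachedUpdate.JobID(), cachedUpdate,
//		cachedUpdate.GetGoalState().JobVersion,
//		cachedUpdate.GetGoalState().Instances,
//		taskStore)
//	if err == nil {
//		err = cachedUpdate.WriteProgress(
//			ctx, pbupdate.State_ROLLING_FORWARD, done, failed, current)
//	}
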
// getUpdateProgress is the internal version of GetUpdateProgress, which
// does not depend on cachedUpdate. Therefore it can be used inside
// cachedUpdate methods without deadlock risk.
func getUpdateProgress(
ctx context.Context,
jobID *peloton.JobID,
strategy WorkflowStrategy,
maxInstanceAttempts uint32,
desiredConfigVersion uint64,
instancesToCheck []uint32,
instancesRemoved []uint32,
taskStore storage.TaskStore,
) (instancesCurrent []uint32, instancesDone []uint32, instancesFailed []uint32, err error) {
for _, instID := range instancesToCheck {
runtime, err := taskStore.GetTaskRuntime(ctx, jobID, instID)
if err != nil {
if yarpcerrors.IsNotFound(err) {
if contains(instID, instancesRemoved) {
instancesDone = append(instancesDone, instID)
} else {
log.WithFields(log.Fields{
"job_id": jobID.GetValue(),
"instance_id": instID,
}).Error(
"instance in cache but runtime is missing from DB during update")
}
continue
}
return nil, nil, nil, err
}
if strategy.IsInstanceComplete(desiredConfigVersion, runtime) {
instancesDone = append(instancesDone, instID)
} else if strategy.IsInstanceFailed(runtime, maxInstanceAttempts) {
instancesFailed = append(instancesFailed, instID)
} else if strategy.IsInstanceInProgress(desiredConfigVersion, runtime) {
// instance is set to the desired configuration, but has not entered RUNNING state
instancesCurrent = append(instancesCurrent, instID)
}
}
return instancesCurrent, instancesDone, instancesFailed, nil
}
// hasInstanceConfigChanged is a helper function to determine whether the
// configuration of a given task would change under the new job configuration.
func hasInstanceConfigChanged(
ctx context.Context,
jobID *peloton.JobID,
instID uint32,
configVersion uint64,
newJobConfig *pbjob.JobConfig,
taskConfigV2Ops objects.TaskConfigV2Ops,
) (bool, error) {
// Get the current task configuration. Cannot use the previous job
// config to do so, because the task may still be on an older
// configuration version if the previous update did not succeed.
// So, fetch the task configuration of the task from the DB.
prevTaskConfig, _, err := taskConfigV2Ops.GetTaskConfig(
ctx, jobID, instID, configVersion)
if err != nil {
if yarpcerrors.IsNotFound(err) {
// configuration not found, just update it
return true, nil
}
return false, err
}
newTaskConfig := taskconfig.Merge(
newJobConfig.GetDefaultConfig(),
newJobConfig.GetInstanceConfig()[instID])
return taskconfig.HasTaskConfigChanged(prevTaskConfig, newTaskConfig), nil
}
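
// Note (illustrative): because taskconfig.Merge overlays the per-instance
// config on top of the default config, a change to the default config may
// still leave an instance unchanged if an instance-level override shadows
// the changed fields, in which case hasInstanceConfigChanged would return
// false for that instance.
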
type instanceConfigChange struct {
instanceID uint32
isChanged bool
err error
}
// GetInstancesToProcessForUpdate determines which instances are affected by
// a given job update. Both the old and the new job configurations are
// provided as inputs, and it returns the instances which have been added,
// updated, removed and left unchanged.
func GetInstancesToProcessForUpdate(
ctx context.Context,
jobID *peloton.JobID,
prevJobConfig *pbjob.JobConfig,
newJobConfig *pbjob.JobConfig,
taskStore storage.TaskStore,
taskConfigV2Ops objects.TaskConfigV2Ops,
) (
instancesAdded []uint32,
instancesUpdated []uint32,
instancesRemoved []uint32,
instancesUnchanged []uint32,
err error,
) {
var lock sync.RWMutex
// instancesToUpdate and instancesNotChanged are used as intermediates
// because, if one of the goroutines in the mapper fails, the early exit
// should return nil for instancesUpdated and instancesUnchanged rather
// than partial results
var instancesToCheck []uint32
var instancesToUpdate []uint32
var instancesNotChanged []uint32
var taskRuntimes map[uint32]*pbtask.RuntimeInfo
taskRuntimes, err = taskStore.GetTaskRuntimesForJobByRange(
ctx,
jobID,
nil,
)
if err != nil {
return
}
for instID := uint32(0); instID < newJobConfig.GetInstanceCount(); instID++ {
if _, ok := taskRuntimes[instID]; !ok {
// new instance added
instancesAdded = append(instancesAdded, instID)
} else {
instancesToCheck = append(instancesToCheck, instID)
}
}
f := func(ctx context.Context, instID interface{}) (interface{}, error) {
lock.RLock()
instanceID := instID.(uint32)
runtime := taskRuntimes[instanceID]
lock.RUnlock()
changed, err := hasInstanceConfigChanged(
ctx,
jobID,
instanceID,
runtime.GetConfigVersion(),
newJobConfig,
taskConfigV2Ops,
)
if err != nil {
return nil, err
}
lock.Lock()
defer lock.Unlock()
if changed || runtime.GetConfigVersion() != runtime.GetDesiredConfigVersion() {
// Update if the configuration has changed.
// Also, if the configuration version is not the same as the desired
// configuration version, treat this instance as one which needs to
// be updated irrespective of whether its current config is the same
// as provided in the new configuration.
// In some cases this may result in an unneeded restart of a few
// instances, but it ensures correctness.
instancesToUpdate = append(instancesToUpdate, instanceID)
} else {
instancesNotChanged = append(instancesNotChanged, instanceID)
}
// TODO what happens if the update does not change the instance
// configuration, but it was being updated as part of the
// previous aborted update.
delete(taskRuntimes, instanceID)
return nil, nil
}
var inputs []interface{}
for _, i := range instancesToCheck {
inputs = append(inputs, i)
}
_, err = concurrency.Map(
ctx,
concurrency.MapperFunc(f),
inputs,
_defaultHasTaskConfigChangeWorkers)
if err != nil {
return nil, nil, nil, nil, err
}
for instID := range taskRuntimes {
// instance has been removed
instancesRemoved = append(instancesRemoved, instID)
}
instancesUpdated = instancesToUpdate
instancesUnchanged = instancesNotChanged
return
}
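
// Worked example (a sketch): suppose the previous config ran instances
// 0-4 and newJobConfig.GetInstanceCount() is 4, with a config change
// that affects only instances 1 and 2 (0 and 3 already run the new
// config at their desired version). Then the function would return:
//
//	instancesAdded:     nil             (no instance IDs without runtimes)
//	instancesUpdated:   []uint32{1, 2}
//	instancesRemoved:   []uint32{4}     (has a runtime, but is beyond the new instance count)
//	instancesUnchanged: []uint32{0, 3}
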
// writeWorkflowEvents writes the update state event for the job and the
// impacted instances. Persisting job update events is required on any
// workflow action, and if an error occurs in this function, the caller
// must retry the update such that update events for the instances and
// the job are guaranteed to be persisted.
// Currently, this function is called when a workflow is created,
// modified or rolled back (ROLLING_BACKWARD).
func (u *update) writeWorkflowEvents(
ctx context.Context,
instances []uint32,
workflowType models.WorkflowType,
state pbupdate.State,
) error {
if err := u.addWorkflowEventForInstances(
ctx,
u.id,
workflowType,
state,
instances); err != nil {
return err
}
// only need to add job update events if new state is different from existing state
if state != u.state {
if err := u.jobFactory.jobUpdateEventsOps.Create(
ctx,
u.id,
workflowType,
state); err != nil {
return err
}
}
return nil
}
func (u *update) GetLastUpdateTime() time.Time {
u.RLock()
defer u.RUnlock()
return u.lastUpdateTime
}
// writeWorkflowProgressForInstances writes workflow progress for instances
// that are in the process of updating or have already been updated
// (success/failure):
// - Add instances that succeeded
// - Add instances that failed
// - Add instances that are in progress
// Workflow events are persisted for debugging job update progress, and a
// failure to persist workflow events will not cause the update to be
// retried in the following scenarios, since the retry cost is high:
// - Progress on ROLLING_FORWARD/ROLLING_BACKWARD
// - Write state changes (PAUSED/RESUME/ABORT) for current instances only;
//   unprocessed instances won't have an update state change event.
func (u *update) writeWorkflowProgressForInstances(
ctx context.Context,
updateID *peloton.UpdateID,
instancesCurrent []uint32,
instancesDone []uint32,
instancesFailed []uint32,
workflowType models.WorkflowType,
state pbupdate.State) error {
// instances updated successfully
succeededInstances := util.IntersectSlice(u.instancesCurrent, instancesDone)
if err := u.addWorkflowEventForInstances(
ctx,
updateID,
workflowType,
pbupdate.State_SUCCEEDED,
succeededInstances); err != nil {
return err
}
// instances update failed
failedInstances := util.IntersectSlice(u.instancesCurrent, instancesFailed)
if err := u.addWorkflowEventForInstances(
ctx,
updateID,
workflowType,
pbupdate.State_FAILED,
failedInstances); err != nil {
return err
}
// new instances to update
if err := u.addWorkflowEventForInstances(
ctx,
updateID,
workflowType,
state,
instancesCurrent); err != nil {
return err
}
return nil
}
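
// For example (a sketch): if u.instancesCurrent was {3, 4, 5} before this
// call and instancesDone is now {1, 2, 3}, a SUCCEEDED event is written
// only for instance 3, the one that was in flight and has now completed;
// instances 1 and 2 finished in an earlier round and already have events.
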
// addWorkflowEventForInstances writes the workflow state & type for
// instances that are part of the update.
func (u *update) addWorkflowEventForInstances(
ctx context.Context,
updateID *peloton.UpdateID,
workflowType models.WorkflowType,
workflowState pbupdate.State,
instances []uint32) error {
// addWorkflowEvent persists the workflow event for an instance at storage
addWorkflowEvent := func(id uint32) error {
return u.jobFactory.updateStore.AddWorkflowEvent(
ctx,
updateID,
id,
workflowType,
workflowState)
}
// add workflow events for provided instances in parallel batches.
return util.RunInParallel(u.id.GetValue(), instances, addWorkflowEvent)
}
// contains is a helper function to check if an element is present in the list
func contains(element uint32, slice []uint32) bool {
for _, v := range slice {
if v == element {
return true
}
}
return false
}