pkg/jobmgr/task/event/update.go
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package event
import (
"context"
"strings"
"time"
mesos "github.com/uber/peloton/.gen/mesos/v1"
pbjob "github.com/uber/peloton/.gen/peloton/api/v0/job"
pb_task "github.com/uber/peloton/.gen/peloton/api/v0/task"
"github.com/uber/peloton/.gen/peloton/api/v0/volume"
pb_eventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
pbeventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
v1pbevent "github.com/uber/peloton/.gen/peloton/private/eventstream/v1alpha/event"
"github.com/uber/peloton/pkg/common"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/eventstream"
"github.com/uber/peloton/pkg/common/statusupdate"
"github.com/uber/peloton/pkg/common/util"
v1eventstream "github.com/uber/peloton/pkg/common/v1alpha/eventstream"
"github.com/uber/peloton/pkg/jobmgr/cached"
"github.com/uber/peloton/pkg/jobmgr/goalstate"
jobmgr_task "github.com/uber/peloton/pkg/jobmgr/task"
"github.com/uber/peloton/pkg/jobmgr/task/lifecyclemgr"
taskutil "github.com/uber/peloton/pkg/jobmgr/util/task"
"github.com/uber/peloton/pkg/storage"
"github.com/gogo/protobuf/proto"
log "github.com/sirupsen/logrus"
"github.com/uber-go/tally"
"go.uber.org/yarpc"
"go.uber.org/yarpc/yarpcerrors"
)
const (
// Mesos event message that indicates duplicate task ID
_msgMesosDuplicateID = "Task has duplicate ID"
// _numOrphanTaskKillAttempts is the number of attempts to
// kill an orphan task in case of an error from the host manager
_numOrphanTaskKillAttempts = 3
// _waitForRetryOnErrorOrphanTaskKill is the time between successive
// retries to kill an orphan task in case of an error from the host manager
_waitForRetryOnErrorOrphanTaskKill = 5 * time.Millisecond
)
// now aliases time.Now so that it can be mocked in unit tests.
var now = time.Now
// StatusUpdate is the interface for task status updates
type StatusUpdate interface {
Start()
Stop()
}
// Listener is the interface for a StatusUpdate listener
type Listener interface {
OnV0Events(events []*pbeventstream.Event)
Start()
Stop()
}
// statusUpdate reads and processes task state change events from the host manager and the resource manager
type statusUpdate struct {
jobStore storage.JobStore
taskStore storage.TaskStore
volumeStore storage.PersistentVolumeStore
eventClients map[string]StatusUpdate
lm lifecyclemgr.Manager
applier *asyncEventProcessor
jobFactory cached.JobFactory
goalStateDriver goalstate.Driver
listeners []Listener
rootCtx context.Context
metrics *Metrics
}
// NewTaskStatusUpdate creates a statusUpdate
func NewTaskStatusUpdate(
d *yarpc.Dispatcher,
jobStore storage.JobStore,
taskStore storage.TaskStore,
volumeStore storage.PersistentVolumeStore,
jobFactory cached.JobFactory,
goalStateDriver goalstate.Driver,
listeners []Listener,
parentScope tally.Scope,
hmVersion api.Version,
) StatusUpdate {
statusUpdater := &statusUpdate{
jobStore: jobStore,
taskStore: taskStore,
volumeStore: volumeStore,
rootCtx: context.Background(),
metrics: NewMetrics(parentScope.SubScope("status_updater")),
eventClients: make(map[string]StatusUpdate),
jobFactory: jobFactory,
goalStateDriver: goalStateDriver,
listeners: listeners,
lm: lifecyclemgr.New(hmVersion, d, parentScope),
}
// TODO: add config for BucketEventProcessor
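// The applier processes events asynchronously through ProcessStatusUpdate;
// the two numeric arguments are its sizing parameters (assumed here to be
// the bucket count and the per-bucket queue size).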
statusUpdater.applier = newBucketEventProcessor(statusUpdater, 100, 10000)
if hmVersion.IsV1() {
v1eventClient := v1eventstream.NewEventStreamClient(
d,
common.PelotonJobManager,
common.PelotonHostManager,
statusUpdater,
parentScope.SubScope("HostmgrV1EventStreamClient"))
statusUpdater.eventClients[common.PelotonV1HostManager] = v1eventClient
} else {
eventClient := eventstream.NewEventStreamClient(
d,
common.PelotonJobManager,
common.PelotonHostManager,
statusUpdater,
parentScope.SubScope("HostmgrEventStreamClient"))
statusUpdater.eventClients[common.PelotonHostManager] = eventClient
}
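// The resource manager event stream is always subscribed to, regardless of
// the host manager API version.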
eventClientRM := eventstream.NewEventStreamClient(
d,
common.PelotonJobManager,
common.PelotonResourceManager,
statusUpdater,
parentScope.SubScope("ResmgrEventStreamClient"))
statusUpdater.eventClients[common.PelotonResourceManager] = eventClientRM
return statusUpdater
}
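// A minimal wiring sketch (the dispatcher, stores, factory, driver and scope
// used below are illustrative placeholders, not defined in this package):
//
//   upd := NewTaskStatusUpdate(dispatcher, jobStore, taskStore, volumeStore,
//       jobFactory, goalStateDriver, nil /* listeners */, scope, hmVersion)
//   upd.Start()
//   defer upd.Stop()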
// OnV0Event is the callback invoked when a single v0 event is received
func (p *statusUpdate) OnV0Event(event *pb_eventstream.Event) {
log.WithField("event_offset", event.Offset).Debug("JobMgr received v0 event")
if event.GetType() != pbeventstream.Event_HOST_EVENT {
p.applier.addV0Event(event)
}
}
// OnV1Event is the callback invoked when a single v1 event is received
func (p *statusUpdate) OnV1Event(event *v1pbevent.Event) {
log.WithField("event_offset", event.Offset).Debug("JobMgr received v1 event")
p.applier.addV1Event(event)
}
// GetEventProgress returns the progress of event processing
func (p *statusUpdate) GetEventProgress() uint64 {
return p.applier.GetEventProgress()
}
// ProcessStatusUpdate processes a single task status update event
func (p *statusUpdate) ProcessStatusUpdate(
ctx context.Context,
updateEvent *statusupdate.Event,
) error {
var currTaskResourceUsage map[string]float64
p.logTaskMetrics(updateEvent)
isOrphanTask, taskInfo, err := p.isOrphanTaskEvent(ctx, updateEvent)
if err != nil {
return err
}
if isOrphanTask {
p.metrics.SkipOrphanTasksTotal.Inc(1)
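// Build a minimal TaskInfo from the event so the orphan task can be killed
// on its agent.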
taskInfo := &pb_task.TaskInfo{
Runtime: &pb_task.RuntimeInfo{
State: updateEvent.State(),
MesosTaskId: updateEvent.MesosTaskID(),
AgentID: updateEvent.AgentID(),
},
}
// Kill the orphan task
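// Killing is best-effort: retry a few times with a short wait in between,
// and ack the event (return nil) even if every attempt fails.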
for i := 0; i < _numOrphanTaskKillAttempts; i++ {
err = jobmgr_task.KillOrphanTask(ctx, p.lm, taskInfo)
if err == nil {
return nil
}
time.Sleep(_waitForRetryOnErrorOrphanTaskKill)
}
return nil
}
// Skip the update if it leaves the instance state unchanged.
if isDuplicateStateUpdate(taskInfo, updateEvent) {
return nil
}
if updateEvent.State() == pb_task.TaskState_RUNNING &&
taskInfo.GetConfig().GetVolume() != nil &&
len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
// Update volume state to be CREATED upon task RUNNING.
if err := p.updatePersistentVolumeState(ctx, taskInfo); err != nil {
return err
}
}
newRuntime := proto.Clone(taskInfo.GetRuntime()).(*pb_task.RuntimeInfo)
// Persist the status message; clear the reason here and re-populate it
// below for FAILED, LOST and health-check updates
newRuntime.Message = updateEvent.StatusMsg()
newRuntime.Reason = ""
// Persist the healthy field if a health check is configured
if taskInfo.GetConfig().GetHealthCheck() != nil {
reason := updateEvent.Reason()
healthy := updateEvent.Healthy()
p.persistHealthyField(updateEvent.State(), reason, healthy, newRuntime)
}
// Update FailureCount
updateFailureCount(updateEvent.State(), taskInfo.GetRuntime(), newRuntime)
switch updateEvent.State() {
case pb_task.TaskState_FAILED:
reason := updateEvent.Reason()
msg := updateEvent.Message()
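// A FAILED update with reason TASK_INVALID and a duplicate-ID message means
// the same task ID was launched more than once; it is ignored rather than
// treated as a real task failure.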
if reason == mesos.TaskStatus_REASON_TASK_INVALID.String() &&
strings.Contains(msg, _msgMesosDuplicateID) {
log.WithField("task_id", updateEvent.TaskID()).
Info("ignoring duplicate task id failure")
return nil
}
// task failed, do not place the task on the same host for retry,
// in case it is a machine failure
newRuntime.DesiredHost = ""
newRuntime.Reason = reason
newRuntime.State = updateEvent.State()
newRuntime.Message = msg
// TODO p2k: can we build TerminationStatus from PodEvent?
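// Record a FAILED termination status; exit code and signal are filled in
// below when they can be parsed from the status message.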
termStatus := &pb_task.TerminationStatus{
Reason: pb_task.TerminationStatus_TERMINATION_STATUS_REASON_FAILED,
}
if code, err := taskutil.GetExitStatusFromMessage(msg); err == nil {
termStatus.ExitCode = code
} else if !yarpcerrors.IsNotFound(err) {
log.WithField("task_id", updateEvent.TaskID()).
WithField("error", err).
Debug("Failed to extract exit status from message")
}
if sig, err := taskutil.GetSignalFromMessage(msg); err == nil {
termStatus.Signal = sig
} else if !yarpcerrors.IsNotFound(err) {
log.WithField("task_id", updateEvent.TaskID()).
WithField("error", err).
Debug("Failed to extract termination signal from message")
}
newRuntime.TerminationStatus = termStatus
case pb_task.TaskState_LOST:
newRuntime.Reason = updateEvent.Reason()
if util.IsPelotonStateTerminal(taskInfo.GetRuntime().GetState()) {
// Skip LOST status update if current state is terminal state.
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Debug("skip reschedule lost task as it is already in terminal state")
return nil
}
if taskInfo.GetRuntime().GetGoalState() == pb_task.TaskState_KILLED {
// Do not take any action for killed tasks, just mark it killed.
// Same message will go to resource manager which will release the placement.
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Debug("mark stopped task as killed due to LOST")
newRuntime.State = pb_task.TaskState_KILLED
newRuntime.Message = "Stopped task LOST event: " + updateEvent.StatusMsg()
break
}
if taskInfo.GetConfig().GetVolume() != nil &&
len(taskInfo.GetRuntime().GetVolumeID().GetValue()) != 0 {
// Do not reschedule stateful task. Storage layer will decide
// whether to start or replace this task.
newRuntime.State = pb_task.TaskState_LOST
break
}
log.WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": updateEvent.MesosTaskStatus(),
}).Info("reschedule lost task if needed")
// task failed due to lost, do not place the task on the same host for retry
newRuntime.DesiredHost = ""
newRuntime.State = pb_task.TaskState_LOST
newRuntime.Message = "Task LOST: " + updateEvent.StatusMsg()
newRuntime.Reason = updateEvent.Reason()
// Calculate resource usage for TaskState_LOST using time.Now() as
// completion time
currTaskResourceUsage = getCurrTaskResourceUsage(
updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
taskInfo.GetRuntime().GetStartTime(),
now().UTC().Format(time.RFC3339Nano))
default:
newRuntime.State = updateEvent.State()
}
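// Fetch the cached job; AddJob adds it to the job cache first if it is not
// already present.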
cachedJob := p.jobFactory.AddJob(taskInfo.GetJobId())
// Update task start and completion timestamps
if newRuntime.GetState() == pb_task.TaskState_RUNNING {
if updateEvent.State() != taskInfo.GetRuntime().GetState() {
// StartTime is set at the time of the first RUNNING event.
// CompletionTime may have been set (e.g. by a previous run of the task),
// which could make StartTime larger than CompletionTime.
// Reset CompletionTime every time a task transitions to the RUNNING state.
newRuntime.StartTime = now().UTC().Format(time.RFC3339Nano)
newRuntime.CompletionTime = ""
// When the task is RUNNING, reset the desired host field so that the
// task can be scheduled onto a different host if it restarts
// (e.g. due to a health check failure or a failure retry)
newRuntime.DesiredHost = ""
if len(taskInfo.GetRuntime().GetDesiredHost()) != 0 {
p.metrics.TasksInPlacePlacementTotal.Inc(1)
if taskInfo.GetRuntime().GetDesiredHost() == taskInfo.GetRuntime().GetHost() {
p.metrics.TasksInPlacePlacementSuccess.Inc(1)
} else {
log.WithField("job_id", taskInfo.GetJobId().GetValue()).
WithField("instance_id", taskInfo.GetInstanceId()).
Info("task fail to place on desired host")
}
}
}
} else if util.IsPelotonStateTerminal(newRuntime.GetState()) &&
cachedJob.GetJobType() == pbjob.JobType_BATCH {
// only update resource usage when a task of a batch job reaches a terminal state
completionTime := now().UTC().Format(time.RFC3339Nano)
newRuntime.CompletionTime = completionTime
currTaskResourceUsage = getCurrTaskResourceUsage(
updateEvent.TaskID(), updateEvent.State(), taskInfo.GetConfig().GetResource(),
taskInfo.GetRuntime().GetStartTime(), completionTime)
if len(currTaskResourceUsage) > 0 {
// current task resource usage was updated by this event, so we should
// add it to aggregated resource usage for the task and update runtime
aggregateTaskResourceUsage := taskInfo.GetRuntime().GetResourceUsage()
if len(aggregateTaskResourceUsage) > 0 {
for k, v := range currTaskResourceUsage {
aggregateTaskResourceUsage[k] += v
}
newRuntime.ResourceUsage = aggregateTaskResourceUsage
}
}
} else if cachedJob.GetJobType() == pbjob.JobType_SERVICE {
// for service job, reset resource usage
currTaskResourceUsage = nil
newRuntime.ResourceUsage = nil
}
// Update the task update times in job cache and then update the task runtime in cache and DB
cachedJob.SetTaskUpdateTime(updateEvent.Timestamp())
if _, err = cachedJob.CompareAndSetTask(
ctx,
taskInfo.GetInstanceId(),
newRuntime,
false,
); err != nil {
log.WithError(err).
WithFields(log.Fields{
"task_id": updateEvent.TaskID(),
"state": updateEvent.State().String()}).
Error("Fail to update runtime for taskID")
return err
}
// Enqueue task to goal state
p.goalStateDriver.EnqueueTask(
taskInfo.GetJobId(),
taskInfo.GetInstanceId(),
time.Now())
// Enqueue job to goal state as well
goalstate.EnqueueJobWithDefaultDelay(
taskInfo.GetJobId(), p.goalStateDriver, cachedJob)
// Update job's resource usage with the current task resource usage.
// This is a noop in case currTaskResourceUsage is nil
// This operation is not idempotent. So we will update job resource usage
// in cache only after successfully updating task resource usage in DB
// In case of errors in PatchTasks(), ProcessStatusUpdate will be retried
// indefinitely until errors are resolved.
cachedJob.UpdateResourceUsage(currTaskResourceUsage)
return nil
}
// logTaskMetrics emits metrics for a task status event
func (p *statusUpdate) logTaskMetrics(event *statusupdate.Event) {
if event.V0() == nil {
return
}
// Update task state counters for non-reconciliation updates.
reason := event.MesosTaskStatus().GetReason()
if reason != mesos.TaskStatus_REASON_RECONCILIATION {
switch event.State() {
case pb_task.TaskState_RUNNING:
p.metrics.TasksRunningTotal.Inc(1)
case pb_task.TaskState_SUCCEEDED:
p.metrics.TasksSucceededTotal.Inc(1)
case pb_task.TaskState_FAILED:
p.metrics.TasksFailedTotal.Inc(1)
p.metrics.TasksFailedReason[int32(reason)].Inc(1)
log.WithFields(log.Fields{
"task_id": event.TaskID(),
"failed_reason": mesos.TaskStatus_Reason_name[int32(reason)],
}).Debug("received failed task")
case pb_task.TaskState_KILLED:
p.metrics.TasksKilledTotal.Inc(1)
case pb_task.TaskState_LOST:
p.metrics.TasksLostTotal.Inc(1)
case pb_task.TaskState_LAUNCHED:
p.metrics.TasksLaunchedTotal.Inc(1)
case pb_task.TaskState_STARTING:
p.metrics.TasksStartingTotal.Inc(1)
}
} else {
p.metrics.TasksReconciledTotal.Inc(1)
}
}
// isOrphanTaskEvent returns whether a task event belongs to an orphan task;
// if the task is not orphan, it also returns the task's TaskInfo
func (p *statusUpdate) isOrphanTaskEvent(
ctx context.Context,
event *statusupdate.Event,
) (bool, *pb_task.TaskInfo, error) {
taskInfo, err := p.taskStore.GetTaskByID(ctx, event.TaskID())
if err != nil {
if yarpcerrors.IsNotFound(err) {
// if task runtime or config is not present in the DB,
// then the task is orphan
log.WithFields(log.Fields{
"mesos_task_id": event.MesosTaskStatus(),
"task_status_event≠": event.State().String(),
}).Info("received status update for task not found in DB")
return true, nil, nil
}
log.WithError(err).
WithField("task_id", event.TaskID()).
WithField("task_status_event", event.MesosTaskStatus()).
WithField("state", event.State().String()).
Error("fail to find taskInfo for taskID for mesos event")
return false, nil, err
}
// TODO p2k: verify v1 pod id in taskInfo
if event.V0() != nil {
dbTaskID := taskInfo.GetRuntime().GetMesosTaskId().GetValue()
if dbTaskID != event.MesosTaskStatus().GetTaskId().GetValue() {
log.WithFields(log.Fields{
"orphan_task_id": event.MesosTaskStatus().GetTaskId().GetValue(),
"db_task_id": dbTaskID,
"db_task_runtime_state": taskInfo.GetRuntime().GetState().String(),
"mesos_event_state": event.State().String(),
}).Info("received status update for orphan mesos task")
return true, nil, nil
}
}
return false, taskInfo, nil
}
// updatePersistentVolumeState updates volume state to be CREATED.
func (p *statusUpdate) updatePersistentVolumeState(ctx context.Context, taskInfo *pb_task.TaskInfo) error {
// Update volume state to be created if task enters RUNNING state.
volumeInfo, err := p.volumeStore.GetPersistentVolume(ctx, taskInfo.GetRuntime().GetVolumeID())
if err != nil {
log.WithError(err).WithFields(log.Fields{
"job_id": taskInfo.GetJobId().GetValue(),
"instance_id": taskInfo.GetInstanceId(),
"db_task_runtime": taskInfo.GetRuntime(),
"volume_id": taskInfo.GetRuntime().GetVolumeID(),
}).Error("Failed to read db for given volume")
_, ok := err.(*storage.VolumeNotFoundError)
if !ok {
// Do not ack status update running if db read error.
return err
}
return nil
}
// Do not update volume db if state is already CREATED or goalstate is DELETED.
if volumeInfo.GetState() == volume.VolumeState_CREATED ||
volumeInfo.GetGoalState() == volume.VolumeState_DELETED {
return nil
}
volumeInfo.State = volume.VolumeState_CREATED
return p.volumeStore.UpdatePersistentVolume(ctx, volumeInfo)
}
// ProcessListeners is for v0 events only, as the event forwarder will be removed in v1.
func (p *statusUpdate) ProcessListeners(event *statusupdate.Event) {
if event != nil && event.V1() != nil {
return
}
for _, listener := range p.listeners {
listener.OnV0Events([]*pb_eventstream.Event{event.V0()})
}
}
// OnV0Events and OnV1Events are the callbacks notifying a batch of events;
// they are no-ops here since events are handled individually via
// OnV0Event and OnV1Event
func (p *statusUpdate) OnV0Events(events []*pb_eventstream.Event) {}
func (p *statusUpdate) OnV1Events(events []*v1pbevent.Event) {}
// Start starts processing status update events
func (p *statusUpdate) Start() {
p.applier.start()
for _, client := range p.eventClients {
client.Start()
}
log.Info("Task status updater started")
for _, listener := range p.listeners {
listener.Start()
}
}
// Stop stops processing status update events
func (p *statusUpdate) Stop() {
for _, client := range p.eventClients {
client.Stop()
}
log.Info("Task status updater stopped")
for _, listener := range p.listeners {
listener.Stop()
}
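// Finally, drain any events still queued in the applier and shut it down.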
p.applier.drainAndShutdown()
}
func getCurrTaskResourceUsage(taskID string, state pb_task.TaskState,
resourceCfg *pb_task.ResourceConfig,
startTime, completionTime string) map[string]float64 {
currTaskResourceUsage, err := jobmgr_task.CreateResourceUsageMap(
resourceCfg, startTime, completionTime)
if err != nil {
// only log the error here and continue processing the event
// in this case resource usage map will be nil
log.WithError(err).
WithFields(log.Fields{
"task_id": taskID,
"state": state}).
Error("failed to calculate resource usage")
}
return currTaskResourceUsage
}
// persistHealthyField updates the healthy field in the new task runtime
func (p *statusUpdate) persistHealthyField(
state pb_task.TaskState,
reason string,
healthy bool,
newRuntime *pb_task.RuntimeInfo) {
switch {
case util.IsPelotonStateTerminal(state):
// Set healthy to INVALID for all terminal states
newRuntime.Healthy = pb_task.HealthState_INVALID
case state == pb_task.TaskState_RUNNING:
// Only record the health check result when
// the reason for the event is TASK_HEALTH_CHECK_STATUS_UPDATED
if reason == mesos.TaskStatus_REASON_TASK_HEALTH_CHECK_STATUS_UPDATED.String() {
newRuntime.Reason = reason
if healthy {
newRuntime.Healthy = pb_task.HealthState_HEALTHY
p.metrics.TasksHealthyTotal.Inc(1)
} else {
newRuntime.Healthy = pb_task.HealthState_UNHEALTHY
p.metrics.TasksUnHealthyTotal.Inc(1)
}
}
}
}
func updateFailureCount(
eventState pb_task.TaskState,
runtime *pb_task.RuntimeInfo,
newRuntime *pb_task.RuntimeInfo) {
if !util.IsPelotonStateTerminal(eventState) {
return
}
if runtime.GetConfigVersion() != runtime.GetDesiredConfigVersion() {
// do not increment the failure count if config version has changed
return
}
switch {
case eventState == pb_task.TaskState_FAILED:
newRuntime.FailureCount = runtime.GetFailureCount() + 1
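// A task that exits successfully while its goal state is RUNNING (i.e. it
// was expected to keep running) also counts as a failure.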
case eventState == pb_task.TaskState_SUCCEEDED &&
runtime.GetGoalState() == pb_task.TaskState_RUNNING:
newRuntime.FailureCount = runtime.GetFailureCount() + 1
case eventState == pb_task.TaskState_KILLED &&
runtime.GetGoalState() != pb_task.TaskState_KILLED:
// This KILLED event is unexpected
newRuntime.FailureCount = runtime.GetFailureCount() + 1
}
}
// isDuplicateStateUpdate checks whether this status update leaves the
// current instance state unchanged.
// If it does, the status update should be ignored.
// The state is said to be left unchanged
// if any of the following conditions is satisfied.
//
// 1. State is the same and that state is not running.
// 2. State is the same, that state is running, and health check is not configured.
// 3. State is the same, that state is running, and the update is not due to health check result.
// 4. State is the same, that state is running, the update is due to health check result and the task is healthy.
//
// Each unhealthy state needs to be logged into the pod events table.
func isDuplicateStateUpdate(
taskInfo *pb_task.TaskInfo,
updateEvent *statusupdate.Event,
) bool {
if updateEvent.State() != taskInfo.GetRuntime().GetState() {
return false
}
mesosTaskStatus := updateEvent.MesosTaskStatus()
podEvent := updateEvent.PodEvent()
if updateEvent.State() != pb_task.TaskState_RUNNING {
log.WithFields(log.Fields{
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": mesosTaskStatus,
"pod_event": podEvent,
}).Debug("skip same status update if state is not RUNNING")
return true
}
if taskInfo.GetConfig().GetHealthCheck() == nil ||
!taskInfo.GetConfig().GetHealthCheck().GetEnabled() {
log.WithFields(log.Fields{
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": mesosTaskStatus,
"pod_event": podEvent,
}).Debug("skip same status update if health check is not configured or " +
"disabled")
return true
}
newStateReason := updateEvent.Reason()
// TODO p2k: not sure which kubelet reason matches this.
// Should we skip some status updates from kubelets?
if newStateReason != mesos.TaskStatus_REASON_TASK_HEALTH_CHECK_STATUS_UPDATED.String() {
log.WithFields(log.Fields{
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": mesosTaskStatus,
"pod_event": podEvent,
}).Debug("skip same status update if status update reason is not from health check")
return true
}
// Current behavior will log consecutive negative health check results
// ToDo (varung): Evaluate if consecutive negative results should be logged or not
isPreviousStateHealthy := taskInfo.GetRuntime().GetHealthy() == pb_task.HealthState_HEALTHY
if !isPreviousStateHealthy {
log.WithFields(log.Fields{
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": mesosTaskStatus,
"pod_event": podEvent,
}).Debug("log each negative health check result")
return false
}
if updateEvent.Healthy() == isPreviousStateHealthy {
log.WithFields(log.Fields{
"db_task_runtime": taskInfo.GetRuntime(),
"task_status_event": mesosTaskStatus,
"pod_event": podEvent,
}).Debug("skip same status update if health check result is positive consecutively")
return true
}
return false
}