pkg/common/statusupdate/event.go (188 lines of code) (raw):
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package statusupdate
import (
"fmt"
"time"
pbmesos "github.com/uber/peloton/.gen/mesos/v1"
pbtask "github.com/uber/peloton/.gen/peloton/api/v0/task"
pbpod "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod"
pbeventstream "github.com/uber/peloton/.gen/peloton/private/eventstream"
v1pbevent "github.com/uber/peloton/.gen/peloton/private/eventstream/v1alpha/event"
"github.com/uber/peloton/pkg/common/api"
"github.com/uber/peloton/pkg/common/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// Event provides a common interface to v0 and v1 event.
type Event struct {
taskID string
state pbtask.TaskState
// either v0event or v1event is set
v0event *pbeventstream.Event
v1event *v1pbevent.Event
}
// New an Event from a v0 event.
func NewV0(event *pbeventstream.Event) (*Event, error) {
return convertV0Event(event)
}
// New an Event from a v1 event.
func NewV1(event *v1pbevent.Event) (*Event, error) {
return convertV1Event(event)
}
// V0 returns underlying v0 event.
func (sue *Event) V0() *pbeventstream.Event {
if sue == nil {
return nil
}
return sue.v0event
}
// V1 returns the underlying v1 event.
func (sue *Event) V1() *v1pbevent.Event {
if sue == nil {
return nil
}
return sue.v1event
}
// TaskID returns task ID of the event.
// It is the peloton task id for v0 event and pod id for v1 event.
func (sue *Event) TaskID() string {
return sue.taskID
}
// State returns task state.
func (sue *Event) State() pbtask.TaskState {
return sue.state
}
// StatusMsg returns status message.
func (sue *Event) StatusMsg() string {
if sue.v0event != nil {
return sue.v0event.GetMesosTaskStatus().GetMessage()
}
return sue.v1event.GetPodEvent().GetMessage()
}
// MesosTaskID returns mesos task ID.
func (sue *Event) MesosTaskID() *pbmesos.TaskID {
if sue.v0event != nil {
return sue.v0event.MesosTaskStatus.GetTaskId()
}
return &pbmesos.TaskID{Value: &sue.taskID}
}
// AgentID returns agent ID.
func (sue *Event) AgentID() *pbmesos.AgentID {
if sue.v0event != nil {
return sue.v0event.MesosTaskStatus.GetAgentId()
}
return &pbmesos.AgentID{Value: &sue.v1event.PodEvent.Hostname}
}
// MesosTaskStatus returns mesos task status for v0 event only.
func (sue *Event) MesosTaskStatus() *pbmesos.TaskStatus {
if sue.v0event != nil {
return sue.v0event.GetMesosTaskStatus()
}
return nil
}
// Offset returns offset.
func (sue *Event) Offset() uint64 {
if sue.v0event != nil {
return sue.v0event.Offset
}
return sue.v1event.Offset
}
// PodEvent returns pod event for v1 event only.
func (sue *Event) PodEvent() *pbpod.PodEvent {
if sue.v1event != nil {
return sue.v1event.PodEvent
}
return nil
}
// Reason returns reason.
func (sue *Event) Reason() string {
if sue.v0event != nil {
return sue.v0event.GetMesosTaskStatus().GetReason().String()
}
return sue.v1event.PodEvent.GetReason()
}
// Healthy returns healthy bit.
func (sue *Event) Healthy() bool {
if sue.v0event != nil {
return sue.v0event.GetMesosTaskStatus().GetHealthy()
}
return sue.v1event.PodEvent.GetHealthy() == pbpod.HealthState_HEALTH_STATE_HEALTHY.String()
}
// Message returns message.
func (sue *Event) Message() string {
if sue.v0event != nil {
return sue.v0event.GetMesosTaskStatus().GetMessage()
}
return sue.v1event.PodEvent.GetMessage()
}
// Timestamp returns timestamp in second.
func (sue *Event) Timestamp() *float64 {
var ts float64
if sue.v0event != nil {
ts = sue.v0event.GetMesosTaskStatus().GetTimestamp()
} else {
t, err := time.Parse(time.RFC3339, sue.v1event.PodEvent.GetTimestamp())
if err != nil {
log.WithField("pod_timestamp", sue.v1event.PodEvent.GetTimestamp()).Error("invalid timestamp")
} else {
ts = float64(t.UnixNano()) / 1e9
}
}
return &ts
}
func convertV0Event(event *pbeventstream.Event) (*Event, error) {
var err error
updateEvent := &Event{v0event: event}
if event.Type == pbeventstream.Event_HOST_EVENT {
return nil, fmt.Errorf("cannot convert host-event to statusupdate event")
} else if event.Type == pbeventstream.Event_MESOS_TASK_STATUS {
mesosTaskID := event.MesosTaskStatus.GetTaskId().GetValue()
updateEvent.taskID, err = util.ParseTaskIDFromMesosTaskID(mesosTaskID)
if err != nil {
return nil, errors.Wrap(err,
fmt.Sprintf(
"failed to parse taskID from mesosTaskID: %v",
mesosTaskID,
),
)
}
updateEvent.state = util.MesosStateToPelotonState(event.MesosTaskStatus.GetState())
log.WithFields(log.Fields{
"task_id": updateEvent.taskID,
"state": updateEvent.state.String(),
}).Debug("Adding Mesos Event ")
} else if event.Type == pbeventstream.Event_PELOTON_TASK_EVENT {
// Peloton task event is used for task status update from resmgr.
updateEvent.taskID = event.PelotonTaskEvent.TaskId.Value
updateEvent.state = event.PelotonTaskEvent.State
log.WithFields(log.Fields{
"task_id": updateEvent.taskID,
"state": updateEvent.state.String(),
}).Debug("Adding Peloton Event ")
} else {
log.WithFields(log.Fields{
"task_id": updateEvent.taskID,
"state": updateEvent.state.String(),
}).Error("Unknown Event ")
return nil, errors.New("Unknown Event ")
}
return updateEvent, nil
}
func parsePodState(s string) pbpod.PodState {
switch s {
case pbpod.PodState_POD_STATE_LAUNCHED.String():
return pbpod.PodState_POD_STATE_LAUNCHED
case pbpod.PodState_POD_STATE_RUNNING.String():
return pbpod.PodState_POD_STATE_RUNNING
case pbpod.PodState_POD_STATE_SUCCEEDED.String():
return pbpod.PodState_POD_STATE_SUCCEEDED
case pbpod.PodState_POD_STATE_FAILED.String():
return pbpod.PodState_POD_STATE_FAILED
case pbpod.PodState_POD_STATE_LOST.String():
return pbpod.PodState_POD_STATE_LOST
case pbpod.PodState_POD_STATE_KILLED.String():
return pbpod.PodState_POD_STATE_KILLED
}
return pbpod.PodState_POD_STATE_INVALID
}
func convertV1Event(event *v1pbevent.Event) (*Event, error) {
podEvent := event.GetPodEvent()
// At this point, podID and mesosTaskID look the same. So we can just
// extract taskID from podID using the same way as we do for mesos in
// case of v0 event.
taskID, err := util.ParseTaskIDFromMesosTaskID(
podEvent.GetPodId().GetValue(),
)
if err != nil {
return nil, errors.Wrap(err,
fmt.Sprintf(
"failed to parse taskID from podID: %v",
podEvent.GetPodId().GetValue(),
),
)
}
return &Event{
taskID: taskID,
state: api.ConvertPodStateToTaskState(
parsePodState(
podEvent.ActualState),
),
v1event: event,
}, nil
}