in pkg/storage/objects/pod_events.go [208:316]
// GetAll returns the pod events of a single run of the given job instance.
//
// If podID is supplied and its first element is non-empty, the run ID is
// parsed from it with util.ParseRunID. Otherwise the run ID is read from the
// "run_id" column of the row returned by a Get keyed on (jobID, instanceID);
// when that Get yields no row, an empty non-nil slice is returned.
//
// Every row fetched for the selected run is converted into a task.PodEvent
// whose task IDs have the form "<jobID>-<instanceID>-<runID>" and whose
// Timestamp is the time embedded in the update_time UUID, formatted as
// RFC 3339.
//
// An error is returned when the pod ID cannot be parsed, when either store
// read fails, when the stored run_id is not a uint64, or when update_time is
// not a valid UUID.
func (d *podEventsOps) GetAll(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	podID ...string) ([]*task.PodEvent, error) {
	var runID uint64
	if len(podID) > 0 && len(podID[0]) > 0 {
		var err error
		runID, err = util.ParseRunID(podID[0])
		if err != nil {
			return nil, errors.Wrap(err, "Failed to parse runID")
		}
	} else {
		// Events are sorted in descending order by run_id and then update_time.
		// If pod event is not specified, we will get the latest run_id.
		// By default, the Get will fetch the latest row of the latest run_id.
		row, err := d.store.oClient.Get(
			ctx,
			&PodEventsObject{
				JobID:      jobID,
				InstanceID: instanceID,
			},
		)
		if err != nil {
			d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
			return nil, err
		}
		if len(row) == 0 {
			return []*task.PodEvent{}, nil
		}
		// Use the checked form of the assertion so that an unexpected column
		// type surfaces as an error instead of a panic.
		latestRunID, ok := row["run_id"].(uint64)
		if !ok {
			d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
			return nil, errors.New("run_id of latest pod event is not a uint64")
		}
		runID = latestRunID
	}

	rows, err := d.store.oClient.GetAll(ctx, &PodEventsObject{
		JobID:      jobID,
		InstanceID: instanceID,
		RunID:      base.NewOptionalUInt64(runID),
	})
	if err != nil {
		// A failed read must not be reported as a successful, empty result.
		d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
		return nil, err
	}

	// podEvents intentionally starts out nil so that an empty result set keeps
	// returning a nil slice.
	var podEvents []*task.PodEvent
	for _, row := range rows {
		obj := &PodEventsObject{}
		obj.transform(row)

		mesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			base.ConvertFromOptionalToRawType(
				reflect.ValueOf(obj.RunID)).(uint64))
		prevMesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			obj.PreviousRunID)
		desiredMesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			obj.DesiredRunID)

		// update_time is held as a UUID string; the event timestamp is the
		// time encoded in that UUID.
		convertedTS, err := gocql.ParseUUID(base.ConvertFromOptionalToRawType(
			reflect.ValueOf(obj.UpdateTime)).(string))
		if err != nil {
			d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
			return nil, errors.Wrap(err, "Failed to parse update_time")
		}

		podEvents = append(podEvents, &task.PodEvent{
			TaskId:               &mesos_v1.TaskID{Value: &mesosTaskID},
			PrevTaskId:           &mesos_v1.TaskID{Value: &prevMesosTaskID},
			DesriedTaskId:        &mesos_v1.TaskID{Value: &desiredMesosTaskID},
			Timestamp:            convertedTS.Time().Format(time.RFC3339),
			ConfigVersion:        obj.ConfigVersion,
			DesiredConfigVersion: obj.DesiredConfigVersion,
			ActualState:          obj.ActualState,
			GoalState:            obj.GoalState,
			Message:              obj.Message,
			Reason:               obj.Reason,
			AgentID:              obj.AgentID,
			Hostname:             obj.Hostname,
			Healthy:              obj.Healthy,
		})
	}

	d.store.metrics.OrmTaskMetrics.PodEventsGet.Inc(1)
	return podEvents, nil
}