func()

in pkg/storage/objects/pod_events.go [208:316]


// GetAll returns the pod events recorded for a single run of the given job
// instance.
//
// If the first element of podID is a non-empty string, its run ID is extracted
// with util.ParseRunID and the events of that run are returned. Otherwise the
// run ID is taken from the "run_id" column of the row returned by a Get on
// (jobID, instanceID); when that Get yields no row, an empty, non-nil slice is
// returned.
//
// An error is returned when the pod ID cannot be parsed, when either store
// read fails, or when a row's update_time is not a valid UUID. Store read
// failures and update_time parse failures increment the PodEventsGetFail
// metric; a successful call increments PodEventsGet.
func (d *podEventsOps) GetAll(
	ctx context.Context,
	jobID string,
	instanceID uint32,
	podID ...string) ([]*task.PodEvent, error) {
	var runID uint64
	if len(podID) > 0 && len(podID[0]) > 0 {
		parsedRunID, err := util.ParseRunID(podID[0])
		if err != nil {
			return nil, errors.Wrap(err, "Failed to parse runID")
		}
		runID = parsedRunID
	} else {
		// Events are sorted in descending order by run_id and then update_time.
		// If the pod ID is not specified, we will get the latest run_id.
		// By default, the Get will fetch the latest row of the latest run_id.
		row, err := d.store.oClient.Get(
			ctx,
			&PodEventsObject{
				JobID:      jobID,
				InstanceID: instanceID,
			},
		)
		if err != nil {
			d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
			return nil, err
		}
		if len(row) == 0 {
			return []*task.PodEvent{}, nil
		}
		// NOTE(review): assumes the client surfaces run_id as uint64; this
		// assertion panics otherwise — confirm against the ORM client.
		runID = row["run_id"].(uint64)
	}

	rows, err := d.store.oClient.GetAll(ctx, &PodEventsObject{
		JobID:      jobID,
		InstanceID: instanceID,
		RunID:      base.NewOptionalUInt64(runID),
	})
	if err != nil {
		// Surface the read failure instead of reporting it as "no events".
		d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
		return nil, err
	}

	var podEvents []*task.PodEvent
	for _, row := range rows {
		obj := &PodEventsObject{}
		obj.transform(row)

		// Task IDs are rendered as "<jobID>-<instanceID>-<runID>".
		mesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			base.ConvertFromOptionalToRawType(
				reflect.ValueOf(obj.RunID)).(uint64))

		prevMesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			obj.PreviousRunID)

		desiredMesosTaskID := fmt.Sprintf("%s-%d-%d",
			obj.JobID,
			obj.InstanceID,
			obj.DesiredRunID)

		// update_time is stored as a UUID string; the event timestamp is
		// derived from the time embedded in that UUID.
		updateTime, err := gocql.ParseUUID(base.ConvertFromOptionalToRawType(
			reflect.ValueOf(obj.UpdateTime)).(string))
		if err != nil {
			d.store.metrics.OrmTaskMetrics.PodEventsGetFail.Inc(1)
			return nil, errors.Wrap(err, "Failed to parse update_time")
		}

		podEvents = append(podEvents, &task.PodEvent{
			TaskId:               &mesos_v1.TaskID{Value: &mesosTaskID},
			PrevTaskId:           &mesos_v1.TaskID{Value: &prevMesosTaskID},
			DesriedTaskId:        &mesos_v1.TaskID{Value: &desiredMesosTaskID},
			Timestamp:            updateTime.Time().Format(time.RFC3339),
			ConfigVersion:        obj.ConfigVersion,
			DesiredConfigVersion: obj.DesiredConfigVersion,
			ActualState:          obj.ActualState,
			GoalState:            obj.GoalState,
			Message:              obj.Message,
			Reason:               obj.Reason,
			AgentID:              obj.AgentID,
			Hostname:             obj.Hostname,
			Healthy:              obj.Healthy,
		})
	}
	d.store.metrics.OrmTaskMetrics.PodEventsGet.Inc(1)

	return podEvents, nil
}