func()

in internal/pkg/stream/ecs.go [134:234]


func (s *ECSDeploymentStreamer) Fetch() (next time.Time, done bool, err error) {
	out, err := s.client.Service(s.cluster, s.service)
	if err != nil {
		if request.IsErrorThrottle(err) {
			s.ecsRetries += 1
			return nextFetchDate(s.clock, s.rand, s.ecsRetries), false, nil
		}
		return next, false, fmt.Errorf("fetch service description: %w", err)
	}
	s.ecsRetries = 0

	var deployments []ECSDeployment
	var primaryDeploymentId string
	for _, deployment := range out.Deployments {
		status := aws.StringValue(deployment.Status)
		desiredCount, runningCount := aws.Int64Value(deployment.DesiredCount), aws.Int64Value(deployment.RunningCount)
		rollingDeploy := ECSDeployment{
			Status:          status,
			TaskDefRevision: parseRevisionFromTaskDefARN(aws.StringValue(deployment.TaskDefinition)),
			DesiredCount:    int(desiredCount),
			RunningCount:    int(runningCount),
			FailedCount:     int(aws.Int64Value(deployment.FailedTasks)),
			PendingCount:    int(aws.Int64Value(deployment.PendingCount)),
			RolloutState:    aws.StringValue(deployment.RolloutState),
			CreatedAt:       aws.TimeValue(deployment.CreatedAt),
			UpdatedAt:       aws.TimeValue(deployment.UpdatedAt),
			Id:              aws.StringValue(deployment.Id),
		}
		deployments = append(deployments, rollingDeploy)
		if isDeploymentDone(rollingDeploy, s.deploymentCreationTime) {
			done = true
		}
		if rollingDeploy.isPrimary() {
			primaryDeploymentId = rollingDeploy.Id
		}
	}
	stoppedSvcTasks, err := s.client.StoppedServiceTasks(s.cluster, s.service)
	if err != nil {
		if request.IsErrorThrottle(err) {
			s.ecsRetries += 1
			return nextFetchDate(s.clock, s.rand, s.ecsRetries), false, nil
		}
		return next, false, fmt.Errorf("fetch stopped tasks: %w", err)
	}
	s.ecsRetries = 0

	var stoppedTasks []ecs.Task
	for _, st := range stoppedSvcTasks {
		if stoppingAt := aws.TimeValue(st.StoppingAt); aws.StringValue(st.StartedBy) != primaryDeploymentId || stoppingAt.Before(s.deploymentCreationTime) ||
			(strings.Contains(aws.StringValue(st.StoppedReason), ecsScalingActivity)) {
			continue
		}
		stoppedTasks = append(stoppedTasks, ecs.Task{
			TaskArn:       st.TaskArn,
			DesiredStatus: st.DesiredStatus,
			LastStatus:    st.LastStatus,
			StoppedReason: st.StoppedReason,
			StoppingAt:    st.StoppingAt,
		})
	}
	sort.SliceStable(stoppedTasks, func(i, j int) bool {
		return aws.TimeValue(stoppedTasks[i].StoppingAt).After(aws.TimeValue(stoppedTasks[j].StoppingAt))
	})

	var failureMsgs []string
	for _, event := range out.Events {
		if createdAt := aws.TimeValue(event.CreatedAt); createdAt.Before(s.deploymentCreationTime) {
			break
		}
		id := aws.StringValue(event.Id)
		if _, ok := s.pastEventIDs[id]; ok {
			break
		}
		if msg := aws.StringValue(event.Message); isFailureServiceEvent(msg) {
			failureMsgs = append(failureMsgs, msg)
		}
		s.pastEventIDs[id] = true
	}

	var alarms []cloudwatch.AlarmStatus
	if out.DeploymentConfiguration != nil && out.DeploymentConfiguration.Alarms != nil && aws.BoolValue(out.DeploymentConfiguration.Alarms.Enable) {
		alarmNames := aws.StringValueSlice(out.DeploymentConfiguration.Alarms.AlarmNames)
		alarms, err = s.cw.AlarmStatuses(cloudwatch.WithNames(alarmNames))
		if err != nil {
			if request.IsErrorThrottle(err) {
				s.cwRetries += 1
				return nextFetchDate(s.clock, s.rand, s.cwRetries), false, nil
			}
			return next, false, fmt.Errorf("retrieve alarm statuses: %w", err)
		}
		s.cwRetries = 0
	}

	s.eventsToFlush = append(s.eventsToFlush, ECSService{
		Deployments:         deployments,
		LatestFailureEvents: failureMsgs,
		Alarms:              alarms,
		StoppedTasks:        stoppedTasks,
	})
	return nextFetchDate(s.clock, s.rand, 0), done, nil
}