in internal/pkg/stream/ecs.go [134:234]
func (s *ECSDeploymentStreamer) Fetch() (next time.Time, done bool, err error) {
out, err := s.client.Service(s.cluster, s.service)
if err != nil {
if request.IsErrorThrottle(err) {
s.ecsRetries += 1
return nextFetchDate(s.clock, s.rand, s.ecsRetries), false, nil
}
return next, false, fmt.Errorf("fetch service description: %w", err)
}
s.ecsRetries = 0
var deployments []ECSDeployment
var primaryDeploymentId string
for _, deployment := range out.Deployments {
status := aws.StringValue(deployment.Status)
desiredCount, runningCount := aws.Int64Value(deployment.DesiredCount), aws.Int64Value(deployment.RunningCount)
rollingDeploy := ECSDeployment{
Status: status,
TaskDefRevision: parseRevisionFromTaskDefARN(aws.StringValue(deployment.TaskDefinition)),
DesiredCount: int(desiredCount),
RunningCount: int(runningCount),
FailedCount: int(aws.Int64Value(deployment.FailedTasks)),
PendingCount: int(aws.Int64Value(deployment.PendingCount)),
RolloutState: aws.StringValue(deployment.RolloutState),
CreatedAt: aws.TimeValue(deployment.CreatedAt),
UpdatedAt: aws.TimeValue(deployment.UpdatedAt),
Id: aws.StringValue(deployment.Id),
}
deployments = append(deployments, rollingDeploy)
if isDeploymentDone(rollingDeploy, s.deploymentCreationTime) {
done = true
}
if rollingDeploy.isPrimary() {
primaryDeploymentId = rollingDeploy.Id
}
}
stoppedSvcTasks, err := s.client.StoppedServiceTasks(s.cluster, s.service)
if err != nil {
if request.IsErrorThrottle(err) {
s.ecsRetries += 1
return nextFetchDate(s.clock, s.rand, s.ecsRetries), false, nil
}
return next, false, fmt.Errorf("fetch stopped tasks: %w", err)
}
s.ecsRetries = 0
var stoppedTasks []ecs.Task
for _, st := range stoppedSvcTasks {
if stoppingAt := aws.TimeValue(st.StoppingAt); aws.StringValue(st.StartedBy) != primaryDeploymentId || stoppingAt.Before(s.deploymentCreationTime) ||
(strings.Contains(aws.StringValue(st.StoppedReason), ecsScalingActivity)) {
continue
}
stoppedTasks = append(stoppedTasks, ecs.Task{
TaskArn: st.TaskArn,
DesiredStatus: st.DesiredStatus,
LastStatus: st.LastStatus,
StoppedReason: st.StoppedReason,
StoppingAt: st.StoppingAt,
})
}
sort.SliceStable(stoppedTasks, func(i, j int) bool {
return aws.TimeValue(stoppedTasks[i].StoppingAt).After(aws.TimeValue(stoppedTasks[j].StoppingAt))
})
var failureMsgs []string
for _, event := range out.Events {
if createdAt := aws.TimeValue(event.CreatedAt); createdAt.Before(s.deploymentCreationTime) {
break
}
id := aws.StringValue(event.Id)
if _, ok := s.pastEventIDs[id]; ok {
break
}
if msg := aws.StringValue(event.Message); isFailureServiceEvent(msg) {
failureMsgs = append(failureMsgs, msg)
}
s.pastEventIDs[id] = true
}
var alarms []cloudwatch.AlarmStatus
if out.DeploymentConfiguration != nil && out.DeploymentConfiguration.Alarms != nil && aws.BoolValue(out.DeploymentConfiguration.Alarms.Enable) {
alarmNames := aws.StringValueSlice(out.DeploymentConfiguration.Alarms.AlarmNames)
alarms, err = s.cw.AlarmStatuses(cloudwatch.WithNames(alarmNames))
if err != nil {
if request.IsErrorThrottle(err) {
s.cwRetries += 1
return nextFetchDate(s.clock, s.rand, s.cwRetries), false, nil
}
return next, false, fmt.Errorf("retrieve alarm statuses: %w", err)
}
s.cwRetries = 0
}
s.eventsToFlush = append(s.eventsToFlush, ECSService{
Deployments: deployments,
LatestFailureEvents: failureMsgs,
Alarms: alarms,
StoppedTasks: stoppedTasks,
})
return nextFetchDate(s.clock, s.rand, 0), done, nil
}