agent/api/statechange.go (465 lines of code) (raw):

// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"). You may // not use this file except in compliance with the License. A copy of the // License is located at // // http://aws.amazon.com/apache2.0/ // // or in the "license" file accompanying this file. This file is distributed // on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either // express or implied. See the License for the specific language governing // permissions and limitations under the License. package api import ( "fmt" "strconv" "time" apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container" apitask "github.com/aws/amazon-ecs-agent/agent/api/task" "github.com/aws/amazon-ecs-agent/agent/statechange" "github.com/aws/amazon-ecs-agent/ecs-agent/api/attachment" apicontainerstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/container/status" "github.com/aws/amazon-ecs-agent/ecs-agent/api/ecs" apitaskstatus "github.com/aws/amazon-ecs-agent/ecs-agent/api/task/status" "github.com/aws/amazon-ecs-agent/ecs-agent/logger" "github.com/aws/amazon-ecs-agent/ecs-agent/logger/field" ni "github.com/aws/amazon-ecs-agent/ecs-agent/netlib/model/networkinterface" "github.com/aws/amazon-ecs-agent/ecs-agent/utils" "github.com/aws/aws-sdk-go-v2/service/ecs/types" "github.com/aws/aws-sdk-go/aws" "github.com/docker/go-connections/nat" "github.com/pkg/errors" ) const ( // ecsMaxNetworkBindingsLength is the maximum length of the ecs.NetworkBindings list sent as part of the // container state change payload. Currently, this is enforced only when containerPortRanges are requested. ecsMaxNetworkBindingsLength = 100 ) // ContainerStateChange represents a state change that needs to be sent to the // SubmitContainerStateChange API type ContainerStateChange struct { // TaskArn is the unique identifier for the task TaskArn string // RuntimeID is the dockerID of the container RuntimeID string // ContainerName is the name of the container ContainerName string // Status is the status to send Status apicontainerstatus.ContainerStatus // ImageDigest is the sha-256 digest of the container image as pulled from the repository ImageDigest string // Reason may contain details of why the container stopped Reason string // ExitCode is the exit code of the container, if available ExitCode *int // PortBindings are the details of the host ports picked for the specified // container ports PortBindings []apicontainer.PortBinding // Container is a pointer to the container involved in the state change that gives the event handler a hook into // storing what status was sent. This is used to ensure the same event is handled only once. Container *apicontainer.Container } type ManagedAgentStateChange struct { // TaskArn is the unique identifier for the task TaskArn string // Name is the name of the managed agent Name string // Container is a pointer to the container involved in the state change that gives the event handler a hook into // storing what status was sent. This is used to ensure the same event is handled only once. Container *apicontainer.Container // Status is the status of the managed agent Status apicontainerstatus.ManagedAgentStatus // Reason indicates an error in a managed agent state chage Reason string } // TaskStateChange represents a state change that needs to be sent to the // SubmitTaskStateChange API type TaskStateChange struct { // Attachment is the eni attachment object to send Attachment *ni.ENIAttachment // TaskArn is the unique identifier for the task TaskARN string // Status is the status to send Status apitaskstatus.TaskStatus // Reason may contain details of why the task stopped Reason string // Containers holds the events generated by containers owned by this task Containers []ContainerStateChange // ManagedAgents contain the name and status of Agents running inside the container ManagedAgents []ManagedAgentStateChange // PullStartedAt is the timestamp when the task start pulling PullStartedAt *time.Time // PullStoppedAt is the timestamp when the task finished pulling PullStoppedAt *time.Time // ExecutionStoppedAt is the timestamp when the essential container stopped ExecutionStoppedAt *time.Time // Task is a pointer to the task involved in the state change that gives the event handler a hook into storing // what status was sent. This is used to ensure the same event is handled only once. Task *apitask.Task } // AttachmentStateChange represents a state change that needs to be sent to the // SubmitAttachmentStateChanges API type AttachmentStateChange struct { // Attachment is the attachment object to send Attachment attachment.Attachment } type ErrShouldNotSendEvent struct { resourceId string } func (e ErrShouldNotSendEvent) Error() string { return fmt.Sprintf("should not send events for internal tasks or containers: %s", e.resourceId) } // NewTaskStateChangeEvent creates a new task state change event // returns error if the state change doesn't need to be sent to the ECS backend. func NewTaskStateChangeEvent(task *apitask.Task, reason string) (TaskStateChange, error) { var event TaskStateChange if task.IsInternal { return event, ErrShouldNotSendEvent{task.Arn} } taskKnownStatus := task.GetKnownStatus() if taskKnownStatus != apitaskstatus.TaskManifestPulled && !taskKnownStatus.BackendRecognized() { return event, errors.Errorf( "create task state change event api: status not recognized by ECS: %v", taskKnownStatus) } if task.GetSentStatus() >= taskKnownStatus { return event, errors.Errorf( "create task state change event api: status [%s] already sent", taskKnownStatus.String()) } if taskKnownStatus == apitaskstatus.TaskManifestPulled && !task.HasAContainerWithResolvedDigest() { return event, ErrShouldNotSendEvent{ fmt.Sprintf( "create task state change event api: status %s not eligible for backend reporting as"+ " no digests were resolved", apitaskstatus.TaskManifestPulled.String()), } } event = TaskStateChange{ TaskARN: task.Arn, Status: taskKnownStatus, Reason: reason, Task: task, } event.SetTaskTimestamps() return event, nil } // NewContainerStateChangeEvent creates a new container state change event // returns error if the state change doesn't need to be sent to the ECS backend. func NewContainerStateChangeEvent(task *apitask.Task, cont *apicontainer.Container, reason string) (ContainerStateChange, error) { event, err := newUncheckedContainerStateChangeEvent(task, cont, reason) if err != nil { return event, err } contKnownStatus := cont.GetKnownStatus() if contKnownStatus != apicontainerstatus.ContainerManifestPulled && !contKnownStatus.ShouldReportToBackend(cont.GetSteadyStateStatus()) { return event, ErrShouldNotSendEvent{fmt.Sprintf( "create container state change event api: status not recognized by ECS: %v", contKnownStatus)} } if contKnownStatus == apicontainerstatus.ContainerManifestPulled && !cont.DigestResolved() { // Transition to MANIFEST_PULLED state is sent to the backend only to report a resolved // image manifest digest. No need to generate an event if the digest was not resolved // which could happen due to various reasons. return event, ErrShouldNotSendEvent{fmt.Sprintf( "create container state change event api:"+ " no need to send %s event as no resolved digests were found", apicontainerstatus.ContainerManifestPulled.String())} } if cont.GetSentStatus() >= contKnownStatus { return event, ErrShouldNotSendEvent{fmt.Sprintf( "create container state change event api: status [%s] already sent for container %s, task %s", contKnownStatus.String(), cont.Name, task.Arn)} } if reason == "" && cont.ApplyingError != nil { reason = cont.ApplyingError.Error() event.Reason = reason } return event, nil } func newUncheckedContainerStateChangeEvent(task *apitask.Task, cont *apicontainer.Container, reason string) (ContainerStateChange, error) { var event ContainerStateChange if cont.IsInternal() { return event, ErrShouldNotSendEvent{cont.Name} } portBindings := cont.GetKnownPortBindings() if task.IsServiceConnectEnabled() && task.IsNetworkModeBridge() { pauseCont, err := task.GetBridgeModePauseContainerForTaskContainer(cont) if err != nil { return event, fmt.Errorf("error resolving pause container for bridge mode SC container: %s", cont.Name) } portBindings = pauseCont.GetKnownPortBindings() } contKnownStatus := cont.GetKnownStatus() event = ContainerStateChange{ TaskArn: task.Arn, ContainerName: cont.Name, RuntimeID: cont.GetRuntimeID(), Status: containerStatusChangeStatus(contKnownStatus, cont.GetSteadyStateStatus()), ExitCode: cont.GetKnownExitCode(), PortBindings: portBindings, ImageDigest: cont.GetImageDigest(), Reason: reason, Container: cont, } return event, nil } // Maps container known status to a suitable status for ContainerStateChange. // // Returns ContainerRunning if known status matches steady state status, // returns knownStatus if it is ContainerManifestPulled or ContainerStopped, // returns ContainerStatusNone for all other cases. func containerStatusChangeStatus( knownStatus apicontainerstatus.ContainerStatus, steadyStateStatus apicontainerstatus.ContainerStatus, ) apicontainerstatus.ContainerStatus { switch knownStatus { case steadyStateStatus: return apicontainerstatus.ContainerRunning case apicontainerstatus.ContainerManifestPulled: return apicontainerstatus.ContainerManifestPulled case apicontainerstatus.ContainerStopped: return apicontainerstatus.ContainerStopped default: return apicontainerstatus.ContainerStatusNone } } // NewManagedAgentChangeEvent creates a new managedAgent change event to convey managed agent state changes // returns error if the state change doesn't need to be sent to the ECS backend. func NewManagedAgentChangeEvent(task *apitask.Task, cont *apicontainer.Container, managedAgentName string, reason string) (ManagedAgentStateChange, error) { var event = ManagedAgentStateChange{} managedAgent, ok := cont.GetManagedAgentByName(managedAgentName) if !ok { return event, errors.Errorf("No ExecuteCommandAgent available in container: %v", cont.Name) } if !managedAgent.Status.ShouldReportToBackend() { return event, errors.Errorf("create managed agent state change event: status not recognized by ECS: %v", managedAgent.Status) } event = ManagedAgentStateChange{ TaskArn: task.Arn, Name: managedAgent.Name, Container: cont, Status: managedAgent.Status, Reason: reason, } return event, nil } // NewAttachmentStateChangeEvent creates a new attachment state change event func NewAttachmentStateChangeEvent(eniAttachment *ni.ENIAttachment) AttachmentStateChange { return AttachmentStateChange{ Attachment: eniAttachment, } } func (c *ContainerStateChange) ToFields() logger.Fields { return logger.Fields{ "eventType": "ContainerStateChange", "taskArn": c.TaskArn, "containerName": c.ContainerName, "containerStatus": c.Status.String(), "exitCode": strconv.Itoa(*c.ExitCode), "reason": c.Reason, "portBindings": c.PortBindings, } } // String returns a human readable string representation of this object func (c *ContainerStateChange) String() string { res := fmt.Sprintf("containerName=%s containerStatus=%s", c.ContainerName, c.Status.String()) if c.ExitCode != nil { res += " containerExitCode=" + strconv.Itoa(*c.ExitCode) } if c.Reason != "" { res += " containerReason=" + c.Reason } if len(c.PortBindings) != 0 { res += fmt.Sprintf(" containerPortBindings=%v", c.PortBindings) } if c.Container != nil { res += fmt.Sprintf(" containerKnownSentStatus=%s containerRuntimeID=%s containerIsEssential=%v", c.Container.GetSentStatus().String(), c.Container.GetRuntimeID(), c.Container.IsEssential()) } return res } // ToECSAgent converts the agent module level ContainerStateChange to ecs-agent module level ContainerStateChange. func (c *ContainerStateChange) ToECSAgent() (*ecs.ContainerStateChange, error) { pl, err := buildContainerStateChangePayload(*c) if err != nil { logger.Error("Could not convert agent container state change to ecs-agent container state change", logger.Fields{ "agentContainerStateChange": c.String(), field.Error: err, }) return nil, err } else if pl == nil { return nil, nil } return &ecs.ContainerStateChange{ TaskArn: c.TaskArn, RuntimeID: aws.StringValue(pl.RuntimeId), ContainerName: c.ContainerName, Status: c.Status, ImageDigest: aws.StringValue(pl.ImageDigest), Reason: aws.StringValue(pl.Reason), ExitCode: utils.Int32PtrToIntPtr(pl.ExitCode), NetworkBindings: pl.NetworkBindings, MetadataGetter: newContainerMetadataGetter(c.Container), }, nil } // String returns a human readable string representation of ManagedAgentStateChange func (m *ManagedAgentStateChange) String() string { res := fmt.Sprintf("containerName=%s managedAgentName=%s managedAgentStatus=%s", m.Container.Name, m.Name, m.Status.String()) if m.Reason != "" { res += " managedAgentReason=" + m.Reason } return res } // SetTaskTimestamps adds the timestamp information of task into the event // to be sent by SubmitTaskStateChange func (change *TaskStateChange) SetTaskTimestamps() { if change.Task == nil { return } // Send the task timestamp if set if timestamp := change.Task.GetPullStartedAt(); !timestamp.IsZero() { change.PullStartedAt = aws.Time(timestamp.UTC()) } if timestamp := change.Task.GetPullStoppedAt(); !timestamp.IsZero() { change.PullStoppedAt = aws.Time(timestamp.UTC()) } if timestamp := change.Task.GetExecutionStoppedAt(); !timestamp.IsZero() { change.ExecutionStoppedAt = aws.Time(timestamp.UTC()) } } func (change *TaskStateChange) ToFields() logger.Fields { fields := logger.Fields{ "eventType": "TaskStateChange", "taskArn": change.TaskARN, "taskStatus": change.Status.String(), "taskReason": change.Reason, } if change.Task != nil { fields["taskKnownSentStatus"] = change.Task.GetSentStatus().String() fields["taskPullStartedAt"] = change.Task.GetPullStartedAt().UTC().Format(time.RFC3339) fields["taskPullStoppedAt"] = change.Task.GetPullStoppedAt().UTC().Format(time.RFC3339) fields["taskExecutionStoppedAt"] = change.Task.GetExecutionStoppedAt().UTC().Format(time.RFC3339) } if change.Attachment != nil { fields["eniAttachment"] = change.Attachment.String() } for i, containerChange := range change.Containers { fields["containerChange-"+strconv.Itoa(i)] = containerChange.String() } for i, managedAgentChange := range change.ManagedAgents { fields["managedAgentChange-"+strconv.Itoa(i)] = managedAgentChange.String() } return fields } // String returns a human readable string representation of this object func (change *TaskStateChange) String() string { res := fmt.Sprintf("%s -> %s", change.TaskARN, change.Status.String()) if change.Task != nil { res += fmt.Sprintf(", Known Sent: %s, PullStartedAt: %s, PullStoppedAt: %s, ExecutionStoppedAt: %s", change.Task.GetSentStatus().String(), change.Task.GetPullStartedAt(), change.Task.GetPullStoppedAt(), change.Task.GetExecutionStoppedAt()) } if change.Attachment != nil { res += ", " + change.Attachment.String() } for _, containerChange := range change.Containers { res += ", container change: " + containerChange.String() } for _, managedAgentChange := range change.ManagedAgents { res += ", managed agent: " + managedAgentChange.String() } return res } // ToECSAgent converts the agent module level TaskStateChange to ecs-agent module level TaskStateChange. func (change *TaskStateChange) ToECSAgent() (*ecs.TaskStateChange, error) { output := &ecs.TaskStateChange{ Attachment: change.Attachment, TaskARN: change.TaskARN, Status: change.Status, Reason: change.Reason, PullStartedAt: change.PullStartedAt, PullStoppedAt: change.PullStoppedAt, ExecutionStoppedAt: change.ExecutionStoppedAt, MetadataGetter: newTaskMetadataGetter(change.Task), } for _, managedAgentEvent := range change.ManagedAgents { if mgspl := buildManagedAgentStateChangePayload(managedAgentEvent); mgspl != nil { output.ManagedAgents = append(output.ManagedAgents, *mgspl) } } containerEvents := make([]types.ContainerStateChange, len(change.Containers)) for i, containerEvent := range change.Containers { payload, err := buildContainerStateChangePayload(containerEvent) if err != nil { logger.Error("Could not convert agent task state change to ecs-agent task state change", logger.Fields{ "agentTaskStateChange": change.String(), field.Error: err, }) return nil, err } containerEvents[i] = *payload } output.Containers = containerEvents return output, nil } // String returns a human readable string representation of this object func (change *AttachmentStateChange) String() string { if change.Attachment != nil { return fmt.Sprintf("%s -> %v, %s", change.Attachment.GetAttachmentARN(), change.Attachment.GetAttachmentStatus(), change.Attachment.String()) } return "" } // ToECSAgent converts the agent module level AttachmentStateChange to ecs-agent module level AttachmentStateChange. func (change *AttachmentStateChange) ToECSAgent() *ecs.AttachmentStateChange { return &ecs.AttachmentStateChange{ Attachment: change.Attachment, } } // GetEventType returns an enum identifying the event type func (ContainerStateChange) GetEventType() statechange.EventType { return statechange.ContainerEvent } func (ms ManagedAgentStateChange) GetEventType() statechange.EventType { return statechange.ManagedAgentEvent } // GetEventType returns an enum identifying the event type func (ts TaskStateChange) GetEventType() statechange.EventType { return statechange.TaskEvent } // GetEventType returns an enum identifying the event type func (AttachmentStateChange) GetEventType() statechange.EventType { return statechange.AttachmentEvent } func buildManagedAgentStateChangePayload(change ManagedAgentStateChange) *types.ManagedAgentStateChange { if !change.Status.ShouldReportToBackend() { logger.Warn("Not submitting unsupported managed agent state", logger.Fields{ field.Status: change.Status.String(), field.ContainerName: change.Container.Name, field.TaskARN: change.TaskArn, }) return nil } return &types.ManagedAgentStateChange{ ManagedAgentName: types.ManagedAgentName(change.Name), ContainerName: aws.String(change.Container.Name), Status: aws.String(change.Status.String()), Reason: aws.String(change.Reason), } } func buildContainerStateChangePayload(change ContainerStateChange) (*types.ContainerStateChange, error) { if change.ContainerName == "" { return nil, fmt.Errorf("container state change has no container name") } statechange := types.ContainerStateChange{ ContainerName: aws.String(change.ContainerName), } if change.RuntimeID != "" { statechange.RuntimeId = aws.String(change.RuntimeID) } if change.Reason != "" { statechange.Reason = aws.String(change.Reason) } if change.ImageDigest != "" { statechange.ImageDigest = aws.String(change.ImageDigest) } // TODO: This check already exists in NewContainerStateChangeEvent and shouldn't be repeated here; remove after verifying stat := change.Status if stat != apicontainerstatus.ContainerManifestPulled && stat != apicontainerstatus.ContainerStopped && stat != apicontainerstatus.ContainerRunning { logger.Warn("Not submitting unsupported upstream container state", logger.Fields{ field.Status: stat.String(), field.ContainerName: change.ContainerName, field.TaskARN: change.TaskArn, }) return nil, nil } // TODO: This check is probably redundant as String() method never returns "DEAD"; remove after verifying if stat.String() == "DEAD" { stat = apicontainerstatus.ContainerStopped } statechange.Status = aws.String(stat.BackendStatusString()) if change.ExitCode != nil { exitCode := int32(aws.IntValue(change.ExitCode)) statechange.ExitCode = aws.Int32(exitCode) } networkBindings := getNetworkBindings(change) // we enforce a limit on the no. of network bindings for containers with at-least 1 port range requested. // this limit is enforced by ECS, and we fail early and don't call SubmitContainerStateChange. if change.Container.HasPortRange() && len(networkBindings) > ecsMaxNetworkBindingsLength { return nil, fmt.Errorf("no. of network bindings %v is more than the maximum supported no. %v, "+ "container: %s "+"task: %s", len(networkBindings), ecsMaxNetworkBindingsLength, change.ContainerName, change.TaskArn) } statechange.NetworkBindings = networkBindings return &statechange, nil } // ProtocolBindIP used to store protocol and bindIP information associated to a particular host port type ProtocolBindIP struct { protocol string bindIP string } // getNetworkBindings returns the list of networkingBindings, sent to ECS as part of the container state change payload func getNetworkBindings(change ContainerStateChange) []types.NetworkBinding { networkBindings := []types.NetworkBinding{} // hostPortToProtocolBindIPMap is a map to store protocol and bindIP information associated to host ports // that belong to a range. This is used in case when there are multiple protocol/bindIP combinations associated to a // port binding. example: when both IPv4 and IPv6 bindIPs are populated by docker. hostPortToProtocolBindIPMap := map[int64][]ProtocolBindIP{} // ContainerPortSet consists of singular ports, and ports that belong to a range, but for which we were not able to // find contiguous host ports and ask docker to pick instead. containerPortSet := change.Container.GetContainerPortSet() // each entry in the ContainerPortRangeMap implies that we found a contiguous host port range for the same containerPortRangeMap := change.Container.GetContainerPortRangeMap() for _, binding := range change.PortBindings { containerPort := int32(binding.ContainerPort) bindIP := binding.BindIP protocol := binding.Protocol.String() // create network binding for each containerPort that exists in the singular ContainerPortSet // for container ports that belong to a range, we'll have 1 consolidated network binding for the range if _, ok := containerPortSet[int(containerPort)]; ok { networkBindings = append(networkBindings, types.NetworkBinding{ BindIP: aws.String(bindIP), ContainerPort: aws.Int32(containerPort), HostPort: aws.Int32(int32(binding.HostPort)), Protocol: types.TransportProtocol(protocol), }) } else { // populate hostPortToProtocolBindIPMap – this is used below when we construct network binding for ranges. hostPort := int64(binding.HostPort) hostPortToProtocolBindIPMap[hostPort] = append(hostPortToProtocolBindIPMap[hostPort], ProtocolBindIP{ protocol: protocol, bindIP: bindIP, }) } } for containerPortRange, hostPortRange := range containerPortRangeMap { // we check for protocol and bindIP information associated to any one of the host ports from the hostPortRange, // all ports belonging to the same range share this information. hostPort, _, _ := nat.ParsePortRangeToInt(hostPortRange) if val, ok := hostPortToProtocolBindIPMap[int64(hostPort)]; ok { for _, v := range val { networkBindings = append(networkBindings, types.NetworkBinding{ BindIP: aws.String(v.bindIP), ContainerPortRange: aws.String(containerPortRange), HostPortRange: aws.String(hostPortRange), Protocol: types.TransportProtocol(v.protocol), }) } } } return networkBindings }