in agent/acs/handler/acs_handler.go [280:407]
// startACSSession registers every ACS request handler on client, connects to
// ACS, and then blocks serving messages until either acsSession.ctx is
// canceled or client.Serve returns.
//
// Handlers are created, started, and registered before client.Connect is
// called: refresh-credentials, task ENI attach, instance ENI attach, task
// manifest (task manifest and task stop verification messages), payload,
// heartbeat, and the agent update handlers. Every handler started here is
// stopped via defer when the session ends.
//
// It returns the error from client.Connect if the connection cannot be
// established, acsSession.ctx.Err() if the context is canceled, and otherwise
// whatever client.Serve returned (nil or io.EOF are logged as a clean close).
func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
	cfg := acsSession.agentConfig

	// Deferred calls run in LIFO order, so for handlers that defer both
	// clearAcks and stop, stop runs before clearAcks on return.
	refreshCredsHandler := newRefreshCredentialsHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
		client, acsSession.credentialsManager, acsSession.taskEngine)
	defer refreshCredsHandler.clearAcks()
	refreshCredsHandler.start()
	defer refreshCredsHandler.stop()
	client.AddRequestHandler(refreshCredsHandler.handlerFunc())

	// Add handler to ack task ENI attach message
	eniAttachHandler := newAttachTaskENIHandler(
		acsSession.ctx,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.state,
		acsSession.dataClient,
	)
	eniAttachHandler.start()
	defer eniAttachHandler.stop()
	client.AddRequestHandler(eniAttachHandler.handlerFunc())

	// Add handler to ack instance ENI attach message
	instanceENIAttachHandler := newAttachInstanceENIHandler(
		acsSession.ctx,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.state,
		acsSession.dataClient,
	)
	instanceENIAttachHandler.start()
	defer instanceENIAttachHandler.stop()
	client.AddRequestHandler(instanceENIAttachHandler.handlerFunc())

	// Add TaskManifestHandler; it serves both task manifest and task stop
	// verification messages.
	taskManifestHandler := newTaskManifestHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
		client, acsSession.dataClient, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest)
	defer taskManifestHandler.clearAcks()
	taskManifestHandler.start()
	defer taskManifestHandler.stop()
	client.AddRequestHandler(taskManifestHandler.handlerFuncTaskManifestMessage())
	client.AddRequestHandler(taskManifestHandler.handlerFuncTaskStopVerificationMessage())

	// Add request handler for handling payload messages from ACS
	payloadHandler := newPayloadRequestHandler(
		acsSession.ctx,
		acsSession.taskEngine,
		acsSession.ecsClient,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.dataClient,
		refreshCredsHandler,
		acsSession.credentialsManager,
		acsSession.taskHandler, acsSession.latestSeqNumTaskManifest)
	// Clear the acks channel on return because acks of messageids don't have any value across sessions
	defer payloadHandler.clearAcks()
	payloadHandler.start()
	defer payloadHandler.stop()
	client.AddRequestHandler(payloadHandler.handlerFunc())

	heartbeatHandler := newHeartbeatHandler(acsSession.ctx, client, acsSession.doctor)
	defer heartbeatHandler.clearAcks()
	heartbeatHandler.start()
	defer heartbeatHandler.stop()
	client.AddRequestHandler(heartbeatHandler.handlerFunc())

	updater.AddAgentUpdateHandlers(client, cfg, acsSession.state, acsSession.dataClient, acsSession.taskEngine)

	if err := client.Connect(); err != nil {
		seelog.Errorf("Error connecting to ACS: %v", err)
		return err
	}
	seelog.Info("Connected to ACS endpoint")

	// Start inactivity timer for closing the connection
	timer := newDisconnectionTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
	// Any message from the server resets the disconnect timeout
	client.SetAnyRequestHandler(anyMessageHandler(timer, client))
	defer timer.Stop()

	acsSession.resources.connectedToACS()

	backoffResetTimer := time.AfterFunc(
		retry.AddJitter(acsSession.heartbeatTimeout(), acsSession.heartbeatJitter()), func() {
			// If we connected successfully and stay connected for at least the
			// jittered heartbeat timeout, reset the backoff. This prevents
			// disconnect errors that only happen infrequently from damaging the
			// reconnect delay as significantly.
			acsSession.backoff.Reset()
		})
	defer backoffResetTimer.Stop()

	// Buffered so the Serve goroutine can always deliver its result and exit,
	// even if this function has already returned on context cancellation.
	// NOTE(review): client is not closed in this function; presumably the
	// caller closes it so that Serve eventually returns — confirm.
	serveErr := make(chan error, 1)
	go func() {
		serveErr <- client.Serve()
	}()

	// Every case returns, so a single select is sufficient (no enclosing loop).
	select {
	case <-acsSession.ctx.Done():
		// Stop receiving and sending messages from and to ACS when
		// the context received from the main function is canceled
		seelog.Info("ACS session exited cleanly.")
		return acsSession.ctx.Err()
	case err := <-serveErr:
		// Stop receiving and sending messages from and to ACS when
		// client.Serve returns. This can happen when the connection
		// is closed by ACS or the agent.
		if err == nil || err == io.EOF {
			seelog.Info("ACS Websocket connection closed for a valid reason")
		} else {
			seelog.Errorf("Error: lost websocket connection with Agent Communication Service (ACS): %v", err)
		}
		return err
	}
}