func()

in agent/acs/handler/acs_handler.go [280:407]


func (acsSession *session) startACSSession(client wsclient.ClientServer) error {
	cfg := acsSession.agentConfig

	refreshCredsHandler := newRefreshCredentialsHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
		client, acsSession.credentialsManager, acsSession.taskEngine)
	defer refreshCredsHandler.clearAcks()
	refreshCredsHandler.start()
	defer refreshCredsHandler.stop()

	client.AddRequestHandler(refreshCredsHandler.handlerFunc())

	// Add handler to ack task ENI attach message
	eniAttachHandler := newAttachTaskENIHandler(
		acsSession.ctx,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.state,
		acsSession.dataClient,
	)
	eniAttachHandler.start()
	defer eniAttachHandler.stop()

	client.AddRequestHandler(eniAttachHandler.handlerFunc())

	// Add handler to ack instance ENI attach message
	instanceENIAttachHandler := newAttachInstanceENIHandler(
		acsSession.ctx,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.state,
		acsSession.dataClient,
	)
	instanceENIAttachHandler.start()
	defer instanceENIAttachHandler.stop()

	client.AddRequestHandler(instanceENIAttachHandler.handlerFunc())

	// Add TaskManifestHandler
	taskManifestHandler := newTaskManifestHandler(acsSession.ctx, cfg.Cluster, acsSession.containerInstanceARN,
		client, acsSession.dataClient, acsSession.taskEngine, acsSession.latestSeqNumTaskManifest)

	defer taskManifestHandler.clearAcks()
	taskManifestHandler.start()
	defer taskManifestHandler.stop()

	client.AddRequestHandler(taskManifestHandler.handlerFuncTaskManifestMessage())
	client.AddRequestHandler(taskManifestHandler.handlerFuncTaskStopVerificationMessage())

	// Add request handler for handling payload messages from ACS
	payloadHandler := newPayloadRequestHandler(
		acsSession.ctx,
		acsSession.taskEngine,
		acsSession.ecsClient,
		cfg.Cluster,
		acsSession.containerInstanceARN,
		client,
		acsSession.dataClient,
		refreshCredsHandler,
		acsSession.credentialsManager,
		acsSession.taskHandler, acsSession.latestSeqNumTaskManifest)
	// Clear the acks channel on return because acks of messageids don't have any value across sessions
	defer payloadHandler.clearAcks()
	payloadHandler.start()
	defer payloadHandler.stop()

	client.AddRequestHandler(payloadHandler.handlerFunc())

	heartbeatHandler := newHeartbeatHandler(acsSession.ctx, client, acsSession.doctor)
	defer heartbeatHandler.clearAcks()
	heartbeatHandler.start()
	defer heartbeatHandler.stop()

	client.AddRequestHandler(heartbeatHandler.handlerFunc())

	updater.AddAgentUpdateHandlers(client, cfg, acsSession.state, acsSession.dataClient, acsSession.taskEngine)

	err := client.Connect()
	if err != nil {
		seelog.Errorf("Error connecting to ACS: %v", err)
		return err
	}

	seelog.Info("Connected to ACS endpoint")
	// Start inactivity timer for closing the connection
	timer := newDisconnectionTimer(client, acsSession.heartbeatTimeout(), acsSession.heartbeatJitter())
	// Any message from the server resets the disconnect timeout
	client.SetAnyRequestHandler(anyMessageHandler(timer, client))
	defer timer.Stop()

	acsSession.resources.connectedToACS()

	backoffResetTimer := time.AfterFunc(
		retry.AddJitter(acsSession.heartbeatTimeout(), acsSession.heartbeatJitter()), func() {
			// If we do not have an error connecting and remain connected for at
			// least 1 or so minutes, reset the backoff. This prevents disconnect
			// errors that only happen infrequently from damaging the reconnect
			// delay as significantly.
			acsSession.backoff.Reset()
		})
	defer backoffResetTimer.Stop()

	serveErr := make(chan error, 1)
	go func() {
		serveErr <- client.Serve()
	}()

	for {
		select {
		case <-acsSession.ctx.Done():
			// Stop receiving and sending messages from and to ACS when
			// the context received from the main function is canceled
			seelog.Infof("ACS session exited cleanly.")
			return acsSession.ctx.Err()
		case err := <-serveErr:
			// Stop receiving and sending messages from and to ACS when
			// client.Serve returns an error. This can happen when the
			// the connection is closed by ACS or the agent
			if err == nil || err == io.EOF {
				seelog.Info("ACS Websocket connection closed for a valid reason")
			} else {
				seelog.Errorf("Error: lost websocket connection with Agent Communication Service (ACS): %v", err)
			}
			return err
		}
	}
}