private void heartbeatMessageProcessingLoop()

in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [645:718]


    /**
     * Runs the heartbeat loop that drives leader election and message processing.
     *
     * <p>On every iteration the loop asks {@code getLeaderStatus} for the current
     * leader status and reacts to it:
     * <ul>
     *   <li>{@code LEADER_STATUS_JUST_BECAME_LEADER}: lazily creates the Kafka consumer
     *       and producer, starts the FIX client (initiator) or FIX server (otherwise),
     *       updates the Global Accelerator endpoints for the server side, and marks
     *       this engine as active.</li>
     *   <li>{@code LEADER_STATUS_STILL_LEADER}: only logs.</li>
     *   <li>{@code LEADER_STATUS_STILL_NOT_LEADER}: drops the Kafka handles and stops
     *       the FIX client.</li>
     * </ul>
     * It then calls {@code processInboundKafkaMsgs} and sleeps for
     * {@code HEARTBEAT_SLEEP_INTERVAL} when that interval is positive.
     *
     * <p>The loop only ends when the thread is interrupted while sleeping; in that case
     * the interrupt flag is restored and the method returns.
     *
     * @param config engine configuration the session settings are read from
     * @throws ConfigError declared by this method; which of the callees raises it is
     *         not visible from this block
     */
    private void heartbeatMessageProcessingLoop(FixEngineConfig config) throws ConfigError {
        LOGGER.info(MY_IP+"*****START HEARTBEAT *****");
        boolean iAmClientFixEngine = "initiator".equals(config.getSessionSetting("ConnectionType"));
        String kafkaBrokerString = config.getSessionSetting("kafkaBootstrapBrokerString");
        String consumerGroupId = config.getSessionSetting("KafkaConsumerGroupID");
        String kafkaOutboundTopicName = config.getSessionSetting("KafkaOutboundTopicName");
        KAFKA_INBOUND_TOPIC_NAME = config.getSessionSetting("KafkaInboundTopicName");
        // Global Accelerator settings are only read for the non-initiator (server) side.
        String myGAEndpointGroupArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointGroupArn");
        String myGAEndpointArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointArn");
        boolean useJdbcConnection = "true".equals(config.getSessionSetting("UseJdbcHeartbeat"));

        CallableStatement heartbeatSprocStmt = null;
        if (useJdbcConnection) {
            // Must run before the Jdbc* settings are read by the helper below.
            config.addSqlDbConnectionCoordinatesToSettings(config.getSessionSetting("RDSClusterSecretArn"));
            heartbeatSprocStmt = createHeartbeatStatementFromConfig(config);
        }

        if (!iAmClientFixEngine) {
            startFixServer(config); // to let health check know we're alive
        }

        while (true) {
            LOGGER.info(MY_IP+"**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + IM_AM_THE_ACTIVE_ENGINE);
            // If the status lookup fails we fall through as "still not leader".
            int leaderStatus = LEADER_STATUS_STILL_NOT_LEADER;
            try {
                leaderStatus = getLeaderStatus(heartbeatSprocStmt, IM_AM_THE_ACTIVE_ENGINE, useJdbcConnection);
            } catch (SQLException e) {
                // Attach the throwable to the log record instead of printing to stderr.
                LOGGER.log(java.util.logging.Level.SEVERE, MY_IP+"****HEARTBEAT: ***ERROR GETTING LEADER STATUS!*** " + e, e);
                LOGGER.severe(MY_IP+"****HEARTBEAT: ***RECREATING HEARTBEAT CONNECTION TO ATTEMPT TO RECOVER!***");
                // Release the stale statement before replacing it so repeated failures do not leak statements.
                // NOTE(review): the underlying connection is presumably owned by getHeartbeatSprocStmt — confirm it is released there.
                if (heartbeatSprocStmt != null) {
                    try {
                        heartbeatSprocStmt.close();
                    } catch (SQLException closeFailure) {
                        LOGGER.warning(MY_IP+"****HEARTBEAT: could not close stale heartbeat statement: " + closeFailure);
                    }
                }
                heartbeatSprocStmt = createHeartbeatStatementFromConfig(config);
            }
            LOGGER.info(MY_IP+"**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + IM_AM_THE_ACTIVE_ENGINE + " ; leaderStatus: " + leaderStatus);

            if (leaderStatus == LEADER_STATUS_JUST_BECAME_LEADER) {
                LOGGER.info(MY_IP+"****HEARTBEAT: ***I'M STILL LEADER OR JUST BECAME LEADER! ENSURING ENGINES ARE RUNNING!***");
                if (KAFKA_CONSUMER == null) {
                    KAFKA_CONSUMER = startKafkaConsumer(kafkaBrokerString, consumerGroupId, kafkaOutboundTopicName);
                }
                if (KAFKA_PRODUCER == null) {
                    KAFKA_PRODUCER = startKafkaProducer(kafkaBrokerString);
                }
                if (iAmClientFixEngine) {
                    startFixClient(config);
                } else {
                    LOGGER.info(MY_IP+"**************** HEARTBEAT: I AM Server ENGINE***********");
                    startFixServer(config);
                    // NOTE(review): hard-coded host exception; consider moving this to configuration.
                    if ("10.130.0.66".equals(MY_IP)) {
                        LOGGER.severe(MY_IP+"**************** HEARTBEAT: NOT UPDATING GLOBAL ACCELERATOR ENDPOINT BECAUSE WE DONT HAVE ACCESS FROM THIS MACHINE!!!***********");
                    } else {
                        updateGAEndpoints(myGAEndpointGroupArn, myGAEndpointArn);
                    }
                }
                IM_AM_THE_ACTIVE_ENGINE = true;
            } else if (leaderStatus == LEADER_STATUS_STILL_LEADER) {
                // Nothing to (re)start: keep the running engines as they are.
                LOGGER.info(MY_IP+"****HEARTBEAT: ***STILL LEADER! Keep listening!***");
            } else if (leaderStatus == LEADER_STATUS_STILL_NOT_LEADER) {
                // Disconnect if connected.
                LOGGER.info(MY_IP+"****HEARTBEAT: ***STILL NOT LEADER!***");
                // NOTE(review): the handles are dropped without being closed, and
                // IM_AM_THE_ACTIVE_ENGINE is not reset here — confirm both are handled elsewhere.
                KAFKA_CONSUMER = null;
                KAFKA_PRODUCER = null;
                stopFixClient();
            }

            processInboundKafkaMsgs(KAFKA_CONSUMER);

            if (HEARTBEAT_SLEEP_INTERVAL > 0) {
                try {
                    Thread.sleep(HEARTBEAT_SLEEP_INTERVAL);
                } catch (InterruptedException ie) {
                    LOGGER.severe(MY_IP+"HEARTBEAT THREAD INTERRUPTED: " +ie);
                    // Restore the flag and leave the loop so the thread can actually be shut down.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    /** Builds the heartbeat stored-procedure statement from the Jdbc* session settings. */
    private CallableStatement createHeartbeatStatementFromConfig(FixEngineConfig config) throws ConfigError {
        return getHeartbeatSprocStmt(
                config.getSessionSetting("JdbcDriver"),
                config.getSessionSetting("JdbcURL"),
                config.getSessionSetting("JdbcUser"),
                config.getSessionSetting("JdbcPassword"));
    }