in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [645:718]
private void heartbeatMessageProcessingLoop(FixEngineConfig config) throws ConfigError {
LOGGER.info(MY_IP+"*****START HEARTBEAT *****");
boolean iAmClientFixEngine = "initiator".equals(config.getSessionSetting("ConnectionType"));
String kafkaBrokerString = config.getSessionSetting("kafkaBootstrapBrokerString");
String consumerGroupId = config.getSessionSetting("KafkaConsumerGroupID");
String kafkaOutboundTopicName = config.getSessionSetting("KafkaOutboundTopicName");
KAFKA_INBOUND_TOPIC_NAME = config.getSessionSetting("KafkaInboundTopicName");
String myGAEndpointGroupArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointGroupArn");
String myGAEndpointArn = iAmClientFixEngine ? null : config.getSessionSetting("GAEndpointArn");
boolean useJdbcConnection = "true".equals(config.getSessionSetting("UseJdbcHeartbeat"));
CallableStatement heartbeatSprocStmt = null;
if(useJdbcConnection) {
config.addSqlDbConnectionCoordinatesToSettings(config.getSessionSetting("RDSClusterSecretArn"));
heartbeatSprocStmt = getHeartbeatSprocStmt(config.getSessionSetting("JdbcDriver"), config.getSessionSetting("JdbcURL"), config.getSessionSetting("JdbcUser"), config.getSessionSetting("JdbcPassword"));
}
if(!iAmClientFixEngine) {
startFixServer(config); // to let health check know we're alive
}
while(true) {
LOGGER.info(MY_IP+"**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + IM_AM_THE_ACTIVE_ENGINE);
int leaderStatus = LEADER_STATUS_STILL_NOT_LEADER;
try {
leaderStatus = getLeaderStatus(heartbeatSprocStmt, IM_AM_THE_ACTIVE_ENGINE, useJdbcConnection);
} catch (SQLException e) {
LOGGER.severe(MY_IP+"****HEARTBEAT: ***ERROR GETTING LEADER STATUS!*** " + e);
e.printStackTrace();
LOGGER.severe(MY_IP+"****HEARTBEAT: ***RECREATING HEARTBEAT CONNECTION TO ATTEMPT TO RECOVER!***");
heartbeatSprocStmt = getHeartbeatSprocStmt(config.getSessionSetting("JdbcDriver"), config.getSessionSetting("JdbcURL"), config.getSessionSetting("JdbcUser"), config.getSessionSetting("JdbcPassword"));
}
LOGGER.info(MY_IP+"**************** HEARTBEAT: iAmClientFixEngine: " + iAmClientFixEngine + " ; IM_AM_THE_ACTIVE_ENGINE: " + IM_AM_THE_ACTIVE_ENGINE + " ; leaderStatus: " + leaderStatus);
if(leaderStatus == LEADER_STATUS_JUST_BECAME_LEADER) {
LOGGER.info(MY_IP+"****HEARTBEAT: ***I'M STILL LEADER OR JUST BECAME LEADER! ENSURING ENGINES ARE RUNNING!***");
if(KAFKA_CONSUMER == null) { KAFKA_CONSUMER = startKafkaConsumer(kafkaBrokerString, consumerGroupId, kafkaOutboundTopicName); }
if(KAFKA_PRODUCER == null) { KAFKA_PRODUCER = startKafkaProducer(kafkaBrokerString); }
if(iAmClientFixEngine) {
startFixClient(config);
} else {
LOGGER.info(MY_IP+"**************** HEARTBEAT: I AM Server ENGINE***********");
startFixServer(config);
if("10.130.0.66".equals(MY_IP)) {
LOGGER.severe(MY_IP+"**************** HEARTBEAT: NOT UPDATING GLOBAL ACCELERATOR ENDPOINT BECAUSE WE DONT HAVE ACCESS FROM THIS MACHINE!!!***********");
} else {
updateGAEndpoints(myGAEndpointGroupArn, myGAEndpointArn);
}
}
IM_AM_THE_ACTIVE_ENGINE = true;
} else if(leaderStatus == LEADER_STATUS_STILL_LEADER) { // Disconnect if connected
LOGGER.info(MY_IP+"****HEARTBEAT: ***STILL LEADER! Keep listening!***");
} else if(leaderStatus == LEADER_STATUS_STILL_NOT_LEADER) { // Disconnect if connected
LOGGER.info(MY_IP+"****HEARTBEAT: ***STILL NOT LEADER!***");
KAFKA_CONSUMER = null;
KAFKA_PRODUCER = null;
stopFixClient();
// } else if(leaderStatus == LEADER_STATUS_JUST_BECAME_LEADER) { // Connect!
// LOGGER.info(MY_IP+"****HEARTBEAT: ***I JUST BECAME THE LEADER!***");
// startEngine();
}
processInboundKafkaMsgs(KAFKA_CONSUMER);
if(HEARTBEAT_SLEEP_INTERVAL > 0) {
try {
Thread.sleep(HEARTBEAT_SLEEP_INTERVAL);
} catch (InterruptedException ie) {
LOGGER.severe(MY_IP+"HEARTBEAT THREAD INTERRUPTED: " +ie);
}
}
}
}