in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [177:215]
public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
LOGGER.info(MY_IP+"%%%%%%%% FROMAPP: " + message);
long timeInFromAppStart = System.currentTimeMillis();
if(DROP_FIX_MESSAGES) {
LOGGER.severe(MY_IP+"%%%%%%%% FROMAPP: DROPPING MESSAGE INSTEAD OF SENDING IT!");
} else {
if (!IM_AM_THE_ACTIVE_ENGINE) {
LOGGER.fine(MY_IP+"%%%%%%%% FROMAPP: NOT ACTIVE ENGINE, DO Nothing" );
}
LOGGER.fine(MY_IP+"********************** counter: " + messageCounter++);
String parsedOrdStr = message.toString();
LOGGER.fine(MY_IP+"%%%%%%%% FROMAPP: ***SERVER FIX ENGINE*** PARSED ORDER FIX STRING: " + parsedOrdStr);
// Object[] array = getKafkaProducer();
// KafkaProducer<String, String> producer = (KafkaProducer) array[0];
// String topicName = (String )array[1];
try {
long timeInFromAppKafkaSendStart = System.currentTimeMillis();
if(timeInFromAppKafkaSendStart - lastStatsLogTime > logStatsEvery) {
LOGGER.info(MY_IP+"@@@@@@@@@@ INBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
totalInboundKafkaProcessingTime = 0;
totalInboundMessageProcessingTime = 0;
}
KAFKA_PRODUCER.send(new ProducerRecord<String, String>(KAFKA_INBOUND_TOPIC_NAME, parsedOrdStr)).get();
long timeInFromAppKafkaSendEnd = System.currentTimeMillis();
totalInboundKafkaProcessingTime += timeInFromAppKafkaSendEnd - timeInFromAppKafkaSendStart;
totalInboundMessageProcessingTime += timeInFromAppKafkaSendEnd - timeInFromAppStart;
LOGGER.info(MY_IP+"@@@@@@@@@@ INBOUND TIMING STATISTICS:\ttotalInboundKafkaProcessingTime:\t" + totalInboundKafkaProcessingTime + "\ttotalInboundMessageProcessingTime:\t" + totalInboundMessageProcessingTime);
} catch (Exception e) {
LOGGER.severe(MY_IP+"%%%%%%%% FROMAPP: Exception:" + e);
e.printStackTrace();
}
}
}