in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [365:411]
/**
 * Performs a single poll of the given Kafka consumer and forwards every
 * returned record to {@code processOneInboundKafkaMessage}, accumulating
 * outbound timing totals along the way.
 *
 * <p>Apart from the entry log line, nothing happens unless
 * {@code IM_AM_THE_ACTIVE_ENGINE} is true and {@code FIX_SESSION_ID} is
 * non-null.
 *
 * @param kafkaConsumer consumer to poll; polled once with a 50 ms timeout
 */
private void processInboundKafkaMsgs(KafkaConsumer<String, Object> kafkaConsumer) {
    LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: ************* after calling getKafkaConsumer ");

    // Skip unless this engine is flagged active and a FIX session id is set.
    if (!IM_AM_THE_ACTIVE_ENGINE || FIX_SESSION_ID == null) {
        return;
    }

    ConsumerRecords<String, Object> polledRecords = kafkaConsumer.poll(Duration.ofMillis(50));
    long pollReturnedAt = System.currentTimeMillis();

    // Zero the running totals once more than logStatsEvery ms have passed
    // since lastStatsLogTime.
    // NOTE(review): lastStatsLogTime is not updated in this method; presumably
    // it is maintained elsewhere - confirm.
    if (pollReturnedAt - lastStatsLogTime > logStatsEvery) {
        LOGGER.info(MY_IP+"@@@@@@@@@@ OUTBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
        totalOutboundFixProcessingTime = 0;
        totalOutboundMessageProcessingTime= 0;
    }

    // Poll timed out or returned no records.
    if (polledRecords.count() == 0) {
        LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: nothing to read from Kafka");
        return;
    }

    LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: got some messages from Kafka");

    int count = 0;
    for (ConsumerRecord<String, Object> kafkaRecord : polledRecords) {
        count++;
        LOGGER.fine(MY_IP+ count + ": " + kafkaRecord.value());

        long handlingStartedAt = System.currentTimeMillis();
        processOneInboundKafkaMessage(kafkaRecord);
        long handlingFinishedAt = System.currentTimeMillis();

        // Time spent handling this one record.
        long handlingElapsed = handlingFinishedAt - handlingStartedAt;
        // Time from the poll returning until this record was done; for later
        // records in the batch this includes the handling of earlier ones.
        long elapsedSincePoll = handlingFinishedAt - pollReturnedAt;

        totalOutboundFixProcessingTime += handlingElapsed;
        totalOutboundMessageProcessingTime += elapsedSincePoll;

        LOGGER.info(MY_IP+"@@@@@@@@@@ OUTBOUND TIMING STATISTICS:\tmessageCount:\t" + count + "\ttotalOutboundFixProcessingTime:\t" + totalOutboundFixProcessingTime + "\ttotalOutboundMessageProcessingTime:\t" + totalOutboundMessageProcessingTime);
    }
}