private void processInboundKafkaMsgs()

in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [365:411]


    private void processInboundKafkaMsgs(KafkaConsumer<String, Object> kafkaConsumer) {
        LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: ************* after calling getKafkaConsumer ");
        int count = 0;
        if(IM_AM_THE_ACTIVE_ENGINE && FIX_SESSION_ID != null) {
            //Test code
   //       NewOrderSingle newOrder = new NewOrderSingle(new ClOrdID("12345"), new HandlInst('1'), new Symbol("6758.T"), new Side(Side.BUY), new TransactTime(), new OrdType(OrdType.MARKET));
            // try {
            //     FIX_OUTBOUND_SESSION.sendToTarget(newOrder, FIX_OUTBOUND_SESSION_ID);
            //     Thread.sleep(5000);
            // } catch (Exception e) {
            //     e.printStackTrace();
            // }
            //Test COde 
            // Poll for records
        	
            ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(50));
            long timeOutKafkaPollEnd = System.currentTimeMillis();
        	if(timeOutKafkaPollEnd - lastStatsLogTime > logStatsEvery) {
        		LOGGER.info(MY_IP+"@@@@@@@@@@ OUTBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
        		totalOutboundFixProcessingTime = 0;
        		totalOutboundMessageProcessingTime= 0;
        	}

            //LOGGER.fine(MY_IP+" After polling consumer records.count() : " + records.count());
            // Did we get any?
            if (records.count() == 0) {
                // timeout/nothing to read
	                LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: nothing to read from Kafka");
            } else {
	                LOGGER.info(MY_IP+"****PROCESS KAFKA MSGS: got some messages from Kafka");
                // Yes, loop over records
                // for(ConsumerRecord<String, String> record: records) {
                for(ConsumerRecord<String, Object> record: records) {
                    // Display record and count
                    count += 1;
                    LOGGER.fine(MY_IP+ count + ": " + record.value());
                    long timeOutKafkaMessageProcessStart = System.currentTimeMillis();
                    processOneInboundKafkaMessage(record);
                    long timeOutKafkaMessageProcessEnd = System.currentTimeMillis();

                    totalOutboundFixProcessingTime += timeOutKafkaMessageProcessEnd - timeOutKafkaMessageProcessStart;
                    totalOutboundMessageProcessingTime += timeOutKafkaMessageProcessEnd - timeOutKafkaPollEnd;
                    LOGGER.info(MY_IP+"@@@@@@@@@@ OUTBOUND TIMING STATISTICS:\tmessageCount:\t" + count + "\ttotalOutboundFixProcessingTime:\t" + totalOutboundFixProcessingTime + "\ttotalOutboundMessageProcessingTime:\t" + totalOutboundMessageProcessingTime);                
                }
            }
        }
    }