private static void processKafkaMsgs()

in src/main/java/com/amazonaws/fixengineonaws/TestClient.java [296:375]


    private static void processKafkaMsgs() {
        LOGGER.fine("****processKafkaMsgs: Start ");
        // Loop until ctrl + c
        // Harman: create a thread
        int count = 0;
        boolean firstTime = true;
        int x = 1;
        //Date lastTime = null;
        long totalTimeInSec = 0;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        Date firstOrderRcvdTime = null;
        
        while(x ==1) {

            // Poll for records
    	    ConsumerRecords<String, Object> records = KAFKA_CONSUMER.poll(Duration.ofMillis(50));
            //LOGGER.fine(" After polling consumer records.count() : " + records.count());
            // Did we get any?
            if (records.count() == 0) {
                // timeout/nothing to read
                LOGGER.info("nothing to read from Kafka");
                
                if (count > 0) {
                     Date currentTime = new Date();
                     totalTimeInSec = (currentTime.getTime() - lastOrderRcvdTime.getTime())/1000;
                     
                     if (totalTimeInSec > 20) { 
                        x = 0;
                        totalTimeInSec = (lastOrderRcvdTime.getTime() - firstOrderRcvdTime.getTime() )/1000 ;
        
                        if (totalTimeInSec < 1) totalTimeInSec = 1;
                        
                        double tps = count/totalTimeInSec;
                        
                        LOGGER.info(" ************ Order Received Performance & Througput Results ******************* ");
                        LOGGER.info("\n Start Time: " + sdf.format(firstOrderRcvdTime) + 
                                    "\n End Time: " + sdf.format(lastOrderRcvdTime) + "\n Total Messages Processed: " + count 
                                    + "\n Total Processing Time (seconds) " + totalTimeInSec + "\n TPS: " + tps);
                        	
                        LOGGER.info(" ************ ************ ************ ************ ************");	
                    }
                }
                
            } else {
                
                // Yes, loop over records
                // for(ConsumerRecord<String, String> record: records) {
                for(ConsumerRecord<String, Object> record: records) {
                    // Display record and count
                    if (count == 0) firstOrderRcvdTime = new Date();
                    count += 1;
                    LOGGER.fine( count + ": " + record.value());
                    String ordStr = record.value().toString();
                    LOGGER.info("*********** ORDER RCVD from Client or Server *****************************************************************************************");
                    LOGGER.info("*** processKafkaMsgs() ordStr : " + ordStr);
                    
                    // LOGGER.info("processInboundKafkaMsgs() I_AM_TEST_CLIENT : " + I_AM_TEST_CLIENT);
                
                    if (!I_AM_TEST_CLIENT && SEND_EXEC_REPORT) {
                        // send the execution report back to client Fix Engine
                        ExecutionReport newExec = generateExecution(count);
                        ordStr = newExec.toString();
                        
                        try {
                            KAFKA_PRODUCER.send(new ProducerRecord<String, String>(KAFKA_OUTBOUND_TOPIC_NAME, ordStr)).get();
                            LOGGER.info("*********** Generated ExecutionReport from Server to Client ********************************************************");
                            LOGGER.info("ExecutionReport : " + ordStr);
                            //producer.send(new ProducerRecord<String, String>(topicName, message.toString())).get();
                        } catch (Exception ex) {
                             LOGGER.severe(" Exception : " + ex.getMessage());
                        }
                    }
                            
                } // for end
                lastOrderRcvdTime = new Date();
        	} // if (records.count() == 0)
        	
        } //while loop
        
    }