in src/main/java/com/amazonaws/fixengineonaws/TestClient.java [296:375]
/**
 * Polls {@code KAFKA_CONSUMER} in a loop, logs every record received and, when this
 * process is not the test client and {@code SEND_EXEC_REPORT} is set, answers each
 * record with a generated {@code ExecutionReport} sent synchronously through
 * {@code KAFKA_PRODUCER} to {@code KAFKA_OUTBOUND_TOPIC_NAME}.
 *
 * <p>Termination: once at least one record has been received, the loop ends after
 * more than 20 whole seconds have passed with no further records; a throughput
 * summary (start time, end time, message count, elapsed seconds, TPS) is logged
 * before returning. If no record is ever received the loop keeps polling.
 * The method also returns early if the thread is interrupted while waiting for a
 * producer acknowledgement; the interrupt flag is restored in that case.
 *
 * <p>Side effect: updates the {@code lastOrderRcvdTime} field (declared elsewhere in
 * this class) after each non-empty batch.
 */
private static void processKafkaMsgs() {
    LOGGER.fine("****processKafkaMsgs: Start ");

    // Idle period (whole seconds) after the last batch that ends the run.
    final long idleTimeoutSec = 20;

    int count = 0;
    boolean running = true;
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    Date firstOrderRcvdTime = null;

    while (running) {
        ConsumerRecords<String, Object> records = KAFKA_CONSUMER.poll(Duration.ofMillis(50));

        if (records.count() == 0) {
            // Poll timed out with nothing to read.
            LOGGER.info("nothing to read from Kafka");

            // The idle timer only applies once at least one record has been seen.
            if (count > 0) {
                long idleSec = (new Date().getTime() - lastOrderRcvdTime.getTime()) / 1000;
                if (idleSec > idleTimeoutSec) {
                    running = false;

                    long totalTimeInSec =
                            (lastOrderRcvdTime.getTime() - firstOrderRcvdTime.getTime()) / 1000;
                    // Clamp to one second so a sub-second run does not divide by zero.
                    if (totalTimeInSec < 1) {
                        totalTimeInSec = 1;
                    }
                    // Cast before dividing: int / long would truncate the rate to a whole number.
                    double tps = (double) count / totalTimeInSec;

                    LOGGER.info(" ************ Order Received Performance & Througput Results ******************* ");
                    LOGGER.info("\n Start Time: " + sdf.format(firstOrderRcvdTime) +
                            "\n End Time: " + sdf.format(lastOrderRcvdTime) + "\n Total Messages Processed: " + count
                            + "\n Total Processing Time (seconds) " + totalTimeInSec + "\n TPS: " + tps);
                    LOGGER.info(" ************ ************ ************ ************ ************");
                }
            }
        } else {
            for (ConsumerRecord<String, Object> record : records) {
                // The first record ever seen marks the start of the measured interval.
                if (count == 0) {
                    firstOrderRcvdTime = new Date();
                }
                count += 1;
                LOGGER.fine(count + ": " + record.value());

                // String.valueOf tolerates a null record value instead of throwing a NullPointerException.
                String ordStr = String.valueOf(record.value());
                LOGGER.info("*********** ORDER RCVD from Client or Server *****************************************************************************************");
                LOGGER.info("*** processKafkaMsgs() ordStr : " + ordStr);

                if (!I_AM_TEST_CLIENT && SEND_EXEC_REPORT) {
                    // Send the execution report back to the client FIX engine.
                    ExecutionReport newExec = generateExecution(count);
                    ordStr = newExec.toString();
                    try {
                        // get() blocks until the send has completed (or failed).
                        KAFKA_PRODUCER.send(new ProducerRecord<String, String>(KAFKA_OUTBOUND_TOPIC_NAME, ordStr)).get();
                        LOGGER.info("*********** Generated ExecutionReport from Server to Client ********************************************************");
                        LOGGER.info("ExecutionReport : " + ordStr);
                    } catch (InterruptedException ex) {
                        // Restore the interrupt flag and stop; swallowing it would hide the interruption from callers.
                        Thread.currentThread().interrupt();
                        LOGGER.log(java.util.logging.Level.SEVERE, " Exception : " + ex.getMessage(), ex);
                        return;
                    } catch (Exception ex) {
                        // Best effort: log with the full stack trace and carry on with the next record.
                        LOGGER.log(java.util.logging.Level.SEVERE, " Exception : " + ex.getMessage(), ex);
                    }
                }
            }
            // Timestamp of the most recent non-empty batch; drives both the idle timer and the summary.
            lastOrderRcvdTime = new Date();
        }
    }
}