public void fromApp()

in src/main/java/com/amazonaws/fixengineonaws/FixEngine.java [177:215]


    /**
     * Handles an application-level FIX message by publishing its string form to the inbound
     * Kafka topic ({@code KAFKA_INBOUND_TOPIC_NAME}).
     *
     * <p>When {@code DROP_FIX_MESSAGES} is set, the message is only logged as dropped and nothing
     * is sent. Otherwise the message is sent and this method waits on the send result
     * ({@code get()}) before updating and logging the cumulative inbound timing totals.
     *
     * <p>Failures while sending are logged and not rethrown. If the calling thread is interrupted
     * while waiting for the send result, its interrupt status is restored before returning.
     *
     * @param message   the FIX message received from the counterparty
     * @param sessionID the session the message arrived on; not used by this method
     * @throws FieldNotFound          declared by the callback signature; not thrown explicitly here
     * @throws IncorrectDataFormat    declared by the callback signature; not thrown explicitly here
     * @throws IncorrectTagValue      declared by the callback signature; not thrown explicitly here
     * @throws UnsupportedMessageType declared by the callback signature; not thrown explicitly here
     */
    public void fromApp(Message message, SessionID sessionID) throws FieldNotFound, IncorrectDataFormat, IncorrectTagValue, UnsupportedMessageType {
        LOGGER.info(MY_IP+"%%%%%%%% FROMAPP: " + message);
        long timeInFromAppStart = System.currentTimeMillis();

        if (DROP_FIX_MESSAGES) {
            LOGGER.severe(MY_IP+"%%%%%%%% FROMAPP: DROPPING MESSAGE INSTEAD OF SENDING IT!");
            return;
        }

        if (!IM_AM_THE_ACTIVE_ENGINE) {
            // NOTE(review): the log text says "DO Nothing", but processing continues and the
            // message is still forwarded below. Kept as-is to preserve behavior; confirm whether
            // an early return is intended here.
            LOGGER.fine(MY_IP+"%%%%%%%% FROMAPP: NOT ACTIVE ENGINE, DO Nothing" );
        }

        // The counter is incremented regardless of whether FINE logging is enabled.
        LOGGER.fine(MY_IP+"********************** counter: " + messageCounter++);

        String parsedOrdStr = message.toString();
        LOGGER.fine(MY_IP+"%%%%%%%% FROMAPP: ***SERVER FIX ENGINE*** PARSED ORDER FIX STRING: " + parsedOrdStr);

        try {
            long timeInFromAppKafkaSendStart = System.currentTimeMillis();

            // NOTE(review): lastStatsLogTime is only read here, never updated in this method;
            // presumably it is maintained elsewhere - confirm, otherwise the totals are reset on
            // every message once the threshold has elapsed.
            if (timeInFromAppKafkaSendStart - lastStatsLogTime > logStatsEvery) {
                LOGGER.info(MY_IP+"@@@@@@@@@@ INBOUND TIMING STATISTICS: RESETTING TOTALS SINCE IT'S BEEN OVER A MINUTE SINCE THE LAST MESSAGE");
                totalInboundKafkaProcessingTime = 0;
                totalInboundMessageProcessingTime = 0;
            }

            // Synchronous send: get() waits for the send to complete (or fail).
            KAFKA_PRODUCER.send(new ProducerRecord<String, String>(KAFKA_INBOUND_TOPIC_NAME, parsedOrdStr)).get();
            long timeInFromAppKafkaSendEnd = System.currentTimeMillis();

            totalInboundKafkaProcessingTime += timeInFromAppKafkaSendEnd - timeInFromAppKafkaSendStart;
            totalInboundMessageProcessingTime += timeInFromAppKafkaSendEnd - timeInFromAppStart;
            LOGGER.info(MY_IP+"@@@@@@@@@@ INBOUND TIMING STATISTICS:\ttotalInboundKafkaProcessingTime:\t" + totalInboundKafkaProcessingTime + "\ttotalInboundMessageProcessingTime:\t" + totalInboundMessageProcessingTime);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.log(java.util.logging.Level.SEVERE, MY_IP+"%%%%%%%% FROMAPP: Exception:" + e, e);
        } catch (Exception e) {
            // Best-effort: a failed send is logged (with stack trace) and the message is not retried.
            LOGGER.log(java.util.logging.Level.SEVERE, MY_IP+"%%%%%%%% FROMAPP: Exception:" + e, e);
        }
    }