in src/main/java/com/aws/amazonmq/blog/util/MsgProducer_FIFO.java [39:124]
public void execute(JobExecutionContext context) throws JobExecutionException {
// Retrieve job specific settings
JobKey key = context.getJobDetail().getKey();
JobDataMap dataMap = context.getJobDetail().getJobDataMap();
String amazonMQSSLEndpoint = dataMap.getString("amazonMQSSLEndPoint");
String username = dataMap.getString("username");
String password = dataMap.getString("password");
String queueName = dataMap.getString("queueName");
String producerName = dataMap.getString("producerName");
String useCaseId = dataMap.getString("useCaseId");
int numMsgs = dataMap.getInt("numMsgs");
String msgGroup = dataMap.getString("msgGroup");
String msgPrefix = dataMap.getString("msgPrefix");
int msgIdSequence = dataMap.getInt("msgIdSequence");
System.out.println("Running Job: " + key.getName());
// Create AmazonMQ connection factory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(amazonMQSSLEndpoint);
connectionFactory.setUserName(username);
connectionFactory.setPassword(password);
try {
// Create a connection
long t1 = System.currentTimeMillis();
Connection connection = connectionFactory.createConnection();
connection.start();
long t2 = System.currentTimeMillis();
System.out.printf("Connection started for %s. It took %d milliseconds. \n", producerName, (t2 - t1));
t1 = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = session.createQueue(queueName);
MessageProducer producer = session.createProducer(producerDestination);
t2 = System.currentTimeMillis();
System.out.printf("Producer Session for Thread %s Started. It took %d milliseconds \n", producerName,
(t2 - t1));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
System.out.printf("Writing messages into Amazon MQ. Queue Name: %s, Producer Id: %s, Message Group: %s \n",
queueName, producerName, msgGroup);
AtomicInteger counter = new AtomicInteger(msgIdSequence);
int temp = counter.get();
List<PutMsgMetric> putMsgMetricList = new ArrayList<PutMsgMetric>();
for (int i = temp; i < temp + numMsgs; i++) {
int msgId = counter.incrementAndGet();
String msgBody = msgPrefix + msgId;
TextMessage tMsg = session.createTextMessage(msgBody);
tMsg.setStringProperty("JMSXGroupID", msgGroup);
long t3 = System.currentTimeMillis();
producer.send(tMsg);
long t4 = System.currentTimeMillis();
PutMsgMetric putMsgMetric = new PutMsgMetric();
putMsgMetric.setMsg_id(Integer.valueOf(msgId).toString());
putMsgMetric.setMsg_body(msgBody);
putMsgMetric.setMsg_group(msgGroup);
putMsgMetric.setProducer_id(producerName);
putMsgMetric.setUsecase_id(useCaseId);
putMsgMetric.setTime_to_put_in_millis(Long.valueOf(t4 - t3).toString());
putMsgMetric.setTime_of_put_currenttime_millis(Long.valueOf(System.currentTimeMillis()).toString());
putMsgMetricList.add(putMsgMetric);
}
if (putMsgMetricList.size() != 0) {
try {
Path pathToPutMetricsFile = Paths
.get(new File("").getAbsolutePath().concat("/amazon-mq-poc-put-metrics").concat("-")
.concat(useCaseId).concat("-").concat(producerName).concat("-").concat(Long.toString(System.currentTimeMillis())).concat(".csv"));
Files.createDirectories(pathToPutMetricsFile.getParent());
Files.createFile(pathToPutMetricsFile);
CSVUtil.writePutMetricsToFile(pathToPutMetricsFile, putMsgMetricList);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}