public void execute()

in src/main/java/com/aws/amazonmq/blog/util/MsgConsumer_CustomPrefetch.java [42:132]


	public void execute(JobExecutionContext context) throws JobExecutionException {
		AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().build();
		// Retrieve job specific settings
		JobKey key = context.getJobDetail().getKey();
		JobDataMap dataMap = context.getJobDetail().getJobDataMap();
		String amazonMQSSLEndpoint = dataMap.getString("amazonMQSSLEndPoint");
		String amazonMQLoginUsername = dataMap.getString("username");
		String amazonMQLoginPWD = dataMap.getString("password");
		String queueName = dataMap.getString("queueName");
		String consumerName = dataMap.getString("consumerName");
		String useCaseId = dataMap.getString("useCaseId");
		int numMsgs = dataMap.getInt("numMsgs");
		String prefetchSize = dataMap.getString("prefetchSize");

		System.out.println("Running Job: " + key.getName());
		// Create AmazonMQ connection factory
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(amazonMQSSLEndpoint);
		connectionFactory.setUserName(amazonMQLoginUsername);
		connectionFactory.setPassword(amazonMQLoginPWD);
		try {
			// Create a connection
			long t1 = System.currentTimeMillis();
			Connection connection = connectionFactory.createConnection();
			connection.start();
			long t2 = System.currentTimeMillis();
			System.out.printf("Connection started for %s. It took %d milliseconds. \n", consumerName, (t2 - t1));
			// Create a session.
			t1 = System.currentTimeMillis();
			Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			ActiveMQQueue queue = new ActiveMQQueue(
					queueName + "?consumer.dispatchAsync=false&consumer.prefetchSize=" + prefetchSize);
			MessageConsumer consumer = session.createConsumer(queue);
			t2 = System.currentTimeMillis();
			System.out.printf("Consumer Session for Thread %s started. It took %d milliseconds \n", consumerName,
					(t2 - t1));
			System.out.printf("Getting messages from Amazon MQ. Queue Name: %s, Consumer Id: %s \n", queueName,
					consumerName);
			List<GetMsgMetric> getMsgMetricList = new ArrayList<GetMsgMetric>();
			for (int i = 1; i < numMsgs + 1; i++) {
				long t3 = System.currentTimeMillis();
				Message consumerMessage = consumer.receive(50); // milliseconds is the timeout
				long t4 = System.currentTimeMillis();
				Optional<Message> message = Optional.ofNullable(consumerMessage);
				if (message.isPresent()) {
					TextMessage txtMessage = (TextMessage) message.get();
					String msgBody = txtMessage.getText();
					GetMsgMetric getMsgMetric = new GetMsgMetric();
					getMsgMetric.setMsg_id(Integer.valueOf(msgBody.substring(msgBody.lastIndexOf("-") + 1)).toString());
					getMsgMetric.setMsg_body(msgBody);
					getMsgMetric.setMsg_group(txtMessage.getStringProperty("JMSXGroupID"));
					getMsgMetric.setUsecase_id(useCaseId);
					getMsgMetric.setConsumer_id(consumerName);
					getMsgMetric.setTime_to_get_in_millis(Long.valueOf(t4 - t3).toString());
					getMsgMetric.setTime_of_get_currenttime_millis(Long.valueOf(System.currentTimeMillis()).toString());
					getMsgMetricList.add(getMsgMetric);
					// System.out.println("message returned at call #: " + i + ", consumer id: " +
					// consumerName);
				}
				// else {
				// System.out.println("no message returned at call #: " + i + ", consumer id: "
				// + consumerName);
				// }
			}
			if (getMsgMetricList.size() != 0) {
				try {
					Path pathToGetMetricsFile = Paths
							.get(new File("").getAbsolutePath().concat("/amazon-mq-poc-get-metrics").concat("-")
									.concat(useCaseId).concat("-").concat(consumerName).concat("-").concat(Long.toString(System.currentTimeMillis())).concat(".csv"));
					Files.createDirectories(pathToGetMetricsFile.getParent());
					Files.createFile(pathToGetMetricsFile);
					CSVUtil.writeGetMetricsToFile(pathToGetMetricsFile, getMsgMetricList);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (IOException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
			consumer.close();
			session.close();
			connection.close();
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		dynamoDBClient.shutdown();
	}