in src/main/java/com/aws/amazonmq/blog/testcases/FIFO_Testcase_2.java [33:112]
/**
 * Runs FIFO test case 2: schedules three message-group producers and two
 * custom-prefetch consumers against one queue, lets them run for a fixed
 * period, then unschedules everything and shuts the scheduler down.
 *
 * <p>Expected arguments, in order:
 * <ol>
 *   <li>{@code amazonMQSSLEndPoint} - OpenWire endpoint of the AmazonMQ broker</li>
 *   <li>{@code username} - broker username</li>
 *   <li>{@code password} - broker password</li>
 *   <li>{@code queueName} - name of the AmazonMQ queue</li>
 *   <li>{@code numMsgs} - number of messages to be inserted by each producer</li>
 *   <li>{@code useCaseId} - identifier used to prefix job names and in log output</li>
 *   <li>{@code prefetchSize} - prefetch size handed to each consumer job</li>
 * </ol>
 *
 * @param args command-line arguments as listed above
 * @throws InterruptedException     if the main thread is interrupted while waiting for the jobs
 * @throws IllegalArgumentException if fewer than seven arguments are supplied
 * @throws NumberFormatException    if {@code numMsgs} is not a valid integer
 */
public static void main(String[] args) throws InterruptedException {
    Logger logger = LoggerFactory.getLogger(FIFO_Testcase_2.class);
    if (args == null || args.length < 7) {
        throw new IllegalArgumentException("Expected 7 arguments: <amazonMQSSLEndPoint> <username> "
                + "<password> <queueName> <numMsgs> <useCaseId> <prefetchSize>");
    }
    String amazonMQSSLEndPoint = args[0]; // OpenWire Endpoint for AmazonMQ Broker
    String username = args[1]; // Username
    String password = args[2]; // Password
    String queueName = args[3]; // name of the AmazonMQ Queue Name
    int numMsgs = Integer.parseInt(args[4]); // number of messages to be inserted by each producer
    String useCaseId = args[5];
    String prefetchSize = args[6];

    // How long the main thread waits for the producer and consumer jobs (150 seconds).
    final long runDurationMillis = 150000L;

    // NOTE(review): three producers publish 3 * numMsgs messages in total, yet each consumer
    // is configured with numMsgs * 4 (the original trailing comment read "numMsgs * 3").
    // Value kept as found - confirm which one is intended.
    int consumerNumMsgs = numMsgs * 4;

    Scheduler scheduler = null;
    try {
        // Grab the Scheduler instance from the Factory and start it off
        scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        // define producer jobs and tie them to MsgProducer which does the actual work
        JobDetail job1 = buildProducerJob("producer-job1", "producer_1", "Group-A", 250000, "A-",
                amazonMQSSLEndPoint, username, password, queueName, useCaseId, numMsgs);
        JobDetail job2 = buildProducerJob("producer-job2", "producer_2", "Group-B", 300000, "B-",
                amazonMQSSLEndPoint, username, password, queueName, useCaseId, numMsgs);
        JobDetail job3 = buildProducerJob("producer-job3", "producer_3", "Group-C", 350000, "C-",
                amazonMQSSLEndPoint, username, password, queueName, useCaseId, numMsgs);

        // define consumer jobs and tie them to MsgConsumer which does the actual work
        JobDetail job4 = buildConsumerJob("consumer-job1", "consumer_1", amazonMQSSLEndPoint,
                username, password, queueName, useCaseId, consumerNumMsgs, prefetchSize);
        JobDetail job5 = buildConsumerJob("consumer-job2", "consumer_2", amazonMQSSLEndPoint,
                username, password, queueName, useCaseId, consumerNumMsgs, prefetchSize);

        Trigger trigger1 = buildOneShotTrigger("trigger1");
        Trigger trigger2 = buildOneShotTrigger("trigger2");
        Trigger trigger3 = buildOneShotTrigger("trigger3");
        Trigger trigger4 = buildOneShotTrigger("trigger4");
        Trigger trigger5 = buildOneShotTrigger("trigger5");

        logger.info("scheduling producer jobs");
        scheduler.scheduleJob(job1, trigger1);
        scheduler.scheduleJob(job2, trigger2);
        scheduler.scheduleJob(job3, trigger3);

        logger.info("scheduling consumer jobs");
        scheduler.scheduleJob(job4, trigger4);
        scheduler.scheduleJob(job5, trigger5);

        Thread.sleep(runDurationMillis);

        logger.info("unscheduling jobs and stopping the scheduler. Use case id: " + useCaseId);
        scheduler.unscheduleJob(trigger1.getKey());
        scheduler.unscheduleJob(trigger2.getKey());
        scheduler.unscheduleJob(trigger3.getKey());
        scheduler.unscheduleJob(trigger4.getKey());
        scheduler.unscheduleJob(trigger5.getKey());
    } catch (SchedulerException se) {
        logger.error("Scheduler failure while running use case " + useCaseId, se);
    } finally {
        // Shut down on every exit path (including interruption of the sleep above) so the
        // scheduler is not left running after main returns or throws.
        if (scheduler != null) {
            try {
                scheduler.shutdown();
            } catch (SchedulerException se) {
                logger.error("Failed to shut down the scheduler for use case " + useCaseId, se);
            }
        }
    }
}

/** Builds a MsgProducer_FIFO job in "group1" carrying the connection and message-group settings. */
private static JobDetail buildProducerJob(String jobName, String producerName, String msgGroup,
        int msgIdSequence, String msgPrefix, String amazonMQSSLEndPoint, String username,
        String password, String queueName, String useCaseId, int numMsgs) {
    return newJob(MsgProducer_FIFO.class)
            .withIdentity(useCaseId.concat("-").concat(jobName), "group1")
            .usingJobData("producerName", producerName).usingJobData("queueName", queueName)
            .usingJobData("amazonMQSSLEndPoint", amazonMQSSLEndPoint).usingJobData("username", username)
            .usingJobData("password", password).usingJobData("useCaseId", useCaseId)
            .usingJobData("msgGroup", msgGroup).usingJobData("numMsgs", numMsgs)
            .usingJobData("msgIdSequence", msgIdSequence).usingJobData("msgPrefix", msgPrefix).build();
}

/** Builds a MsgConsumer_CustomPrefetch job in "group1" carrying the connection and prefetch settings. */
private static JobDetail buildConsumerJob(String jobName, String consumerName,
        String amazonMQSSLEndPoint, String username, String password, String queueName,
        String useCaseId, int numMsgs, String prefetchSize) {
    return newJob(MsgConsumer_CustomPrefetch.class)
            .withIdentity(useCaseId.concat("-").concat(jobName), "group1")
            .usingJobData("consumerName", consumerName).usingJobData("queueName", queueName)
            .usingJobData("amazonMQSSLEndPoint", amazonMQSSLEndPoint).usingJobData("username", username)
            .usingJobData("password", password).usingJobData("useCaseId", useCaseId)
            .usingJobData("numMsgs", numMsgs).usingJobData("prefetchSize", prefetchSize).build();
}

/** Builds a trigger in "group1" that starts immediately and does not repeat. */
private static Trigger buildOneShotTrigger(String triggerName) {
    return newTrigger().withIdentity(triggerName, "group1").startNow()
            .withSchedule(simpleSchedule().withRepeatCount(0)).build();
}