in activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java [234:331]
/**
 * Intercepts a message on its way into the broker and handles the two kinds
 * of scheduler traffic itself; everything else is handed to the parent broker.
 * <ul>
 * <li><b>Management request</b> - the destination's physical name begins
 * (ignoring case) with {@link ScheduledMessage#AMQ_SCHEDULER_MANAGEMENT_DESTINATION}.
 * The {@link ScheduledMessage#AMQ_SCHEDULER_ACTION} property selects browse,
 * remove or remove-all. Such a message is consumed here and is never passed
 * to {@code super.send}; when it carries no action it is silently dropped.</li>
 * <li><b>Schedule request</b> - at least one of the cron, period or delay
 * properties is set and no scheduled job id is present. The call blocks while
 * the job scheduler store reports itself full, then the message is given to
 * {@code doSchedule}, immediately or after the enclosing transaction commits.</li>
 * <li><b>Anything else</b> - delegated unchanged to {@code super.send}.</li>
 * </ul>
 *
 * @param producerExchange exchange supplying the connection context of the producer
 * @param messageSend the message being sent; its properties drive the dispatch above
 * @throws IOException if the connection starts stopping while this call is
 *         waiting for space in the job scheduler store
 * @throws Exception any failure raised by the scheduler, the property lookup
 *         or the parent broker
 */
public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
    final ConnectionContext connection = producerExchange.getConnectionContext();

    final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
    final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
    final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
    final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);

    // Case-insensitive "starts with" test against the management destination name.
    final String destinationName = messageSend.getDestination().getPhysicalName();
    final String managementPrefix = ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION;
    if (destinationName.regionMatches(true, 0, managementPrefix, 0, managementPrefix.length())) {
        final JobScheduler jobScheduler = getInternalScheduler();
        final ActiveMQDestination replyDestination = messageSend.getReplyTo();
        final String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);
        if (action == null) {
            // A management message without an action is swallowed, not forwarded.
            return;
        }

        final Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
        final Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);
        // The time window only applies when both bounds were supplied.
        final boolean hasTimeWindow = startTime != null && endTime != null;

        // Browse needs somewhere to send the results, hence the reply-to check.
        if (replyDestination != null && ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE.equals(action)) {
            final Iterable<? extends Job> matchingJobs;
            if (hasTimeWindow) {
                long from = (Long) TypeConversionSupport.convert(startTime, Long.class);
                long until = (Long) TypeConversionSupport.convert(endTime, Long.class);
                matchingJobs = jobScheduler.getAllJobs(from, until);
            } else {
                matchingJobs = jobScheduler.getAllJobs();
            }
            for (Job job : matchingJobs) {
                sendScheduledJob(connection, job, replyDestination);
            }
        }

        if (jobId != null && ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE.equals(action)) {
            jobScheduler.remove(jobId);
        } else if (ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL.equals(action)) {
            if (hasTimeWindow) {
                long from = (Long) TypeConversionSupport.convert(startTime, Long.class);
                long until = (Long) TypeConversionSupport.convert(endTime, Long.class);
                jobScheduler.removeAllJobs(from, until);
            } else {
                jobScheduler.removeAllJobs();
            }
        }
        return;
    }

    // Not a schedule request (no timing property, or a job id is already
    // attached): let the parent broker deal with it.
    final boolean timingRequested = cronValue != null || periodValue != null || delayValue != null;
    if (!timingRequested || jobId != null) {
        super.send(producerExchange, messageSend);
        return;
    }

    // Check for room in the job scheduler store
    final JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
    if (usage != null && usage.isFull()) {
        final String logMessage = "Job Scheduler Store is Full (" +
            usage.getPercentUsage() + "% of " + usage.getLimit() +
            "). Stopping producer (" + messageSend.getProducerId() +
            ") to prevent flooding of the job scheduler store." +
            " See http://activemq.apache.org/producer-flow-control.html for more info";

        final long blockedSince = System.currentTimeMillis();
        long nextWarningAt = blockedSince;
        while (!usage.waitForSpace(1000)) {
            // Give up instead of blocking forever once the connection is going away.
            if (connection.getStopping().get()) {
                throw new IOException("Connection closed, send aborted.");
            }

            final long now = System.currentTimeMillis();
            if (now >= nextWarningAt) {
                LOG.info("{}: {} (blocking for: {}s)", usage, logMessage, (now - blockedSince) / 1000);
                // Repeat the warning at most once every 30 seconds.
                nextWarningAt = now + 30000L;
            }
        }
    }

    if (!connection.isInTransaction()) {
        doSchedule(messageSend, cronValue, periodValue, delayValue);
        return;
    }

    // Inside a transaction the job must only be created once the commit succeeds.
    connection.getTransaction().addSynchronization(new Synchronization() {
        @Override
        public void afterCommit() throws Exception {
            doSchedule(messageSend, cronValue, periodValue, delayValue);
        }
    });
}