public void send()

in activemq-broker/src/main/java/org/apache/activemq/broker/scheduler/SchedulerBroker.java [234:331]


    public void send(ProducerBrokerExchange producerExchange, final Message messageSend) throws Exception {
        ConnectionContext context = producerExchange.getConnectionContext();

        final String jobId = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
        final Object cronValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_CRON);
        final Object periodValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD);
        final Object delayValue = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY);

        String physicalName = messageSend.getDestination().getPhysicalName();
        boolean schedularManage = physicalName.regionMatches(true, 0, ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, 0,
            ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION.length());

        if (schedularManage == true) {

            JobScheduler scheduler = getInternalScheduler();
            ActiveMQDestination replyTo = messageSend.getReplyTo();

            String action = (String) messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION);

            if (action != null) {

                Object startTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME);
                Object endTime = messageSend.getProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME);

                if (replyTo != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE)) {

                    if (startTime != null && endTime != null) {

                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);

                        for (Job job : scheduler.getAllJobs(start, finish)) {
                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
                        }
                    } else {
                        for (Job job : scheduler.getAllJobs()) {
                            sendScheduledJob(producerExchange.getConnectionContext(), job, replyTo);
                        }
                    }
                }
                if (jobId != null && action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE)) {
                    scheduler.remove(jobId);
                } else if (action.equals(ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL)) {

                    if (startTime != null && endTime != null) {

                        long start = (Long) TypeConversionSupport.convert(startTime, Long.class);
                        long finish = (Long) TypeConversionSupport.convert(endTime, Long.class);

                        scheduler.removeAllJobs(start, finish);
                    } else {
                        scheduler.removeAllJobs();
                    }
                }
            }

        } else if ((cronValue != null || periodValue != null || delayValue != null) && jobId == null) {

            // Check for room in the job scheduler store
            if (systemUsage.getJobSchedulerUsage() != null) {
                JobSchedulerUsage usage = systemUsage.getJobSchedulerUsage();
                if (usage.isFull()) {
                    final String logMessage = "Job Scheduler Store is Full (" +
                        usage.getPercentUsage() + "% of " + usage.getLimit() +
                        "). Stopping producer (" + messageSend.getProducerId() +
                        ") to prevent flooding of the job scheduler store." +
                        " See http://activemq.apache.org/producer-flow-control.html for more info";

                    long start = System.currentTimeMillis();
                    long nextWarn = start;
                    while (!usage.waitForSpace(1000)) {
                        if (context.getStopping().get()) {
                            throw new IOException("Connection closed, send aborted.");
                        }

                        long now = System.currentTimeMillis();
                        if (now >= nextWarn) {
                            LOG.info("{}: {} (blocking for: {}s)", usage, logMessage, (now - start) / 1000);
                            nextWarn = now + 30000l;
                        }
                    }
                }
            }

            if (context.isInTransaction()) {
                context.getTransaction().addSynchronization(new Synchronization() {
                    @Override
                    public void afterCommit() throws Exception {
                        doSchedule(messageSend, cronValue, periodValue, delayValue);
                    }
                });
            } else {
                doSchedule(messageSend, cronValue, periodValue, delayValue);
            }
        } else {
            super.send(producerExchange, messageSend);
        }
    }