in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/scheduler/JobSchedulerStoreImpl.java [678:792]
protected void process(JournalCommand<?> data, final Location location) throws IOException {
data.visit(new Visitor() {
@Override
public void visit(final KahaAddScheduledJobCommand command) throws IOException {
final JobSchedulerImpl scheduler;
indexLock.writeLock().lock();
try {
try {
scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
} catch (Exception e) {
throw new IOException(e);
}
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
scheduler.process(tx, command, location);
}
});
processLocation(location);
} finally {
indexLock.writeLock().unlock();
}
}
@Override
public void visit(final KahaRemoveScheduledJobCommand command) throws IOException {
final JobSchedulerImpl scheduler;
indexLock.writeLock().lock();
try {
try {
scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
} catch (Exception e) {
throw new IOException(e);
}
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
scheduler.process(tx, command, location);
}
});
processLocation(location);
} finally {
indexLock.writeLock().unlock();
}
}
@Override
public void visit(final KahaRemoveScheduledJobsCommand command) throws IOException {
final JobSchedulerImpl scheduler;
indexLock.writeLock().lock();
try {
try {
scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
} catch (Exception e) {
throw new IOException(e);
}
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
scheduler.process(tx, command, location);
}
});
processLocation(location);
} finally {
indexLock.writeLock().unlock();
}
}
@Override
public void visit(final KahaRescheduleJobCommand command) throws IOException {
final JobSchedulerImpl scheduler;
indexLock.writeLock().lock();
try {
try {
scheduler = (JobSchedulerImpl) getJobScheduler(command.getScheduler());
} catch (Exception e) {
throw new IOException(e);
}
getPageFile().tx().execute(new Transaction.Closure<IOException>() {
@Override
public void execute(Transaction tx) throws IOException {
scheduler.process(tx, command, location);
}
});
processLocation(location);
} finally {
indexLock.writeLock().unlock();
}
}
@Override
public void visit(final KahaDestroySchedulerCommand command) {
try {
removeJobScheduler(command.getScheduler());
} catch (Exception e) {
LOG.warn("Failed to remove scheduler: {}", command.getScheduler());
}
processLocation(location);
}
@Override
public void visit(KahaTraceCommand command) {
processLocation(location);
}
});
}