in activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreExporter.java [102:235]
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
final long[] messageKeyCounter = new long[]{0};
final long[] containerKeyCounter = new long[]{0};
final ExportStreamManager manager = new ExportStreamManager(fos, 1);
final int[] preparedTxs = new int[]{0};
store.createTransactionStore().recover(new TransactionRecoveryListener() {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
preparedTxs[0] += 1;
}
});
if (preparedTxs[0] > 0) {
throw new Exception("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to export.");
}
for (ActiveMQDestination odest : store.getDestinations()) {
containerKeyCounter[0]++;
if (odest instanceof ActiveMQQueue) {
ActiveMQQueue dest = (ActiveMQQueue) odest;
MessageStore queue = store.createQueueMessageStore(dest);
QueuePB.Bean destRecord = new QueuePB.Bean();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ptp_kind);
final long[] seqKeyCounter = new long[]{0};
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "queue_destination");
jsonMap.put("name", dest.getQueueName());
String json = mapper.writeValueAsString(jsonMap);
System.out.println(json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
queue.recover(new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
});
} else if (odest instanceof ActiveMQTopic) {
ActiveMQTopic dest = (ActiveMQTopic) odest;
TopicMessageStore topic = store.createTopicMessageStore(dest);
for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
QueuePB.Bean destRecord = new QueuePB.Bean();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ds_kind);
// TODO: use a real JSON encoder like jackson.
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "dsub_destination");
jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
jsonTopic.put("name", dest.getTopicName());
jsonMap.put("topics", new Object[]{jsonTopic});
if (sub.getSelector() != null) {
jsonMap.put("selector", sub.getSelector());
}
jsonMap.put("noLocal", sub.isNoLocal());
String json = mapper.writeValueAsString(jsonMap);
System.out.println(json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
final long seqKeyCounter[] = new long[]{0};
topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB.Bean messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB.Bean entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
});
}
}
}
manager.finish();
}