in activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java [134:287]
void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {
final long[] messageKeyCounter = new long[]{0};
final long[] containerKeyCounter = new long[]{0};
final BackupStreamManager manager = new BackupStreamManager(fos, 1);
final int[] preparedTxs = new int[]{0};
store.createTransactionStore().recover(new TransactionRecoveryListener() {
@Override
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
preparedTxs[0] += 1;
}
});
if (preparedTxs[0] > 0) {
throw new IllegalStateException("Cannot export a store with prepared XA transactions. Please commit or rollback those transactions before attempting to backup.");
}
for (ActiveMQDestination odest : store.getDestinations()) {
if(queue != null && !queue.equals(odest.getPhysicalName())) {
continue;
}
containerKeyCounter[0]++;
if (odest instanceof ActiveMQQueue) {
ActiveMQQueue dest = (ActiveMQQueue) odest;
MessageStore queue = store.createQueueMessageStore(dest);
QueuePB destRecord = new QueuePB();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ptp_kind);
final long[] seqKeyCounter = new long[]{0};
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "queue_destination");
jsonMap.put("name", dest.getQueueName());
String json = mapper.writeValueAsString(jsonMap);
logger.info("Queue info:{}", json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
MessageRecoveryContext.Builder builder =
new MessageRecoveryContext.Builder()
.maxMessageCountReturned(count)
.messageRecoveryListener(new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
});
if(startMsgId != null || endMsgId != null) {
logger.info("Backing up from startMsgId:{} to endMsgId:{} ", startMsgId, endMsgId);
queue.recoverMessages(builder.endMessageId(endMsgId).startMessageId(startMsgId).build());
} else if(indexesList != null) {
logger.info("Backing up using indexes count:{}", indexesList.size());
for(int idx : indexesList) {
queue.recoverMessages(builder.maxMessageCountReturned(1).offset(idx).build());
}
} else if(offset != null) {
logger.info("Backing up from offset:{} count:{} ", offset, count);
queue.recoverMessages(builder.offset(offset).build());
} else {
queue.recover(builder.build());
}
} else if (odest instanceof ActiveMQTopic) {
ActiveMQTopic dest = (ActiveMQTopic) odest;
TopicMessageStore topic = store.createTopicMessageStore(dest);
for (SubscriptionInfo sub : topic.getAllSubscriptions()) {
QueuePB destRecord = new QueuePB();
destRecord.setKey(containerKeyCounter[0]);
destRecord.setBindingKind(ds_kind);
HashMap<String, Object> jsonMap = new HashMap<String, Object>();
jsonMap.put("@class", "dsub_destination");
jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
jsonTopic.put("name", dest.getTopicName());
jsonMap.put("topics", new Object[]{jsonTopic});
if (sub.getSelector() != null) {
jsonMap.put("selector", sub.getSelector());
}
jsonMap.put("noLocal", sub.isNoLocal());
String json = mapper.writeValueAsString(jsonMap);
logger.info("Topic info:{}", json);
destRecord.setBindingData(new UTF8Buffer(json));
manager.store_queue(destRecord);
final long seqKeyCounter[] = new long[]{0};
topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
@Override
public boolean hasSpace() {
return true;
}
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return true;
}
@Override
public boolean isDuplicate(MessageId ref) {
return false;
}
@Override
public boolean recoverMessage(Message message) throws IOException {
messageKeyCounter[0]++;
seqKeyCounter[0]++;
MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
manager.store_message(messageRecord);
QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
manager.store_queue_entry(entryRecord);
return true;
}
});
}
}
}
manager.finish();
}