void export()

in activemq-console/src/main/java/org/apache/activemq/console/command/store/StoreBackup.java [134:287]


    void export(PersistenceAdapter store, BufferedOutputStream fos) throws Exception {

        final long[] messageKeyCounter = new long[]{0};
        final long[] containerKeyCounter = new long[]{0};
        final BackupStreamManager manager = new BackupStreamManager(fos, 1);

        final int[] preparedTxs = new int[]{0};
        store.createTransactionStore().recover(new TransactionRecoveryListener() {
            @Override
            public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] aks) {
                preparedTxs[0] += 1;
            }
        });

        if (preparedTxs[0] > 0) {
            throw new IllegalStateException("Cannot export a store with prepared XA transactions.  Please commit or rollback those transactions before attempting to backup.");
        }

        for (ActiveMQDestination odest : store.getDestinations()) {

            if(queue != null && !queue.equals(odest.getPhysicalName())) {
                continue;
            }

            containerKeyCounter[0]++;
            if (odest instanceof ActiveMQQueue) {
                ActiveMQQueue dest = (ActiveMQQueue) odest;
                MessageStore queue = store.createQueueMessageStore(dest);

                QueuePB destRecord = new QueuePB();
                destRecord.setKey(containerKeyCounter[0]);
                destRecord.setBindingKind(ptp_kind);

                final long[] seqKeyCounter = new long[]{0};

                HashMap<String, Object> jsonMap = new HashMap<String, Object>();
                jsonMap.put("@class", "queue_destination");
                jsonMap.put("name", dest.getQueueName());
                String json = mapper.writeValueAsString(jsonMap);
                logger.info("Queue info:{}", json);
                destRecord.setBindingData(new UTF8Buffer(json));
                manager.store_queue(destRecord);

                MessageRecoveryContext.Builder builder = 
                        new MessageRecoveryContext.Builder()
                            .maxMessageCountReturned(count)
                            .messageRecoveryListener(new MessageRecoveryListener() {

                            @Override
                            public boolean hasSpace() {
                                return true;
                            }

                            @Override
                            public boolean recoverMessageReference(MessageId ref) throws Exception {
                                return true;
                            }

                            @Override
                            public boolean isDuplicate(MessageId ref) {
                                return false;
                            }

                            @Override
                            public boolean recoverMessage(Message message) throws IOException {
                                messageKeyCounter[0]++;
                                seqKeyCounter[0]++;

                                MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
                                manager.store_message(messageRecord);

                                QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
                                manager.store_queue_entry(entryRecord);

                                return true;
                            }
                        });

                if(startMsgId != null || endMsgId != null) {
                    logger.info("Backing up from startMsgId:{} to endMsgId:{} ", startMsgId, endMsgId);
                    queue.recoverMessages(builder.endMessageId(endMsgId).startMessageId(startMsgId).build());
                } else if(indexesList != null) {
                    logger.info("Backing up using indexes count:{}", indexesList.size());
                    for(int idx : indexesList) {
                        queue.recoverMessages(builder.maxMessageCountReturned(1).offset(idx).build());
                    }
                } else if(offset != null) {
                    logger.info("Backing up from offset:{} count:{} ", offset, count);
                    queue.recoverMessages(builder.offset(offset).build());
                } else {
                    queue.recover(builder.build());
                }
            } else if (odest instanceof ActiveMQTopic) {
                ActiveMQTopic dest = (ActiveMQTopic) odest;

                TopicMessageStore topic = store.createTopicMessageStore(dest);
                for (SubscriptionInfo sub : topic.getAllSubscriptions()) {

                    QueuePB destRecord = new QueuePB();
                    destRecord.setKey(containerKeyCounter[0]);
                    destRecord.setBindingKind(ds_kind);

                    HashMap<String, Object> jsonMap = new HashMap<String, Object>();
                    jsonMap.put("@class", "dsub_destination");
                    jsonMap.put("name", sub.getClientId() + ":" + sub.getSubscriptionName());
                    HashMap<String, Object> jsonTopic = new HashMap<String, Object>();
                    jsonTopic.put("name", dest.getTopicName());
                    jsonMap.put("topics", new Object[]{jsonTopic});
                    if (sub.getSelector() != null) {
                        jsonMap.put("selector", sub.getSelector());
                    }
                    jsonMap.put("noLocal", sub.isNoLocal());
                    String json = mapper.writeValueAsString(jsonMap);
                    logger.info("Topic info:{}", json);

                    destRecord.setBindingData(new UTF8Buffer(json));
                    manager.store_queue(destRecord);

                    final long seqKeyCounter[] = new long[]{0};
                    topic.recoverSubscription(sub.getClientId(), sub.getSubscriptionName(), new MessageRecoveryListener() {
                        @Override
                        public boolean hasSpace() {
                            return true;
                        }

                        @Override
                        public boolean recoverMessageReference(MessageId ref) throws Exception {
                            return true;
                        }

                        @Override
                        public boolean isDuplicate(MessageId ref) {
                            return false;
                        }

                        @Override
                        public boolean recoverMessage(Message message) throws IOException {
                            messageKeyCounter[0]++;
                            seqKeyCounter[0]++;

                            MessagePB messageRecord = createMessagePB(message, messageKeyCounter[0]);
                            manager.store_message(messageRecord);

                            QueueEntryPB entryRecord = createQueueEntryPB(message, containerKeyCounter[0], seqKeyCounter[0], messageKeyCounter[0]);
                            manager.store_queue_entry(entryRecord);
                            return true;
                        }
                    });

                }
            }
        }
        manager.finish();
    }