public static void recover()

in artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java [76:249]


   /**
    * Rebuilds a message journal by scanning every journal file found in {@code journallocation}
    * and re-appending the records of interest into a new journal created in {@code journalOutput}.
    *
    * <p>Records copied to the output journal are those whose user record type is one of
    * {@code ADD_LARGE_MESSAGE}, {@code ADD_MESSAGE}, {@code ADD_MESSAGE_PROTOCOL}, {@code ADD_REF}
    * or {@code PAGE_TRANSACTION}. {@code ADD_MESSAGE_BODY} event records are not copied to the
    * journal; their payload is appended to a {@code <recordId>.msg} file created through a file
    * factory rooted at {@code largeMessage}.
    *
    * <p>Transactional add/update records are handled exactly like their non-transactional
    * counterparts. Delete, prepare, commit and rollback records are ignored.
    *
    * <p>Failures while appending an individual record are reported on the context streams and do
    * not abort the run. Any other failure propagates to the caller, after the output journal and
    * its file factory have been stopped.
    *
    * @param context         CLI context; progress goes to {@code context.out}, stack traces to {@code context.err}
    * @param configuration   supplies the journal file size, min files and pool files used to build the journals
    * @param journallocation directory holding the journal files to read
    * @param journalOutput   directory receiving the recovered journal; created if it does not exist
    * @param largeMessage    directory in which {@code <recordId>.msg} files are written
    * @param reclaimed       forwarded as-is to {@code JournalImpl.readJournalFile}
    *                        (NOTE(review): presumably enables reading of reclaimed data - confirm against JournalImpl)
    * @throws IllegalStateException if {@code journalOutput} cannot be created or is not a directory
    * @throws Exception             if starting, loading, reading or flushing a journal fails
    */
   public static void recover(ActionContext context, Configuration configuration, String journallocation, File journalOutput, File largeMessage, boolean reclaimed) throws Exception {

      File journal = new File(journallocation);

      if (!journalOutput.exists()) {
         if (!journalOutput.mkdirs()) {
            throw new IllegalStateException("It was not possible to create " + journalOutput);
         }
      }

      if (journalOutput.exists() && !journalOutput.isDirectory()) {
         throw new IllegalStateException(journalOutput + " is not a directory");
      }

      SequentialFileFactory outputFF = new NIOSequentialFileFactory(journalOutput, null, 1);
      outputFF.setDatasync(false);
      JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, -1, 0, outputFF, "activemq-data", "amq", 1);
      targetJournal.setAutoReclaim(false);

      targetJournal.start();

      // Everything after start() runs under try/finally so the output journal and its
      // file factory are always stopped, even when reading a journal file fails.
      try {
         targetJournal.loadInternalOnly();

         SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journal, null, 1);
         SequentialFileFactory largeMessagesFF = new NIOSequentialFileFactory(largeMessage, null, 1);

         // Will use only default values. The load function should adapt to anything different
         JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);

         List<JournalFile> files = messagesJournal.orderFiles();

         // Only these record types are copied into the output journal.
         HashSet<Byte> userRecordsOfInterest = new HashSet<>();
         userRecordsOfInterest.add(JournalRecordIds.ADD_LARGE_MESSAGE);
         userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE);
         userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE_PROTOCOL);
         userRecordsOfInterest.add(JournalRecordIds.ADD_REF);
         userRecordsOfInterest.add(JournalRecordIds.PAGE_TRANSACTION);

         // (record id, queue id) pairs already written as ADD_REF, shared across all files
         // so a reference seen in more than one journal file is only recorded once.
         HashSet<Pair<Long, Long>> routeBindings = new HashSet<>();

         for (JournalFile file : files) {
            // Deliberately written to the CLI output stream rather than a logger:
            // this is progress information for the user running the command.
            context.out.println("Recovering messages from file " + file);

            JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
               // Id of the record whose body is currently being appended to largeMessageFile.
               long lastLargeMessageId = -1;
               // Currently open <id>.msg file, or null when none is open.
               SequentialFile largeMessageFile;

               @Override
               public void done() {
                  try {
                     if (largeMessageFile != null) {
                        largeMessageFile.close();
                        largeMessageFile = null;
                     }
                  } catch (Exception e) {
                     // Best effort: report on the CLI error stream, like the other handlers do.
                     e.printStackTrace(context.err);
                  }
               }

               @Override
               public void onReadEventRecord(RecordInfo info) throws Exception {
                  switch (info.getUserRecordType()) {
                     case JournalRecordIds.ADD_REF:
                        onReadUpdateRecord(info);
                        break;

                     case JournalRecordIds.ADD_MESSAGE_BODY:
                        // Switch to a new file whenever the record id changes (or nothing is open yet).
                        if (lastLargeMessageId != info.id || largeMessageFile == null) {
                           if (largeMessageFile != null) {
                              largeMessageFile.close();
                           }

                           largeMessageFile = largeMessagesFF.createSequentialFile(info.id + ".msg");
                           largeMessageFile.open();
                           // Position at the current end so the body chunk is appended.
                           largeMessageFile.position(largeMessageFile.size());
                           lastLargeMessageId = info.id;
                        }
                        largeMessageFile.write(new ByteArrayEncoding(info.data), false, null);
                        break;

                     default:
                        onReadAddRecord(info);
                  }
               }

               @Override
               public void onReadAddRecord(RecordInfo info) throws Exception {
                  if (userRecordsOfInterest.contains(info.getUserRecordType())) {

                     if (targetJournal.getRecords().get(info.id) != null) {
                        // User-facing CLI output, intentionally not a log statement
                        context.out.println("RecordID " + info.id + " would been duplicated, ignoring it");
                        return;
                     }
                     try {
                        targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, false);
                     } catch (Exception e) {
                        // User-facing CLI output; a single bad record must not abort the recovery
                        context.out.println("Cannot append record for " + info.id + "->" + e.getMessage());
                     }
                  }
               }

               @Override
               public void onReadUpdateRecord(RecordInfo info) throws Exception {
                  if (userRecordsOfInterest.contains(info.getUserRecordType())) {
                     if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
                        long queue = ByteUtil.bytesToLong(info.data);
                        Pair<Long, Long> pairQueue = new Pair<>(info.id, queue);
                        if (routeBindings.contains(pairQueue)) {
                           // User-facing CLI output, intentionally not a log statement
                           context.out.println("AddReference on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it");
                           return;
                        }

                        routeBindings.add(pairQueue);
                     }
                     try {
                        targetJournal.appendUpdateRecord(info.id, info.userRecordType, info.data, true);
                     } catch (Exception e) {
                        context.out.println("Cannot update record " + info.id + "-> " + e.getMessage());
                        e.printStackTrace(context.err);
                     }
                  }
               }

               @Override
               public void onReadDeleteRecord(long recordID) throws Exception {
                  // deletes are intentionally ignored
               }

               @Override
               public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception {
                  // the transaction id is dropped: the record is recovered as a plain add
                  onReadAddRecord(info);
               }

               @Override
               public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception {
                  // the transaction id is dropped: the record is recovered as a plain update
                  onReadUpdateRecord(info);
               }

               @Override
               public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
                  // transactional deletes are intentionally ignored
               }

               @Override
               public void onReadPrepareRecord(long transactionID,
                                               byte[] extraData,
                                               int numberOfRecords) throws Exception {
                  // prepare records are intentionally ignored
               }

               @Override
               public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception {
                  // commit records are intentionally ignored
               }

               @Override
               public void onReadRollbackRecord(long transactionID) throws Exception {
                  // rollback records are intentionally ignored
               }

               @Override
               public void markAsDataFile(JournalFile file) {
                  // nothing to track
               }
            }, null, reclaimed, null);
         }

         targetJournal.flush();
      } finally {
         try {
            targetJournal.stop();
         } finally {
            // Stop the factory even if stopping the journal itself fails.
            outputFF.stop();
         }
      }
   }