in artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/RecoverMessages.java [76:249]
public static void recover(ActionContext context, Configuration configuration, String journallocation, File journalOutput, File largeMessage, boolean reclaimed) throws Exception {
File journal = new File(journallocation);
if (!journalOutput.exists()) {
if (!journalOutput.mkdirs()) {
throw new IllegalStateException("It was not possible to create " + journalOutput);
}
}
if (journalOutput.exists() && !journalOutput.isDirectory()) {
throw new IllegalStateException(journalOutput + " is not a directory");
}
SequentialFileFactory outputFF = new NIOSequentialFileFactory(journalOutput, null, 1);
outputFF.setDatasync(false);
JournalImpl targetJournal = new JournalImpl(configuration.getJournalFileSize(), 2, 2, -1, 0, outputFF, "activemq-data", "amq", 1);
targetJournal.setAutoReclaim(false);
targetJournal.start();
targetJournal.loadInternalOnly();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(journal, null, 1);
SequentialFileFactory largeMessagesFF = new NIOSequentialFileFactory(largeMessage, null, 1);
// Will use only default values. The load function should adapt to anything different
JournalImpl messagesJournal = new JournalImpl(configuration.getJournalFileSize(), configuration.getJournalMinFiles(), configuration.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> files = messagesJournal.orderFiles();
HashSet<Byte> userRecordsOfInterest = new HashSet<>();
userRecordsOfInterest.add(JournalRecordIds.ADD_LARGE_MESSAGE);
userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE);
userRecordsOfInterest.add(JournalRecordIds.ADD_MESSAGE_PROTOCOL);
userRecordsOfInterest.add(JournalRecordIds.ADD_REF);
userRecordsOfInterest.add(JournalRecordIds.PAGE_TRANSACTION);
HashSet<Pair<Long, Long>> routeBindigns = new HashSet<>();
for (JournalFile file : files) {
// For reviewers and future maintainers: I really meant System.out.println here
// This is part of the CLI, hence this is like user's output
context.out.println("Recovering messages from file " + file);
JournalImpl.readJournalFile(messagesFF, file, new JournalReaderCallback() {
long lastlargeMessageId = -1;
SequentialFile largeMessageFile;
@Override
public void done() {
try {
if (largeMessageFile != null) {
largeMessageFile.close();
largeMessageFile = null;
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void onReadEventRecord(RecordInfo info) throws Exception {
switch (info.getUserRecordType()) {
case JournalRecordIds.ADD_REF:
onReadUpdateRecord(info);
break;
case JournalRecordIds.ADD_MESSAGE_BODY:
if (lastlargeMessageId != info.id || largeMessageFile == null) {
if (largeMessageFile != null) {
largeMessageFile.close();
}
largeMessageFile = largeMessagesFF.createSequentialFile(info.id + ".msg");
largeMessageFile.open();
largeMessageFile.position(largeMessageFile.size());
lastlargeMessageId = info.id;
}
largeMessageFile.write(new ByteArrayEncoding(info.data), false, null);
break;
default:
onReadAddRecord(info);
}
}
@Override
public void onReadAddRecord(RecordInfo info) throws Exception {
if (userRecordsOfInterest.contains(info.getUserRecordType())) {
if (targetJournal.getRecords().get(info.id) != null) {
// Really meant System.out.. user's information on the CLI
context.out.println("RecordID " + info.id + " would been duplicated, ignoring it");
return;
}
try {
targetJournal.appendAddRecord(info.id, info.userRecordType, info.data, false);
} catch (Exception e) {
// Really meant System.out.. user's information on the CLI
context.out.println("Cannot append record for " + info.id + "->" + e.getMessage());
}
}
}
@Override
public void onReadUpdateRecord(RecordInfo info) throws Exception {
if (userRecordsOfInterest.contains(info.getUserRecordType())) {
if (info.getUserRecordType() == JournalRecordIds.ADD_REF) {
long queue = ByteUtil.bytesToLong(info.data);
Pair<Long, Long> pairQueue = new Pair<>(info.id, queue);
if (routeBindigns.contains(pairQueue)) {
// really meant system.out
context.out.println("AddReference on " + info.id + " / queue=" + queue + " has already been recorded, ignoring it");
return;
}
routeBindigns.add(pairQueue);
}
try {
targetJournal.appendUpdateRecord(info.id, info.userRecordType, info.data, true);
} catch (Exception e) {
context.out.println("Cannot update record " + info.id + "-> " + e.getMessage());
e.printStackTrace(context.err);
}
}
}
@Override
public void onReadDeleteRecord(long recordID) throws Exception {
}
@Override
public void onReadAddRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadAddRecord(info);
}
@Override
public void onReadUpdateRecordTX(long transactionID, RecordInfo info) throws Exception {
onReadUpdateRecord(info);
}
@Override
public void onReadDeleteRecordTX(long transactionID, RecordInfo recordInfo) throws Exception {
}
@Override
public void onReadPrepareRecord(long transactionID,
byte[] extraData,
int numberOfRecords) throws Exception {
}
@Override
public void onReadCommitRecord(long transactionID, int numberOfRecords) throws Exception {
}
@Override
public void onReadRollbackRecord(long transactionID) throws Exception {
}
@Override
public void markAsDataFile(JournalFile file) {
}
}, null, reclaimed, null);
}
targetJournal.flush();
targetJournal.stop();
outputFF.stop();
}