in artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java [1982:2293]
/**
 * Reads every journal file in order, rebuilds the in-memory bookkeeping ({@code records},
 * {@code transactions}, the files repository) and reports what was read to {@code loadManager}.
 * <p>
 * The sequence is:
 * <ol>
 * <li>run {@code checkControlFile} and clear {@code records}, {@code filesRepository},
 * {@code transactions} and {@code currentFile};</li>
 * <li>read each file returned by {@code orderFiles()} through a {@link JournalReaderCallback} that
 * forwards non-transactional records to {@code loadManager} immediately and buffers transactional
 * records until their commit or rollback record is read;</li>
 * <li>register each file either as a data file (something in it was flagged as data) or, when
 * {@code changeData} is set, as a free file;</li>
 * <li>if {@code replicationSync} is {@code SYNCING}, switch to that state and return early with
 * {@code JournalLoadInformation(0, -1)};</li>
 * <li>otherwise set up the current file, switch to {@code LOADED}, and report every transaction
 * still pending either as failed or as prepared;</li>
 * <li>when {@code changeData} is set, run {@code checkReclaimStatus()}.</li>
 * </ol>
 *
 * @param loadManager         callback that receives added/updated/deleted records, prepared
 *                            transactions and failed transactions
 * @param changeData          when {@code true} the load is allowed to modify the journal: files
 *                            without data are handed to the repository as free files, a rollback
 *                            record is appended for each uncommitted transaction, and
 *                            {@code checkReclaimStatus()} is run at the end
 * @param replicationSync     compared against {@code SYNCING} (early return, see above) and
 *                            {@code SYNCING_UP_TO_DATE} (pending transactions are then never
 *                            reported as failed); the entry assertion also requires the current
 *                            state to differ from it
 * @param wholeFileBufferRef  passed through to {@code checkControlFile} and
 *                            {@code readJournalFile}; NOTE(review): presumably a reusable read
 *                            buffer holder - confirm against those methods
 * @return the number of entries left in {@code records} and the highest record id seen
 *         ({@code -1} if none), or {@code (0, -1)} on the {@code SYNCING} path
 * @throws Exception whatever the file reading or {@code loadManager} callbacks throw; the callback
 *                   itself throws {@link IllegalStateException} when a commit/rollback finds a
 *                   buffered transaction holder without a matching {@code JournalTransaction}
 */
private synchronized JournalLoadInformation load(final LoaderCallback loadManager,
final boolean changeData,
final JournalState replicationSync,
final AtomicReference<ByteBuffer> wholeFileBufferRef) throws Exception {
// The local is assigned inside the assert expression itself, so this state check (and the read
// of this.state) only happens when assertions are enabled.
JournalState state;
assert (state = this.state) != JournalState.STOPPED &&
state != JournalState.LOADED &&
state != replicationSync;
checkControlFile(wholeFileBufferRef);
// Drop all in-memory bookkeeping before rebuilding it from the files.
records.clear();
filesRepository.clear();
transactions.clear();
currentFile = null;
// Transactions that have been seen but not yet committed/rolled back while reading.
// LinkedHashMap keeps them in first-seen order for the reporting loop after the read.
final Map<Long, TransactionHolder> loadTransactions = new LinkedHashMap<>();
final List<JournalFile> orderedFiles = orderFiles();
filesRepository.calculateNextfileID(orderedFiles);
// Position returned by readJournalFile for the last file that contained data; stays at
// SIZE_HEADER when no file has data.
int lastDataPos = JournalImpl.SIZE_HEADER;
// AtomicLong is used only as a reference, not as an Atomic value
final AtomicLong maxID = new AtomicLong(-1);
for (final JournalFile file : orderedFiles) {
logger.trace("Loading file {}", file.getFile().getFileName());
// Per-file flag set by the callback below; it decides whether the file is registered as a
// data file once it has been read.
final AtomicBoolean hasData = new AtomicBoolean(false);
int resultLastPost = JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback() {
// Tracks the highest record id seen across all files.
private void checkID(final long id) {
if (id > maxID.longValue()) {
maxID.lazySet(id);
}
}
// Non-transactional add: forwarded to the load manager right away and tracked in records.
@Override
public void onReadAddRecord(final RecordInfo info) throws Exception {
checkID(info.id);
hasData.lazySet(true);
loadManager.addRecord(info);
records.put(info.id, new JournalRecord(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1));
}
// Non-transactional update: forwarded right away; the update is also attached to the tracked
// record when one is still known.
@Override
public void onReadUpdateRecord(final RecordInfo info) throws Exception {
checkID(info.id);
hasData.lazySet(true);
loadManager.updateRecord(info);
JournalRecord posFiles = records.get(info.id);
if (posFiles != null) {
// It's legal for this to be null. The file(s) holding the record (presumably its
// add) may have been deleted, just leaving some updates in this file
posFiles.addUpdateFile(file, info.data.length + JournalImpl.SIZE_ADD_RECORD + 1, info.replaceableUpdate); // +1 = compact count
}
}
// Non-transactional delete: forwarded right away and removed from the tracked records.
@Override
public void onReadDeleteRecord(final long recordID) throws Exception {
hasData.lazySet(true);
loadManager.deleteRecord(recordID);
JournalRecord posFiles = records.remove(recordID);
if (posFiles != null) {
posFiles.delete(file);
}
}
// Transactional updates are buffered exactly like transactional adds; whether a record is
// replayed as an add or an update is decided at commit time from RecordInfo.isUpdate.
@Override
public void onReadUpdateRecordTX(final long transactionID, final RecordInfo info) throws Exception {
onReadAddRecordTX(transactionID, info);
}
// Transactional add/update: nothing is sent to the load manager yet. The record is buffered in
// the load-time holder and accounted on the JournalTransaction, both created on first use.
@Override
public void onReadAddRecordTX(final long transactionID, final RecordInfo info) throws Exception {
checkID(info.id);
hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordInfos.add(info);
JournalTransaction tnp = transactions.get(transactionID);
if (tnp == null) {
tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
}
tnp.addPositive(file, info.id, info.data.length + JournalImpl.SIZE_ADD_RECORD_TX + 1, info.replaceableUpdate); // +1 = compact count
}
// Transactional delete: buffered in the holder and accounted as a negative on the
// JournalTransaction. Note that no checkID call is made here.
@Override
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo info) throws Exception {
hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.recordsToDelete.add(info);
JournalTransaction tnp = transactions.get(transactionID);
if (tnp == null) {
tnp = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, tnp);
}
tnp.addNegative(file, info.id);
}
// Prepare: flags the holder as prepared and keeps its extra data. The holder stays in
// loadTransactions, so unless a later commit/rollback removes it, it is reported after the read.
@Override
public void onReadPrepareRecord(final long transactionID,
final byte[] extraData,
final int numberOfRecords) throws Exception {
hasData.lazySet(true);
TransactionHolder tx = loadTransactions.get(transactionID);
if (tx == null) {
// The user could choose to prepare empty transactions
tx = new TransactionHolder(transactionID);
loadTransactions.put(transactionID, tx);
}
tx.prepared = true;
tx.extraData = extraData;
JournalTransaction journalTransaction = transactions.get(transactionID);
if (journalTransaction == null) {
journalTransaction = new JournalTransaction(transactionID, JournalImpl.this);
transactions.put(transactionID, journalTransaction);
}
boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
if (healthy) {
journalTransaction.prepare(file);
} else {
// The holder is only flagged here; an invalid prepared transaction is handled as a failed
// transaction in the loop that runs after all files have been read.
ActiveMQJournalLogger.LOGGER.preparedTXIncomplete(transactionID);
tx.invalid = true;
}
}
// Commit: the transaction stops being pending. If it passes the health check, its buffered
// records are replayed to the load manager (adds/updates first, then deletes).
@Override
public void onReadCommitRecord(final long transactionID, final int numberOfRecords) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
// The commit could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the
// commit-record
// So it is completely legal to not find a transaction at this
// point
// If we can't find it, we assume the TX was reclaimed and we
// ignore this
if (tx != null) {
JournalTransaction journalTransaction = transactions.remove(transactionID);
if (journalTransaction == null) {
throw new IllegalStateException("Cannot find tx " + transactionID);
}
boolean healthy = checkTransactionHealth(file, journalTransaction, orderedFiles, numberOfRecords);
if (healthy) {
for (RecordInfo txRecord : tx.recordInfos) {
if (txRecord.isUpdate) {
loadManager.updateRecord(txRecord);
} else {
loadManager.addRecord(txRecord);
}
}
for (RecordInfo deleteValue : tx.recordsToDelete) {
loadManager.deleteRecord(deleteValue.id);
}
journalTransaction.commit(file);
} else {
// Unhealthy transaction: nothing is replayed to the load manager.
ActiveMQJournalLogger.LOGGER.txMissingElements(transactionID);
journalTransaction.forget();
}
// Only a commit whose transaction was known marks this file as having data.
hasData.lazySet(true);
}
}
// Rollback: the transaction stops being pending and its buffered records are never replayed.
@Override
public void onReadRollbackRecord(final long transactionID) throws Exception {
TransactionHolder tx = loadTransactions.remove(transactionID);
// The rollback could be alone on its own journal-file and the
// whole transaction body was reclaimed but not the rollback-record
// So it is completely legal to not find a transaction at this
// point
if (tx != null) {
JournalTransaction tnp = transactions.remove(transactionID);
if (tnp == null) {
throw new IllegalStateException("Cannot find tx " + transactionID);
}
// There is no need to validate summaries/holes on
// Rollbacks.. We will ignore the data anyway.
tnp.rollback(file);
// Only a rollback whose transaction was known marks this file as having data.
hasData.lazySet(true);
}
}
// Lets the reader flag the file as a data file without going through a record callback.
@Override
public void markAsDataFile(final JournalFile file) {
hasData.lazySet(true);
}
}, wholeFileBufferRef, false, this.replaceableRecords);
if (hasData.get()) {
lastDataPos = resultLastPost;
filesRepository.addDataFileOnBottom(file);
} else {
// When changeData is false, a file without data is not registered with the repository at all.
if (changeData) {
// Empty dataFiles with no data
filesRepository.addFreeFile(file, false, isRemoveExtraFilesOnLoad());
}
}
}
// SYNCING path: stop here. No current file is set up, the pending transactions are not
// reported, and the assertion expects that no file was registered as a data file.
if (replicationSync == JournalState.SYNCING) {
assert filesRepository.getDataFiles().isEmpty();
setJournalState(JournalState.SYNCING);
return new JournalLoadInformation(0, -1);
}
setUpCurrentFile(lastDataPos);
setJournalState(JournalState.LOADED);
// Whatever is left in loadTransactions never saw a commit or rollback record.
for (TransactionHolder transaction : loadTransactions.values()) {
// Not prepared, or prepared but incomplete => failed transaction. Under SYNCING_UP_TO_DATE this
// branch is never taken, so every pending transaction is reported through the else branch.
if ((!transaction.prepared || transaction.invalid) && replicationSync != JournalState.SYNCING_UP_TO_DATE) {
ActiveMQJournalLogger.LOGGER.uncomittedTxFound(transaction.transactionID);
if (changeData) {
// I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
this.appendRollbackRecord(transaction.transactionID, false);
}
loadManager.failedTransaction(transaction.transactionID, transaction.recordInfos, transaction.recordsToDelete);
} else {
// Make sure the reported max id covers the records held by this pending transaction.
for (RecordInfo info : transaction.recordInfos) {
if (info.id > maxID.get()) {
maxID.lazySet(info.id);
}
}
// Hand the buffered adds/updates and deletes to the load manager as a prepared transaction.
PreparedTransactionInfo info = new PreparedTransactionInfo(transaction.transactionID, transaction.extraData);
info.getRecords().addAll(transaction.recordInfos);
info.getRecordsToDelete().addAll(transaction.recordsToDelete);
loadManager.addPreparedTransaction(info);
}
}
if (changeData) {
checkReclaimStatus();
}
return new JournalLoadInformation(records.size(), maxID.longValue());
}