in core/src/main/java/com/jetbrains/youtrackdb/internal/core/storage/impl/local/AbstractStorage.java [1855:2053]
/**
 * Applies the record and index changes collected in {@code frontendTransaction} to this storage
 * inside a single storage transaction.
 *
 * <p>The method works in two phases:
 *
 * <ol>
 *   <li><b>Preparation</b> (no storage lock held): every created/updated record is checked to be
 *       loaded and, for entities, validated; the collections touched by the transaction are
 *       gathered into a sorted map so they can be locked later; created records are gathered
 *       into a set ordered by {@code COMMIT_RECORD_OPERATION_COMPARATOR}.
 *   <li><b>Commit</b> (under {@code stateLock} read lock): collections, link bags and indexes are
 *       locked, positions for new records are resolved, every record operation is passed to
 *       {@code commitEntry}, pending serialization-context operations are executed and index
 *       changes are committed. On {@link IOException} or {@link RuntimeException} the storage
 *       transaction is rolled back, otherwise it is ended.
 * </ol>
 *
 * <p>The schema snapshot taken at the beginning is always cleared before the method returns or
 * throws, including when the preparation phase fails.
 *
 * @param frontendTransaction transaction whose record and index operations are committed
 * @param allocated if {@code true}, every created record must already have a persistent
 *     identity and its existing collection position is reused; if {@code false}, a position is
 *     allocated here for each created record that is dirty and not yet persistent
 * @return the record operations that were passed to {@code commitEntry}, in the iteration order
 *     of the transaction's record operations
 * @throws IllegalStateException (as processed by {@code logAndPrepareForRethrow}) if a created
 *     or updated record is unloaded
 */
protected List<RecordOperation> commit(
    final FrontendTransactionImpl frontendTransaction, final boolean allocated) {
  // NOTE(review): the original comment here warned that a second, near-identical commit
  // implementation exists for implicit micro-transactions and must be changed together with this
  // one, but the reference to that counterpart was left empty. TODO confirm whether such a
  // counterpart still exists.
  try {
    final var session = frontendTransaction.getDatabaseSession();
    final var indexOperations =
        getSortedIndexOperations(frontendTransaction);

    // Declared outside the snapshot scope so it is still visible for logging and the return.
    final List<RecordOperation> result = new ArrayList<>(8);

    session.getMetadata().makeThreadLocalSchemaSnapshot();
    try {
      final var recordOperations = frontendTransaction.getRecordOperationsInternal();
      // Sorted by collection id so that lockCollections() receives them in a stable order.
      final var collectionsToLock = new TreeMap<Integer, StorageCollection>();
      // Identity-keyed: the override belongs to this exact operation object.
      final Map<RecordOperation, Integer> collectionOverrides = new IdentityHashMap<>(8);
      final Set<RecordOperation> newRecords = new TreeSet<>(COMMIT_RECORD_OPERATION_COMPARATOR);

      for (final var recordOperation : recordOperations) {
        var record = recordOperation.record;

        if (recordOperation.type == RecordOperation.CREATED
            || recordOperation.type == RecordOperation.UPDATED) {
          if (record.isUnloaded()) {
            throw new IllegalStateException(
                "Unloaded record " + record.getIdentity() + " cannot be committed");
          }
          if (record instanceof EntityImpl) {
            ((EntityImpl) record).validate();
          }
        }

        if (recordOperation.type == RecordOperation.UPDATED
            || recordOperation.type == RecordOperation.DELETED) {
          final var collectionId = recordOperation.record.getIdentity().getCollectionId();
          collectionsToLock.put(collectionId, doGetAndCheckCollection(collectionId));
        } else if (recordOperation.type == RecordOperation.CREATED) {
          newRecords.add(recordOperation);

          final RID rid = record.getIdentity();
          var collectionId = rid.getCollectionId();

          if (record.isDirty()
              && collectionId == RID.COLLECTION_ID_INVALID
              && record instanceof EntityImpl) {
            // Try to fix the collection id to the default collection id defined in the
            // schema class.
            var cls = ((EntityImpl) record).getImmutableSchemaClass(session);
            if (cls != null) {
              collectionId = cls.getCollectionForNewInstance((EntityImpl) record);
              collectionOverrides.put(recordOperation, collectionId);
            }
          }
          collectionsToLock.put(collectionId, doGetAndCheckCollection(collectionId));
        }
      }

      stateLock.readLock().lock();
      try {
        try {
          checkOpennessAndMigration();
          makeStorageDirty();

          // Set only by the catch block below; decides between rollback and normal end.
          Throwable error = null;
          startStorageTx(frontendTransaction);
          try {
            final var atomicOperation = atomicOperationsManager.getCurrentOperation();
            lockCollections(collectionsToLock);
            lockLinkBags(collectionsToLock);
            lockIndexes(indexOperations);

            // Physical position resolved for each created record, keyed by operation identity.
            final Map<RecordOperation, PhysicalPosition> positions = new IdentityHashMap<>(8);
            for (final var recordOperation : newRecords) {
              final var rec = recordOperation.record;

              if (allocated) {
                // Pre-allocated commit: the rid must already be persistent, reuse its position.
                if (rec.getIdentity().isPersistent()) {
                  positions.put(
                      recordOperation,
                      new PhysicalPosition(rec.getIdentity().getCollectionPosition()));
                } else {
                  throw new StorageException(name,
                      "Impossible to commit a transaction with not valid rid in pre-allocated"
                          + " commit");
                }
              } else if (rec.isDirty() && !rec.getIdentity().isPersistent()) {
                final var rid = rec.getIdentity();
                final var oldRID = rid.copy();

                final var collectionOverride = collectionOverrides.get(recordOperation);
                final int collectionId =
                    collectionOverride != null ? collectionOverride : rid.getCollectionId();

                final var collection = doGetAndCheckCollection(collectionId);

                var physicalPosition =
                    collection.allocatePosition(
                        rec.getRecordType(),
                        atomicOperation);

                if (rid.getCollectionPosition() > -1) {
                  // Create empty records until the requested position is reached. This is the
                  // case when a server is out of sync because a transaction has been rolled
                  // back before sending the remote creates, so the owner node deleted a record
                  // having a higher collection position.
                  while (rid.getCollectionPosition() > physicalPosition.collectionPosition) {
                    physicalPosition =
                        collection.allocatePosition(
                            rec.getRecordType(),
                            atomicOperation);
                  }

                  // Allocation overshot the requested position: it was already taken.
                  if (rid.getCollectionPosition() != physicalPosition.collectionPosition) {
                    throw new ConcurrentCreateException(name,
                        rid, new RecordId(collection.getId(),
                        physicalPosition.collectionPosition));
                  }
                }
                positions.put(recordOperation, physicalPosition);

                // Rewrite the temporary identity in place with the allocated one.
                if (rid instanceof ChangeableRecordId changeableRecordId) {
                  changeableRecordId.setCollectionAndPosition(collection.getId(),
                      physicalPosition.collectionPosition);
                } else {
                  throw new DatabaseException(name,
                      "Provided record is not new and its identity cannot be changed");
                }

                assert frontendTransaction.assertIdentityChangedAfterCommit(oldRID, rid);
              }
            }

            for (final var recordOperation : recordOperations) {
              // positions.get() yields null for operations that are not in the positions map.
              commitEntry(
                  frontendTransaction,
                  atomicOperation,
                  recordOperation,
                  positions.get(recordOperation),
                  session.getSerializer());
              result.add(recordOperation);
            }

            // Update of b-tree based link bags.
            var recordSerializationContext = frontendTransaction.getRecordSerializationContext();
            recordSerializationContext.executeOperations(atomicOperation, this);

            commitIndexes(frontendTransaction.getDatabaseSession(), indexOperations);
          } catch (final IOException | RuntimeException e) {
            error = e;
            if (e instanceof RuntimeException) {
              throw ((RuntimeException) e);
            } else {
              throw BaseException.wrapException(
                  new StorageException(name, "Error during transaction commit"), e, name);
            }
          } finally {
            // NOTE(review): only IOException and RuntimeException set `error`. Any other
            // Throwable (e.g. an Error such as the AssertionError the assert above can raise)
            // arrives here with error == null and takes the endStorageTx() branch — confirm
            // this is intended.
            if (error != null) {
              rollback(error);
            } else {
              endStorageTx();
            }
            this.transaction.set(null);
          }
        } finally {
          atomicOperationsManager.ensureThatComponentsUnlocked();
        }
      } finally {
        stateLock.readLock().unlock();
      }
    } finally {
      // Always paired with makeThreadLocalSchemaSnapshot() above, so a failure during the
      // preparation phase (unloaded record, validation error, unknown collection) or in the
      // cleanup above cannot leave the snapshot behind.
      session.getMetadata().clearThreadLocalSchemaSnapshot();
    }

    if (logger.isDebugEnabled()) {
      LogManager.instance()
          .debug(
              this,
              "%d Committed transaction %d on database '%s' (result=%s)",
              logger, Thread.currentThread().getId(),
              frontendTransaction.getId(),
              session.getDatabaseName(),
              result);
    }
    return result;
  } catch (final RuntimeException ee) {
    throw logAndPrepareForRethrow(ee);
  } catch (final Error ee) {
    atomicOperationsManager.alarmClearOfAtomicOperation();
    throw logAndPrepareForRethrow(ee);
  } catch (final Throwable t) {
    throw logAndPrepareForRethrow(t);
  }
}