protected List commit()

in core/src/main/java/com/jetbrains/youtrackdb/internal/core/storage/impl/local/AbstractStorage.java [1855:2053]


  protected List<RecordOperation> commit(
      final FrontendTransactionImpl frontendTransaction, final boolean allocated) {
    // XXX: At this moment, there are two implementations of the commit method. One for regular
    // client transactions and one for
    // implicit micro-transactions. The implementations are quite identical, but operate on slightly
    // different data. If you change
    // this method don't forget to change its counterpart:
    //
    //
    try {
      final var session = frontendTransaction.getDatabaseSession();
      final var indexOperations =
          getSortedIndexOperations(frontendTransaction);

      session.getMetadata().makeThreadLocalSchemaSnapshot();

      final var recordOperations = frontendTransaction.getRecordOperationsInternal();
      final var collectionsToLock = new TreeMap<Integer, StorageCollection>();
      final Map<RecordOperation, Integer> collectionOverrides = new IdentityHashMap<>(8);

      final Set<RecordOperation> newRecords = new TreeSet<>(COMMIT_RECORD_OPERATION_COMPARATOR);
      for (final var recordOperation : recordOperations) {
        var record = recordOperation.record;
        if (recordOperation.type == RecordOperation.CREATED
            || recordOperation.type == RecordOperation.UPDATED) {

          if (record.isUnloaded()) {
            throw new IllegalStateException(
                "Unloaded record " + record.getIdentity() + " cannot be committed");
          }

          if (record instanceof EntityImpl) {
            ((EntityImpl) record).validate();
          }
        }

        if (recordOperation.type == RecordOperation.UPDATED
            || recordOperation.type == RecordOperation.DELETED) {
          final var collectionId = recordOperation.record.getIdentity().getCollectionId();
          collectionsToLock.put(collectionId, doGetAndCheckCollection(collectionId));
        } else if (recordOperation.type == RecordOperation.CREATED) {
          newRecords.add(recordOperation);

          final RID rid = record.getIdentity();

          var collectionId = rid.getCollectionId();

          if (record.isDirty()
              && collectionId == RID.COLLECTION_ID_INVALID
              && record instanceof EntityImpl) {
            // TRY TO FIX COLLECTION ID TO THE DEFAULT COLLECTION ID DEFINED IN SCHEMA CLASS

            var cls = ((EntityImpl) record).getImmutableSchemaClass(session);
            if (cls != null) {
              collectionId = cls.getCollectionForNewInstance((EntityImpl) record);
              collectionOverrides.put(recordOperation, collectionId);
            }
          }
          collectionsToLock.put(collectionId, doGetAndCheckCollection(collectionId));
        }
      }

      final List<RecordOperation> result = new ArrayList<>(8);
      stateLock.readLock().lock();
      try {
        try {
          checkOpennessAndMigration();

          makeStorageDirty();

          Throwable error = null;
          startStorageTx(frontendTransaction);
          try {
            final var atomicOperation = atomicOperationsManager.getCurrentOperation();
            lockCollections(collectionsToLock);
            lockLinkBags(collectionsToLock);
            lockIndexes(indexOperations);

            final Map<RecordOperation, PhysicalPosition> positions = new IdentityHashMap<>(8);
            for (final var recordOperation : newRecords) {
              final var rec = recordOperation.record;

              if (allocated) {
                if (rec.getIdentity().isPersistent()) {
                  positions.put(
                      recordOperation,
                      new PhysicalPosition(rec.getIdentity().getCollectionPosition()));
                } else {
                  throw new StorageException(name,
                      "Impossible to commit a transaction with not valid rid in pre-allocated"
                          + " commit");
                }
              } else if (rec.isDirty() && !rec.getIdentity().isPersistent()) {
                final var rid = rec.getIdentity();
                final var oldRID = rid.copy();

                final var collectionOverride = collectionOverrides.get(recordOperation);
                final int collectionId =
                    Optional.ofNullable(collectionOverride).orElseGet(rid::getCollectionId);

                final var collection = doGetAndCheckCollection(collectionId);

                var physicalPosition =
                    collection.allocatePosition(
                        rec.getRecordType(),
                        atomicOperation);

                if (rid.getCollectionPosition() > -1) {
                  // CREATE EMPTY RECORDS UNTIL THE POSITION IS REACHED. THIS IS THE CASE WHEN A
                  // SERVER IS OUT OF SYNC
                  // BECAUSE A TRANSACTION HAS BEEN ROLLED BACK BEFORE TO SEND THE REMOTE CREATES.
                  // SO THE OWNER NODE DELETED
                  // RECORD HAVING A HIGHER COLLECTION POSITION
                  while (rid.getCollectionPosition() > physicalPosition.collectionPosition) {
                    physicalPosition =
                        collection.allocatePosition(
                            rec.getRecordType(),
                            atomicOperation);
                  }

                  if (rid.getCollectionPosition() != physicalPosition.collectionPosition) {
                    throw new ConcurrentCreateException(name,
                        rid, new RecordId(collection.getId(),
                        physicalPosition.collectionPosition));
                  }
                }
                positions.put(recordOperation, physicalPosition);

                if (rid instanceof ChangeableRecordId changeableRecordId) {
                  changeableRecordId.setCollectionAndPosition(collection.getId(),

                      physicalPosition.collectionPosition);
                } else {
                  throw new DatabaseException(name,
                      "Provided record is not new and its identity cannot be changed");
                }
                assert frontendTransaction.assertIdentityChangedAfterCommit(oldRID, rid);
              }
            }

            for (final var recordOperation : recordOperations) {
              commitEntry(
                  frontendTransaction,
                  atomicOperation,
                  recordOperation,
                  positions.get(recordOperation),
                  session.getSerializer());
              result.add(recordOperation);
            }

            //update of b-tree based link bags
            var recordSerializationContext = frontendTransaction.getRecordSerializationContext();
            recordSerializationContext.executeOperations(atomicOperation, this);

            commitIndexes(frontendTransaction.getDatabaseSession(), indexOperations);
          } catch (final IOException | RuntimeException e) {
            error = e;
            if (e instanceof RuntimeException) {
              throw ((RuntimeException) e);
            } else {
              throw BaseException.wrapException(
                  new StorageException(name, "Error during transaction commit"), e, name);
            }
          } finally {
            if (error != null) {
              rollback(error);
            } else {
              endStorageTx();
            }
            this.transaction.set(null);
          }
        } finally {
          atomicOperationsManager.ensureThatComponentsUnlocked();
          session.getMetadata().clearThreadLocalSchemaSnapshot();
        }
      } finally {
        stateLock.readLock().unlock();
      }

      if (logger.isDebugEnabled()) {
        LogManager.instance()
            .debug(
                this,
                "%d Committed transaction %d on database '%s' (result=%s)",
                logger, Thread.currentThread().getId(),
                frontendTransaction.getId(),
                session.getDatabaseName(),
                result);
      }
      return result;
    } catch (final RuntimeException ee) {
      throw logAndPrepareForRethrow(ee);
    } catch (final Error ee) {
      atomicOperationsManager.alarmClearOfAtomicOperation();
      throw logAndPrepareForRethrow(ee);
    } catch (final Throwable t) {
      throw logAndPrepareForRethrow(t);
    }
  }