private void applyToDocumentStore()

in oak-store-document/src/main/java/org/apache/jackrabbit/oak/plugins/document/Commit.java [294:440]


    /**
     * Writes the update operations collected in {@code operations} to the
     * document store and then marks the commit on its commit root document.
     * <p>
     * The method proceeds in this order:
     * <ol>
     *   <li>Determine the commit root path (always {@code Path.ROOT} for a
     *       branch commit, otherwise derived from the paths in
     *       {@code operations}) and install a {@link Rollback} for it.</li>
     *   <li>For branch commits, create a journal entry with the modified
     *       nodes.</li>
     *   <li>Tag every operation with the commit root depth and try
     *       {@code conditionalCommit}; if that returns {@code false}, write
     *       the operations in batches and finally write the revision entry
     *       on the commit root.</li>
     * </ol>
     * Once {@code success} is set, exceptions are logged and suppressed
     * instead of propagated (OAK-3084), and {@code rollback} is replaced with
     * {@code Rollback.NONE}.
     *
     * @param baseBranchRevision the base revision when this is a branch
     *          commit, or {@code null} for a regular commit.
     * @throws ConflictException if a conflict is reported before the commit
     *          was marked successful.
     * @throws DocumentStoreException if any other exception occurs before
     *          the commit was marked successful (converted via
     *          {@code DocumentStoreException.convert}).
     */
    private void applyToDocumentStore(RevisionVector baseBranchRevision)
            throws ConflictException, DocumentStoreException {
        // initially set the rollback to always fail until we have changes
        // in an oplog list and a commit root.
        rollback = Rollback.FAILED;

        // the value in _revisions.<revision> property of the commit root node
        // regular commits use "c", which makes the commit visible to
        // other readers. branch commits use the base revision to indicate
        // the visibility of the commit
        String commitValue = baseBranchRevision != null ? baseBranchRevision.getBranchRevision().toString() : "c";
        DocumentStore store = nodeStore.getDocumentStore();
        Path commitRootPath = null;
        if (baseBranchRevision != null) {
            // branch commits always use root node as commit root
            commitRootPath = Path.ROOT;
        }
        // all operations of this commit, in the order they will be written
        ArrayList<UpdateOp> changedNodes = new ArrayList<UpdateOp>();
        // operations are added to this list before they are executed,
        // so that all operations can be rolled back if there is a conflict
        ArrayList<UpdateOp> opLog = new ArrayList<UpdateOp>();

        // Compute the commit root: start with the first path and, for each
        // further path, walk up towards the root until the candidate is an
        // ancestor of that path (or has no parent left).
        for (Path p : operations.keySet()) {
            markChanged(p);
            if (commitRootPath == null) {
                commitRootPath = p;
            } else {
                while (!commitRootPath.isAncestorOf(p)) {
                    Path parent = commitRootPath.getParent();
                    if (parent == null) {
                        // reached the top, cannot move further up
                        break;
                    }
                    commitRootPath = parent;
                }
            }
        }
        // NOTE(review): commitRootPath is still null here when this is not a
        // branch commit and operations is empty; presumably callers only
        // invoke this method with at least one operation - confirm.

        // adjust commit root when it falls on a bundled node: use the path
        // mapped in bundledNodes, if there is one
        commitRootPath = bundledNodes.getOrDefault(commitRootPath, commitRootPath);

        // from here on a rollback can be attempted: it shares the opLog list,
        // so operations added to opLog below are seen by the rollback
        rollback = new Rollback(revision, opLog,
                Utils.getIdFromPath(commitRootPath),
                nodeStore.getCreateOrUpdateBatchSize());

        // bundled node paths are marked as changed as well
        for (Path p : bundledNodes.keySet()){
            markChanged(p);
        }

        // push branch changes to journal
        if (baseBranchRevision != null) {
            // store as external change
            JournalEntry doc = JOURNAL.newDocument(store);
            doc.modified(modifiedNodes);
            Revision r = revision.asBranchRevision();
            store.create(JOURNAL, singletonList(doc.asUpdateOp(r)));
        }

        int commitRootDepth = commitRootPath.getDepth();
        // check if there are real changes on the commit root
        // (captured before getUpdateOperationForNode() is called below;
        // presumably that call registers an operation for the path when
        // none exists - TODO confirm)
        boolean commitRootHasChanges = operations.containsKey(commitRootPath);
        // tag every operation with the depth of the commit root
        for (UpdateOp op : operations.values()) {
            NodeDocument.setCommitRoot(op, revision, commitRootDepth);
            changedNodes.add(op);
        }
        // create a "root of the commit" if there is none
        UpdateOp commitRoot = getUpdateOperationForNode(commitRootPath);

        // set to true as soon as the commit is considered done; after that
        // point exceptions are no longer propagated (see catch block)
        boolean success = false;
        try {
            // record the operations before executing them so they can be
            // rolled back
            opLog.addAll(changedNodes);

            if (conditionalCommit(changedNodes, commitValue)) {
                // conditionalCommit reported that it performed the commit,
                // nothing more to write
                success = true;
            } else {
                // write the changed nodes in batches and check the returned
                // documents for conflicts and split candidates
                int batchSize = nodeStore.getCreateOrUpdateBatchSize();
                for (List<UpdateOp> updates : partition(changedNodes, batchSize)) {
                    List<NodeDocument> oldDocs = store.createOrUpdate(NODES, updates);
                    checkConflicts(oldDocs, updates);
                    checkSplitCandidate(oldDocs);
                }

                // finally write the commit root (the commit root might be written
                // twice, first to check if there was a conflict, and only then to
                // commit the revision, with the revision property set)
                NodeDocument.setRevision(commitRoot, revision, commitValue);
                if (commitRootHasChanges) {
                    // remove previously added commit root
                    NodeDocument.removeCommitRoot(commitRoot, revision);
                }
                opLog.add(commitRoot);
                if (baseBranchRevision == null) {
                    // create a clone of the commitRoot in order
                    // to set isNew to false. If we get here the
                    // commitRoot document already exists and
                    // only needs an update
                    UpdateOp commit = commitRoot.copy();
                    commit.setNew(false);
                    // only set revision on commit root when there is
                    // no collision for this commit revision
                    commit.containsMapEntry(COLLISIONS, revision, false);
                    NodeDocument before = nodeStore.updateCommitRoot(commit, revision);
                    if (before == null) {
                        // the update was not applied; look up the commit
                        // root document to decide which exception to throw
                        String msg = "Conflicting concurrent change. " +
                                "Update operation failed: " + commit;
                        NodeDocument commitRootDoc = store.find(NODES, commit.getId());
                        if (commitRootDoc == null) {
                            // commit root document does not exist
                            throw new DocumentStoreException(msg);
                        } else {
                            // report the conflicts recorded for this revision
                            throw new ConflictException(msg,
                                    commitRootDoc.getConflictsFor(
                                            Collections.singleton(revision)));
                        }
                    } else {
                        success = true;
                        // if we get here the commit was successful and
                        // the commit revision is set on the commitRoot
                        // document for this commit.
                        // now check for conflicts/collisions by other commits.
                        // use original commitRoot operation with
                        // correct isNew flag.
                        checkConflicts(commitRoot, before);
                        checkSplitCandidate(before);
                    }
                } else {
                    // this is a branch commit, do not fail on collisions now
                    // trying to merge the branch will fail later
                    // NOTE(review): success is not set on this path, so the
                    // finally block leaves the Rollback created above in
                    // place instead of Rollback.NONE - confirm this is
                    // intended.
                    createOrUpdateNode(store, commitRoot);
                }
            }
        } catch (Exception e) {
            // OAK-3084 do not roll back if already committed
            if (success) {
                // the commit is already done: log and swallow the exception
                LOG.error("Exception occurred after commit. Rollback will be suppressed.", e);
            } else {
                // not committed yet: propagate conflicts unchanged and
                // convert everything else to a DocumentStoreException
                if (e instanceof ConflictException) {
                    throw e;
                } else {
                    throw DocumentStoreException.convert(e);
                }
            }
        } finally {
            if (success) {
                // committed, nothing to roll back anymore
                rollback = Rollback.NONE;
            }
        }
    }