public void flush()

in grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoSession.java [79:231]


    public void flush(WriteConcern writeConcern) {
        WriteConcern currentWriteConcern = this.getWriteConcern();
        try {
            this.writeConcern = writeConcern;
            final Map<PersistentEntity, Collection<PendingUpdate>> pendingUpdates = getPendingUpdates();
            final Map<PersistentEntity, Collection<PendingInsert>> pendingInserts = getPendingInserts();
            final Map<PersistentEntity, Collection<PendingDelete>> pendingDeletes = getPendingDeletes();

            if(pendingUpdates.isEmpty() && pendingInserts.isEmpty() && pendingDeletes.isEmpty()) {
                return;
            }


            Map<String,Integer> numberOfOptimisticUpdates = new LinkedHashMap<String, Integer>();
            Map<String,Integer> numberOfPessimisticUpdates = new LinkedHashMap<String, Integer>();

            Map<PersistentEntity, List<WriteModel<Document>>> writeModels = new LinkedHashMap<PersistentEntity, List<WriteModel<Document>>>();
            for (PersistentEntity persistentEntity : pendingInserts.keySet()) {
                final Collection<PendingInsert> inserts = pendingInserts.get(persistentEntity);
                if(inserts != null && !inserts.isEmpty()) {
                    List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                    for (PendingInsert insert : inserts) {
                        insert.run();

                        if(insert.isVetoed()) continue;

                        entityWrites.add(new InsertOneModel<Document>((Document) insert.getNativeEntry()));

                        final List<PendingOperation> cascadeOperations = insert.getCascadeOperations();
                        addPostFlushOperations(cascadeOperations);
                    }
                }
            }


            for (PersistentEntity persistentEntity : pendingUpdates.keySet()) {

                final String name = persistentEntity.isRoot() ? persistentEntity.getName() : persistentEntity.getRootEntity().getName();
                int numberOfOptimistic = numberOfOptimisticUpdates.containsKey(name) ? numberOfOptimisticUpdates.get(name) : 0;
                int numberOfPessimistic = numberOfPessimisticUpdates.containsKey(name) ? numberOfPessimisticUpdates.get(name) : 0;

                final Collection<PendingUpdate> updates = pendingUpdates.get(persistentEntity);
                if(updates != null && !updates.isEmpty()) {
                    List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                    for (PendingUpdate update : updates) {
                        update.run();

                        if(update.isVetoed()) continue;

                        Document updateDoc = (Document) update.getNativeEntry();
                        updateDoc.remove(MongoConstants.MONGO_ID_FIELD);
                        updateDoc = createSetAndUnsetDoc(updateDoc);
                        final Object nativeKey = update.getNativeKey();
                        final Document id = new Document(MongoConstants.MONGO_ID_FIELD, nativeKey);
                        MongoEntityPersister documentEntityPersister = (MongoEntityPersister) getPersister(persistentEntity);
                        final EntityAccess entityAccess = update.getEntityAccess();
                        if(documentEntityPersister.isVersioned(entityAccess)) {
                            Object currentVersion = documentEntityPersister.getCurrentVersion(entityAccess);
                            documentEntityPersister.incrementVersion(entityAccess);

                            // if the entity is versioned we add to the query the current version
                            // if the query doesn't match a result this means the document has been updated by
                            // another thread an an optimistic locking exception should be thrown
                            id.put(GormProperties.VERSION, currentVersion);
                            numberOfOptimistic++;
                        }
                        else {
                            numberOfPessimistic++;
                        }
                        final UpdateOptions options = new UpdateOptions();

                        entityWrites.add(new UpdateOneModel<Document>(id, updateDoc, options.upsert(false)));

                        final List cascadeOperations = update.getCascadeOperations();
                        addPostFlushOperations(cascadeOperations);
                    }
                }
                numberOfOptimisticUpdates.put(name, numberOfOptimistic);
                numberOfPessimisticUpdates.put(name, numberOfPessimistic);

            }


            for (PersistentEntity persistentEntity : pendingDeletes.keySet()) {
                final Collection<PendingDelete> deletes = pendingDeletes.get(persistentEntity);
                if(deletes != null && !deletes.isEmpty()) {
                    List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                    List<Object> nativeKeys = new ArrayList<Object>();
                    for (PendingDelete delete : deletes) {
                        delete.run();

                        if(delete.isVetoed()) continue;

                        final Object k = delete.getNativeKey();
                        if(k != null) {
                            if(k instanceof Document) {
                                entityWrites.add(new DeleteManyModel<Document>((Document)k));
                            }
                            else {
                                nativeKeys.add(k);
                            }
                        }

                        final List cascadeOperations = delete.getCascadeOperations();
                        addPostFlushOperations(cascadeOperations);
                    }
                    entityWrites.add(new DeleteManyModel<Document>(new Document( MongoConstants.MONGO_ID_FIELD, new Document(BsonQuery.IN_OPERATOR, nativeKeys))));
                }
            }


            for (PersistentEntity persistentEntity : writeModels.keySet()) {
                com.mongodb.client.MongoCollection collection = getCollection(persistentEntity);
                final WriteConcern wc = getWriteConcern();
                if(wc != null) {
                    collection = collection.withWriteConcern(wc);
                }
                final List<WriteModel<Document>> writes = writeModels.get(persistentEntity);
                if(!writes.isEmpty()) {

                    final com.mongodb.bulk.BulkWriteResult bulkWriteResult = collection
                            .bulkWrite(writes);

                    if( !bulkWriteResult.wasAcknowledged() ) {
                        errorOccured = true;
                        throw new DataIntegrityViolationException("Write operation was not acknowledged");
                    }
                    else {
                        final int matchedCount = bulkWriteResult.getMatchedCount();
                        final String name = persistentEntity.getName();
                        final Integer numOptimistic = numberOfOptimisticUpdates.get(name);
                        final Integer numPessimistic = numberOfPessimisticUpdates.get(name);
                        final int no = numOptimistic != null ? numOptimistic : 0;
                        final int pe = numPessimistic != null ? numPessimistic : 0;
                        if((matchedCount - pe) != no) {
                            setFlushMode(FlushModeType.COMMIT);
                            throw new OptimisticLockingException(persistentEntity, null);
                        }
                    }
                }
            }

            for (Runnable postFlushOperation : postFlushOperations) {
                postFlushOperation.run();
            }
        } finally {
            clearPendingOperations();
            postFlushOperations.clear();
            firstLevelCollectionCache.clear();
            this.writeConcern = currentWriteConcern;
        }

    }