void flush()

in grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy [101:268]


    /**
     * Flushes all pending inserts, updates and deletes of this session as bulk writes,
     * one bulk write per entity key held in the write-model map.
     *
     * The supplied write concern is installed on the session for the duration of the flush and
     * the previous one is restored afterwards. Regardless of outcome (including the early return
     * when nothing is pending) the pending operations, the post-flush operations and the first
     * level collection cache are cleared.
     *
     * @param writeConcern the write concern to use for this flush; when {@code null} the write
     *                     concern of the entity mapping is used and, failing that, the one
     *                     already configured on the collection
     * @throws DataIntegrityViolationException if an acknowledged write concern was requested but
     *                                         the bulk write result was not acknowledged
     * @throws OptimisticLockingException if the number of matched documents does not equal the
     *                                    number of queued updates for the entity
     */
    void flush(WriteConcern writeConcern) {
        WriteConcern currentWriteConcern = this.getWriteConcern();
        try {
            this.writeConcern = writeConcern;
            final Map<PersistentEntity, Collection<PendingUpdate>> pendingUpdates = getPendingUpdates();
            final Map<PersistentEntity, Collection<PendingInsert>> pendingInserts = getPendingInserts();
            final Map<PersistentEntity, Collection<PendingDelete>> pendingDeletes = getPendingDeletes();

            // nothing to do; the finally block still resets the session state
            if(pendingUpdates.isEmpty() && pendingInserts.isEmpty() && pendingDeletes.isEmpty()) {
                return;
            }


            // per-entity-name counters of queued updates, used after the bulk write to verify
            // that every update actually matched a document
            Map<String,Integer> numberOfOptimisticUpdates = [:].withDefault { 0 }
            Map<String,Integer> numberOfPessimisticUpdates = [:].withDefault { 0 }

            Map<PersistentEntity, List<WriteModel<Document>>> writeModels = [:]

            // ---- inserts -------------------------------------------------------------------
            for (PersistentEntity persistentEntity in pendingInserts.keySet()) {
                final Collection<PendingInsert> inserts = pendingInserts[persistentEntity]
                if(inserts) {
                    List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels)
                    for (PendingInsert insert in inserts) {
                        // the operation must run before its vetoed flag and native entry are read
                        insert.run()

                        if(insert.vetoed) continue


                        def object = insert.nativeEntry
                        entityWrites << new InsertOneModel<?>(object)

                        final List<PendingOperation> cascadeOperations = insert.cascadeOperations
                        addPostFlushOperations cascadeOperations
                    }
                }
            }


            // ---- updates -------------------------------------------------------------------
            for (PersistentEntity persistentEntity in pendingUpdates.keySet()) {

                // counters are keyed by the name of the root entity of the hierarchy
                final String name = persistentEntity.isRoot() ? persistentEntity.name : persistentEntity.rootEntity.name

                final Collection<PendingUpdate> updates = pendingUpdates[persistentEntity]
                if(updates) {
                    List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                    for (PendingUpdate update in updates) {
                        update.run()

                        if(update.vetoed) continue

                        DirtyCheckable changedObject = (DirtyCheckable) update.getNativeEntry()
                        PersistentEntityCodec codec = (PersistentEntityCodec)datastore.codecRegistry.get(changedObject.getClass())

                        final Object nativeKey = update.nativeKey
                        final Document id = new Document(MongoEntityPersister.MONGO_ID_FIELD, nativeKey)

                        EntityAccess entityAccess = update.entityAccess
                        boolean isVersioned = persistentEntity.isVersioned()
                        def currentVersion = null
                        if(isVersioned) {
                            // read the version before encoding the update
                            // NOTE(review): presumably encodeUpdate may change the version property - confirm
                            currentVersion = entityAccess.getProperty( persistentEntity.version.name )
                        }
                        def updateDoc = codec.encodeUpdate(changedObject, entityAccess)

                        // a Groovy-false update (e.g. null or empty) means there is nothing to write
                        if(updateDoc) {

                            if(isVersioned) {
                                // if the entity is versioned we add to the query the current version
                                // if the query doesn't match a result this means the document has been updated by
                                // another thread and an optimistic locking exception should be thrown
                                if(currentVersion == null) {
                                    currentVersion = entityAccess.getProperty( persistentEntity.version.name )
                                }
                                id[GormProperties.VERSION] = currentVersion
                                numberOfOptimisticUpdates[name]++
                            }
                            else {
                                numberOfPessimisticUpdates[name]++
                            }
                            final options = new UpdateOptions()

                            entityWrites << new UpdateOneModel<Document>(id, updateDoc, options.upsert(false))

                            final List cascadeOperations = update.cascadeOperations
                            addPostFlushOperations cascadeOperations
                        }

                    }
                }
            }


            // ---- deletes -------------------------------------------------------------------
            for (PersistentEntity persistentEntity in pendingDeletes.keySet()) {
                final Collection<PendingDelete> deletes = pendingDeletes[persistentEntity]
                if(deletes) {
                    List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels)
                    List<Object> nativeKeys = []
                    for (PendingDelete delete in deletes) {
                        delete.run()

                        if(delete.vetoed) continue

                        final Object k = delete.nativeKey
                        // NOTE(review): Groovy truth also rejects keys such as 0 or "" - confirm such
                        // identifiers can never occur here
                        if(k) {
                            nativeKeys << k
                            final List cascadeOperations = delete.cascadeOperations
                            addPostFlushOperations cascadeOperations
                        }

                    }
                    if(nativeKeys.size() == 1) {
                        entityWrites << new DeleteOneModel<Document>(new Document( MongoEntityPersister.MONGO_ID_FIELD, nativeKeys.get(0)))
                    }
                    else if(nativeKeys.size() > 1) {
                        // only queue the many-delete when there are keys to delete: when every
                        // delete was vetoed (or had no key) an IN filter over an empty list matches
                        // nothing and would merely cause a needless write to be sent
                        entityWrites << new DeleteManyModel<Document>(new Document( MongoEntityPersister.MONGO_ID_FIELD, new Document(BsonQuery.IN_OPERATOR, nativeKeys)))
                    }
                }
            }


            // ---- execute the bulk writes ---------------------------------------------------
            for (PersistentEntity persistentEntity : writeModels.keySet()) {
                MongoCollection collection = getCollection(persistentEntity)
                                                .withDocumentClass(persistentEntity.javaClass)

                // write concern precedence: flush argument, then entity mapping, then collection
                WriteConcern wc = writeConcern
                if(wc == null) {
                    org.grails.datastore.mapping.mongo.config.MongoCollection mapping = (org.grails.datastore.mapping.mongo.config.MongoCollection)persistentEntity.mapping.mappedForm
                    wc = mapping.writeConcern
                }
                if(wc != null) {
                    collection = collection.withWriteConcern(wc)
                }
                else {
                    wc = collection.writeConcern
                }
                final List<WriteModel<?>> writes = writeModels[persistentEntity]
                if(writes) {

                    final BulkWriteResult bulkWriteResult = collection
                                                                .bulkWrite(writes)

                    final boolean isAcknowledged = wc.isAcknowledged()
                    if( !bulkWriteResult.wasAcknowledged() && isAcknowledged) {
                        errorOccured = true;
                        throw new DataIntegrityViolationException("Write operation was not acknowledged");
                    }
                    else if(isAcknowledged) {
                        // the matched count is only inspected when the write was acknowledged
                        final int matchedCount = bulkWriteResult.matchedCount
                        // NOTE(review): the counters above are keyed by root entity name; this lookup
                        // assumes getWriteModelsForEntity keys writeModels by the root entity - verify
                        final String name = persistentEntity.name
                        final Integer numOptimistic = numberOfOptimisticUpdates[name]
                        final Integer numPessimistic = numberOfPessimisticUpdates[name]
                        // every queued update must have matched a document; a shortfall means a
                        // version-filtered update found no document to modify
                        if((matchedCount - numPessimistic) != numOptimistic) {
                            setFlushMode(FlushModeType.COMMIT)
                            throw new OptimisticLockingException(persistentEntity, null)
                        }
                    }
                }
            }

            // cascade operations collected above run only after all bulk writes succeeded
            for (Runnable postFlushOperation : postFlushOperations) {
                postFlushOperation.run();
            }
        } finally {
            // always reset the session state and restore the caller's write concern
            clearPendingOperations();
            postFlushOperations.clear();
            firstLevelCollectionCache.clear();
            this.writeConcern = currentWriteConcern;
        }
    }