in grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoCodecSession.groovy [101:268]
/**
 * Flushes every pending insert, update and delete held by this session.
 *
 * The pending operations are first run and converted into driver write models,
 * grouped through {@code getWriteModelsForEntity}. Each non-empty group is then
 * sent with a single {@code bulkWrite} call, after which the queued post-flush
 * (cascade) operations are executed. Pending operations, post-flush operations
 * and the first level collection cache are always cleared when the method exits,
 * and the session's previous write concern is restored.
 *
 * @param writeConcern the write concern to apply for the duration of this flush;
 *        when {@code null} the write concern from the entity mapping is used and,
 *        failing that, the collection's own write concern
 * @throws DataIntegrityViolationException if the effective write concern is
 *         acknowledged but the bulk write result was not acknowledged
 * @throws OptimisticLockingException if the matched-document count reported by the
 *         bulk write does not agree with the number of updates that were queued
 */
void flush(WriteConcern writeConcern) {
    // Remember the session-level write concern so it can be restored in the finally block.
    WriteConcern currentWriteConcern = this.getWriteConcern();
    try {
        this.writeConcern = writeConcern;
        final Map<PersistentEntity, Collection<PendingUpdate>> pendingUpdates = getPendingUpdates();
        final Map<PersistentEntity, Collection<PendingInsert>> pendingInserts = getPendingInserts();
        final Map<PersistentEntity, Collection<PendingDelete>> pendingDeletes = getPendingDeletes();
        if(pendingUpdates.isEmpty() && pendingInserts.isEmpty() && pendingDeletes.isEmpty()) {
            // Nothing queued; the finally block still performs the clean-up.
            return;
        }

        // Per-entity-name counters of queued updates, used after the bulk write to
        // compare against the matched count. Missing keys read as 0.
        Map<String,Integer> numberOfOptimisticUpdates = [:].withDefault { 0 }
        Map<String,Integer> numberOfPessimisticUpdates = [:].withDefault { 0 }
        Map<PersistentEntity, List<WriteModel<Document>>> writeModels = [:]

        // ---- inserts ----
        for (PersistentEntity persistentEntity in pendingInserts.keySet()) {
            final Collection<PendingInsert> inserts = pendingInserts[persistentEntity]
            if(inserts) {
                List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels)
                for (PendingInsert insert in inserts) {
                    insert.run()
                    // The vetoed flag is only consulted after run().
                    if(insert.vetoed) continue

                    def object = insert.nativeEntry
                    entityWrites << new InsertOneModel<?>(object)
                    final List<PendingOperation> cascadeOperations = insert.cascadeOperations
                    addPostFlushOperations cascadeOperations
                }
            }
        }

        // ---- updates ----
        for (PersistentEntity persistentEntity in pendingUpdates.keySet()) {
            // Counters are keyed by the root entity's name.
            // NOTE(review): the lookup after bulkWrite uses persistentEntity.name of the
            // writeModels key; this lines up only if getWriteModelsForEntity keys the map
            // by root entity - confirm against that helper.
            final String name = persistentEntity.isRoot() ? persistentEntity.name : persistentEntity.rootEntity.name
            final Collection<PendingUpdate> updates = pendingUpdates[persistentEntity]
            if(updates) {
                List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                for (PendingUpdate update in updates) {
                    update.run()
                    if(update.vetoed) continue

                    DirtyCheckable changedObject = (DirtyCheckable) update.getNativeEntry()
                    PersistentEntityCodec codec = (PersistentEntityCodec)datastore.codecRegistry.get(changedObject.getClass())
                    final Object nativeKey = update.nativeKey
                    final Document id = new Document(MongoEntityPersister.MONGO_ID_FIELD, nativeKey)
                    EntityAccess entityAccess = update.entityAccess
                    boolean isVersioned = persistentEntity.isVersioned()
                    // The version is read before encodeUpdate is called
                    // (presumably because encoding may change it - TODO confirm).
                    def currentVersion = null
                    if(isVersioned) {
                        currentVersion = entityAccess.getProperty( persistentEntity.version.name )
                    }
                    def updateDoc = codec.encodeUpdate(changedObject, entityAccess)
                    if(updateDoc) {
                        if(isVersioned) {
                            // if the entity is versioned we add to the query the current version
                            // if the query doesn't match a result this means the document has been updated by
                            // another thread and an optimistic locking exception should be thrown
                            if(currentVersion == null) {
                                currentVersion = entityAccess.getProperty( persistentEntity.version.name )
                            }
                            id[GormProperties.VERSION] = currentVersion
                            numberOfOptimisticUpdates[name]++
                        }
                        else {
                            numberOfPessimisticUpdates[name]++
                        }
                        final options = new UpdateOptions()
                        // upsert(false): a filter that matches nothing must not create a document.
                        entityWrites << new UpdateOneModel<Document>(id, updateDoc, options.upsert(false))
                        final List cascadeOperations = update.cascadeOperations
                        addPostFlushOperations cascadeOperations
                    }
                }
            }
        }

        // ---- deletes ----
        for (PersistentEntity persistentEntity in pendingDeletes.keySet()) {
            final Collection<PendingDelete> deletes = pendingDeletes[persistentEntity]
            if(deletes) {
                List<WriteModel<?>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels)
                List<Object> nativeKeys = []
                for (PendingDelete delete in deletes) {
                    delete.run()
                    if(delete.vetoed) continue

                    final Object k = delete.nativeKey
                    if(k) {
                        nativeKeys << k
                        final List cascadeOperations = delete.cascadeOperations
                        addPostFlushOperations cascadeOperations
                    }
                }
                if(nativeKeys.size() == 1) {
                    entityWrites << new DeleteOneModel<Document>(new Document( MongoEntityPersister.MONGO_ID_FIELD, nativeKeys.get(0)))
                }
                else if(!nativeKeys.isEmpty()) {
                    // Only queue the multi-delete when there is at least one key: if every
                    // delete was vetoed (or had no key) an empty $in filter would match
                    // nothing yet still cost a bulk write.
                    entityWrites << new DeleteManyModel<Document>(new Document( MongoEntityPersister.MONGO_ID_FIELD, new Document(BsonQuery.IN_OPERATOR, nativeKeys)))
                }
            }
        }

        // ---- execute one bulk write per collected group ----
        for (PersistentEntity persistentEntity : writeModels.keySet()) {
            MongoCollection collection = getCollection(persistentEntity)
                                            .withDocumentClass(persistentEntity.javaClass)

            // Resolve the effective write concern: method argument, then the entity
            // mapping, then whatever the collection already carries.
            WriteConcern wc = writeConcern
            if(wc == null) {
                org.grails.datastore.mapping.mongo.config.MongoCollection mapping = (org.grails.datastore.mapping.mongo.config.MongoCollection)persistentEntity.mapping.mappedForm
                wc = mapping.writeConcern
            }
            if(wc != null) {
                collection = collection.withWriteConcern(wc)
            }
            else {
                wc = collection.writeConcern
            }

            final List<WriteModel<?>> writes = writeModels[persistentEntity]
            // Groovy truth: skips null and empty lists, so no bulk write is issued for them.
            if(writes) {
                final BulkWriteResult bulkWriteResult = collection
                                                            .bulkWrite(writes)

                final boolean isAcknowledged = wc.isAcknowledged()
                if( !bulkWriteResult.wasAcknowledged() && isAcknowledged) {
                    errorOccured = true;
                    throw new DataIntegrityViolationException("Write operation was not acknowledged");
                }
                else if(isAcknowledged) {
                    // Every queued update is expected to match a document; a shortfall is
                    // reported as an optimistic locking failure.
                    final int matchedCount = bulkWriteResult.matchedCount
                    final String name = persistentEntity.name
                    final Integer numOptimistic = numberOfOptimisticUpdates[name]
                    final Integer numPessimistic = numberOfPessimisticUpdates[name]
                    if((matchedCount - numPessimistic) != numOptimistic) {
                        setFlushMode(FlushModeType.COMMIT)
                        throw new OptimisticLockingException(persistentEntity, null)
                    }
                }
            }
        }

        // Cascade operations collected above run only after all bulk writes succeeded.
        for (Runnable postFlushOperation : postFlushOperations) {
            postFlushOperation.run();
        }
    } finally {
        // Runs on success, early return and failure alike.
        clearPendingOperations();
        postFlushOperations.clear();
        firstLevelCollectionCache.clear();
        this.writeConcern = currentWriteConcern;
    }
}