in grails-data-mongodb/core/src/main/groovy/org/grails/datastore/mapping/mongo/MongoSession.java [79:231]
/**
 * Flushes all pending inserts, updates and deletes to MongoDB as per-entity bulk
 * writes, using the given {@link WriteConcern} for the duration of the flush, then
 * runs any accumulated post-flush (cascade) operations.
 *
 * <p>For versioned entities the update query also matches on the current version;
 * after each bulk write the matched count is compared against the expected number
 * of versioned updates to detect concurrent modification, in which case an
 * {@link OptimisticLockingException} is thrown.</p>
 *
 * @param writeConcern the write concern to apply for this flush only; the session's
 *                     previous write concern is restored afterwards
 * @throws DataIntegrityViolationException if a bulk write was not acknowledged
 * @throws OptimisticLockingException      if a versioned update matched no document
 */
public void flush(WriteConcern writeConcern) {
    WriteConcern currentWriteConcern = this.getWriteConcern();
    try {
        // Temporarily swap in the requested write concern; restored in finally.
        this.writeConcern = writeConcern;
        final Map<PersistentEntity, Collection<PendingUpdate>> pendingUpdates = getPendingUpdates();
        final Map<PersistentEntity, Collection<PendingInsert>> pendingInserts = getPendingInserts();
        final Map<PersistentEntity, Collection<PendingDelete>> pendingDeletes = getPendingDeletes();
        if (pendingUpdates.isEmpty() && pendingInserts.isEmpty() && pendingDeletes.isEmpty()) {
            return;
        }

        // Counters keyed by the ROOT entity name (subclasses share the root's
        // collection), used after the bulk write for optimistic-lock detection.
        Map<String, Integer> numberOfOptimisticUpdates = new LinkedHashMap<String, Integer>();
        Map<String, Integer> numberOfPessimisticUpdates = new LinkedHashMap<String, Integer>();

        Map<PersistentEntity, List<WriteModel<Document>>> writeModels = new LinkedHashMap<PersistentEntity, List<WriteModel<Document>>>();

        // ---- Stage 1: queue inserts ----
        for (Map.Entry<PersistentEntity, Collection<PendingInsert>> entry : pendingInserts.entrySet()) {
            final Collection<PendingInsert> inserts = entry.getValue();
            if (inserts != null && !inserts.isEmpty()) {
                List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(entry.getKey(), writeModels);
                for (PendingInsert insert : inserts) {
                    insert.run(); // may veto (e.g. via event listeners)
                    if (insert.isVetoed()) continue;

                    entityWrites.add(new InsertOneModel<Document>((Document) insert.getNativeEntry()));
                    final List<PendingOperation> cascadeOperations = insert.getCascadeOperations();
                    addPostFlushOperations(cascadeOperations);
                }
            }
        }

        // ---- Stage 2: queue updates, tracking versioned vs unversioned counts ----
        for (Map.Entry<PersistentEntity, Collection<PendingUpdate>> entry : pendingUpdates.entrySet()) {
            final PersistentEntity persistentEntity = entry.getKey();
            final String name = persistentEntity.isRoot() ? persistentEntity.getName() : persistentEntity.getRootEntity().getName();
            final Integer existingOptimistic = numberOfOptimisticUpdates.get(name);
            final Integer existingPessimistic = numberOfPessimisticUpdates.get(name);
            int numberOfOptimistic = existingOptimistic != null ? existingOptimistic : 0;
            int numberOfPessimistic = existingPessimistic != null ? existingPessimistic : 0;
            final Collection<PendingUpdate> updates = entry.getValue();
            if (updates != null && !updates.isEmpty()) {
                List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(persistentEntity, writeModels);
                for (PendingUpdate update : updates) {
                    update.run();
                    if (update.isVetoed()) continue;

                    Document updateDoc = (Document) update.getNativeEntry();
                    // _id is immutable in MongoDB; it must not appear in the update document.
                    updateDoc.remove(MongoConstants.MONGO_ID_FIELD);
                    updateDoc = createSetAndUnsetDoc(updateDoc);
                    final Object nativeKey = update.getNativeKey();
                    final Document id = new Document(MongoConstants.MONGO_ID_FIELD, nativeKey);
                    MongoEntityPersister documentEntityPersister = (MongoEntityPersister) getPersister(persistentEntity);
                    final EntityAccess entityAccess = update.getEntityAccess();
                    if (documentEntityPersister.isVersioned(entityAccess)) {
                        Object currentVersion = documentEntityPersister.getCurrentVersion(entityAccess);
                        documentEntityPersister.incrementVersion(entityAccess);
                        // If the entity is versioned we add the current version to the query.
                        // If the query doesn't match a result, the document has been updated by
                        // another thread and an optimistic locking exception should be thrown.
                        id.put(GormProperties.VERSION, currentVersion);
                        numberOfOptimistic++;
                    }
                    else {
                        numberOfPessimistic++;
                    }
                    final UpdateOptions options = new UpdateOptions();
                    entityWrites.add(new UpdateOneModel<Document>(id, updateDoc, options.upsert(false)));
                    final List<PendingOperation> cascadeOperations = update.getCascadeOperations();
                    addPostFlushOperations(cascadeOperations);
                }
            }
            numberOfOptimisticUpdates.put(name, numberOfOptimistic);
            numberOfPessimisticUpdates.put(name, numberOfPessimistic);
        }

        // ---- Stage 3: queue deletes ----
        for (Map.Entry<PersistentEntity, Collection<PendingDelete>> entry : pendingDeletes.entrySet()) {
            final Collection<PendingDelete> deletes = entry.getValue();
            if (deletes != null && !deletes.isEmpty()) {
                List<WriteModel<Document>> entityWrites = getWriteModelsForEntity(entry.getKey(), writeModels);
                List<Object> nativeKeys = new ArrayList<Object>();
                for (PendingDelete delete : deletes) {
                    delete.run();
                    if (delete.isVetoed()) continue;

                    final Object k = delete.getNativeKey();
                    if (k != null) {
                        if (k instanceof Document) {
                            // A Document key is a full query (e.g. a criteria-based delete);
                            // issue it as its own DeleteMany.
                            entityWrites.add(new DeleteManyModel<Document>((Document) k));
                        }
                        else {
                            // Plain identifiers are batched into a single $in delete below.
                            nativeKeys.add(k);
                        }
                    }
                    final List<PendingOperation> cascadeOperations = delete.getCascadeOperations();
                    addPostFlushOperations(cascadeOperations);
                }
                // Only issue the batched _id $in delete when there are keys to delete;
                // previously an empty-$in delete model was appended unconditionally.
                if (!nativeKeys.isEmpty()) {
                    entityWrites.add(new DeleteManyModel<Document>(new Document(MongoConstants.MONGO_ID_FIELD, new Document(BsonQuery.IN_OPERATOR, nativeKeys))));
                }
            }
        }

        // ---- Stage 4: execute one bulk write per entity and verify results ----
        for (Map.Entry<PersistentEntity, List<WriteModel<Document>>> entry : writeModels.entrySet()) {
            final PersistentEntity persistentEntity = entry.getKey();
            final List<WriteModel<Document>> writes = entry.getValue();
            if (writes.isEmpty()) continue;

            com.mongodb.client.MongoCollection collection = getCollection(persistentEntity);
            final WriteConcern wc = getWriteConcern();
            if (wc != null) {
                collection = collection.withWriteConcern(wc);
            }
            final com.mongodb.bulk.BulkWriteResult bulkWriteResult = collection
                    .bulkWrite(writes);
            if (!bulkWriteResult.wasAcknowledged()) {
                errorOccured = true;
                throw new DataIntegrityViolationException("Write operation was not acknowledged");
            }
            else {
                final int matchedCount = bulkWriteResult.getMatchedCount();
                // Look the counters up under the same root-entity name they were stored
                // under; using persistentEntity.getName() directly missed for non-root
                // entities and triggered spurious optimistic locking failures.
                final String name = persistentEntity.isRoot() ? persistentEntity.getName() : persistentEntity.getRootEntity().getName();
                final Integer numOptimistic = numberOfOptimisticUpdates.get(name);
                final Integer numPessimistic = numberOfPessimisticUpdates.get(name);
                final int no = numOptimistic != null ? numOptimistic : 0;
                final int pe = numPessimistic != null ? numPessimistic : 0;
                // Unversioned updates match unconditionally; any shortfall in the
                // matched count means a versioned update missed its document.
                if ((matchedCount - pe) != no) {
                    setFlushMode(FlushModeType.COMMIT);
                    throw new OptimisticLockingException(persistentEntity, null);
                }
            }
        }

        // ---- Stage 5: run cascades queued during the flush ----
        for (Runnable postFlushOperation : postFlushOperations) {
            postFlushOperation.run();
        }
    } finally {
        // Always reset session state and restore the original write concern,
        // whether the flush succeeded or threw.
        clearPendingOperations();
        postFlushOperations.clear();
        firstLevelCollectionCache.clear();
        this.writeConcern = currentWriteConcern;
    }
}