modules/backend/services/clusters.js (168 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ 'use strict'; const _ = require('lodash'); // Fire me up! module.exports = { implements: 'services/clusters', inject: ['mongo', 'services/spaces', 'services/caches', 'services/domains', 'services/igfss', 'errors'] }; /** * @param mongo * @param {SpacesService} spacesService * @param {CachesService} cachesService * @param {DomainsService} modelsService * @param {IgfssService} igfssService * @param errors * @returns {ClustersService} */ module.exports.factory = (mongo, spacesService, cachesService, modelsService, igfssService, errors) => { /** * Convert remove status operation to own presentation. * * @param {RemoveResult} result - The results of remove operation. */ const convertRemoveStatus = (result) => ({rowsAffected: result.n}); /** * Update existing cluster. * * @param {Object} cluster - The cluster for updating. * @returns {Promise.<mongo.ObjectId>} that resolves cluster id. */ const update = (cluster) => { const clusterId = cluster._id; return mongo.Cluster.updateOne({_id: clusterId}, cluster, {upsert: true}).exec() .then(() => mongo.Cache.updateMany({_id: {$in: cluster.caches}}, {$addToSet: {clusters: clusterId}}).exec()) .then(() => mongo.Cache.updateMany({_id: {$nin: cluster.caches}}, {$pull: {clusters: clusterId}}).exec()) .then(() => mongo.Igfs.updateMany({_id: {$in: cluster.igfss}}, {$addToSet: {clusters: clusterId}}).exec()) .then(() => mongo.Igfs.updateMany({_id: {$nin: cluster.igfss}}, {$pull: {clusters: clusterId}}).exec()) .then(() => cluster) .catch((err) => { if (err.code === mongo.errCodes.DUPLICATE_KEY_UPDATE_ERROR || err.code === mongo.errCodes.DUPLICATE_KEY_ERROR) throw new errors.DuplicateKeyException('Cluster with name: "' + cluster.name + '" already exist.'); else throw err; }); }; /** * Create new cluster. * * @param {Object} cluster - The cluster for creation. * @returns {Promise.<mongo.ObjectId>} that resolves cluster id. */ const create = (cluster) => { return mongo.Cluster.create(cluster) .catch((err) => { if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR) throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`); else throw err; }) .then((savedCluster) => mongo.Cache.updateMany({_id: {$in: savedCluster.caches}}, {$addToSet: {clusters: savedCluster._id}}).exec() .then(() => mongo.Igfs.updateMany({_id: {$in: savedCluster.igfss}}, {$addToSet: {clusters: savedCluster._id}}).exec()) .then(() => savedCluster) ); }; /** * Remove all caches by space ids. * * @param {Number[]} spaceIds - The space ids for cache deletion. * @returns {Promise.<RemoveResult>} - that resolves results of remove operation. */ const removeAllBySpaces = (spaceIds) => { return Promise.all([ mongo.DomainModel.deleteMany({space: {$in: spaceIds}}).exec(), mongo.Cache.deleteMany({space: {$in: spaceIds}}).exec(), mongo.Igfs.deleteMany({space: {$in: spaceIds}}).exec() ]) .then(() => mongo.Cluster.deleteMany({space: {$in: spaceIds}}).exec()); }; class ClustersService { static shortList(userId, demo) { return spacesService.spaceIds(userId, demo) .then((spaceIds) => mongo.Cluster.find({space: {$in: spaceIds}}).select('name discovery.kind caches models igfss').lean().exec()) .then((clusters) => _.map(clusters, (cluster) => ({ _id: cluster._id, name: cluster.name, discovery: cluster.discovery.kind, cachesCount: _.size(cluster.caches), modelsCount: _.size(cluster.models), igfsCount: _.size(cluster.igfss) }))); } static get(userId, demo, _id) { return spacesService.spaceIds(userId, demo) .then((spaceIds) => mongo.Cluster.findOne({space: {$in: spaceIds}, _id}).lean().exec()); } static normalize(spaceId, cluster, ...models) { cluster.space = spaceId; _.forEach(models, (model) => { _.forEach(model, (item) => { item.space = spaceId; item.clusters = [cluster._id]; }); }); } static removedInCluster(oldCluster, newCluster, field) { return _.difference(_.invokeMap(_.get(oldCluster, field), 'toString'), _.get(newCluster, field)); } static upsertBasic(userId, demo, {cluster, caches}) { if (_.isNil(cluster._id)) return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null')); return spacesService.spaceIds(userId, demo) .then((spaceIds) => { this.normalize(_.head(spaceIds), cluster, caches); const query = _.pick(cluster, ['space', '_id']); const basicCluster = _.pick(cluster, [ 'space', '_id', 'name', 'discovery', 'caches', 'memoryConfiguration.memoryPolicies', 'dataStorageConfiguration.defaultDataRegionConfiguration.maxSize' ]); return mongo.Cluster.findOneAndUpdate(query, {$set: basicCluster}, {projection: 'caches', upsert: true}).lean().exec() .catch((err) => { if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR) throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`); throw err; }) .then((oldCluster) => { if (oldCluster) { const ids = this.removedInCluster(oldCluster, cluster, 'caches'); return cachesService.remove(ids); } cluster.caches = _.map(caches, '_id'); return mongo.Cluster.updateOne(query, {$set: cluster, new: true}, {upsert: true}).exec(); }); }) .then(() => _.map(caches, cachesService.upsertBasic)) .then(() => ({rowsAffected: 1})); } static upsert(userId, demo, {cluster, caches, models, igfss}) { if (_.isNil(cluster._id)) return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null')); return spacesService.spaceIds(userId, demo) .then((spaceIds) => { this.normalize(_.head(spaceIds), cluster, caches, models, igfss); const query = _.pick(cluster, ['space', '_id']); return mongo.Cluster.findOneAndUpdate(query, {$set: cluster}, {projection: {models: 1, caches: 1, igfss: 1}, upsert: true}).lean().exec() .catch((err) => { if (err.code === mongo.errCodes.DUPLICATE_KEY_ERROR) throw new errors.DuplicateKeyException(`Cluster with name: "${cluster.name}" already exist.`); throw err; }) .then((oldCluster) => { const modelIds = this.removedInCluster(oldCluster, cluster, 'models'); const cacheIds = this.removedInCluster(oldCluster, cluster, 'caches'); const igfsIds = this.removedInCluster(oldCluster, cluster, 'igfss'); return Promise.all([modelsService.remove(modelIds), cachesService.remove(cacheIds), igfssService.remove(igfsIds)]); }); }) .then(() => Promise.all(_.concat(_.map(models, modelsService.upsert), _.map(caches, cachesService.upsert), _.map(igfss, igfssService.upsert)))) .then(() => ({rowsAffected: 1})); } /** * Create or update cluster. * * @param {Object} cluster - The cluster. * @returns {Promise.<mongo.ObjectId>} that resolves cluster id of merge operation. */ static merge(cluster) { if (cluster._id) return update(cluster); return create(cluster); } /** * Get clusters and linked objects by space. * * @param {mongo.ObjectId|String} spaceIds The spaces ids that own clusters. * @returns {Promise.<Array<mongo.Cluster>>} Requested clusters. */ static listBySpaces(spaceIds) { return mongo.Cluster.find({space: {$in: spaceIds}}).sort('name').lean().exec(); } /** * Remove clusters. * * @param {Array.<String>|String} ids - The cluster ids for remove. * @returns {Promise.<{rowsAffected}>} - The number of affected rows. */ static remove(ids) { if (_.isNil(ids)) return Promise.reject(new errors.IllegalArgumentException('Cluster id can not be undefined or null')); if (_.isEmpty(ids)) return Promise.resolve({rowsAffected: 0}); ids = _.castArray(ids); return Promise.all(_.map(ids, (id) => { return mongo.Cluster.findByIdAndRemove(id).exec() .then((cluster) => { if (_.isNil(cluster)) return 0; return Promise.all([ mongo.DomainModel.deleteMany({_id: {$in: cluster.models}}).exec(), mongo.Cache.deleteMany({_id: {$in: cluster.caches}}).exec(), mongo.Igfs.deleteMany({_id: {$in: cluster.igfss}}).exec() ]) .then(() => 1); }); })) .then((res) => ({rowsAffected: _.sum(res)})); } /** * Remove all clusters by user. * @param {mongo.ObjectId|String} userId - The user id that own cluster. * @param {Boolean} demo - The flag indicates that need lookup in demo space. * @returns {Promise.<{rowsAffected}>} - The number of affected rows. */ static removeAll(userId, demo) { return spacesService.spaceIds(userId, demo) .then(removeAllBySpaces) .then(convertRemoveStatus); } } return ClustersService; };