// modules/backend/migrations/1516948939797-migrate-configs.js (288 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ const _ = require('lodash'); const log = require('./migration-utils').log; const error = require('./migration-utils').error; const getClusterForMigration = require('./migration-utils').getClusterForMigration; const getCacheForMigration = require('./migration-utils').getCacheForMigration; const _debug = false; const DUPLICATE_KEY_ERROR = 11000; let dup = 1; function makeDup(name) { return name + `_dup_${dup++}`; } function linkCacheToCluster(clustersModel, cluster, cachesModel, cache, domainsModel) { return clustersModel.updateOne({_id: cluster._id}, {$addToSet: {caches: cache._id}}).exec() .then(() => cachesModel.updateOne({_id: cache._id}, {clusters: [cluster._id]}).exec()) .then(() => { if (_.isEmpty(cache.domains)) return Promise.resolve(); return _.reduce(cache.domains, (start, domain) => start.then(() => { return domainsModel.updateOne({_id: domain}, {clusters: [cluster._id]}).exec() .then(() => clustersModel.updateOne({_id: cluster._id}, {$addToSet: {models: domain}}).exec()); }), Promise.resolve()); }) .catch((err) => error(`Failed link cache to cluster [cache=${cache.name}, cluster=${cluster.name}]`, err)); } function cloneCache(clustersModel, cachesModel, domainsModel, cache) { const 
cacheId = cache._id; const clusters = cache.clusters; cache.clusters = []; if (cache.cacheStoreFactory && cache.cacheStoreFactory.kind === null) delete cache.cacheStoreFactory.kind; return _.reduce(clusters, (start, cluster, idx) => start.then(() => { if (idx > 0) { delete cache._id; const newCache = _.clone(cache); const domainIds = newCache.domains; newCache.clusters = [cluster]; newCache.domains = []; return clustersModel.updateMany({_id: {$in: newCache.clusters}}, {$pull: {caches: cacheId}}).exec() .then(() => cachesModel.create(newCache)) .catch((err) => { if (err.code === DUPLICATE_KEY_ERROR) { const retryWith = makeDup(newCache.name); error(`Failed to clone cache, will change cache name and retry [cache=${newCache.name}, retryWith=${retryWith}]`); newCache.name = retryWith; return cachesModel.create(newCache); } return Promise.reject(err); }) .then((clone) => clustersModel.updateMany({_id: {$in: newCache.clusters}}, {$addToSet: {caches: clone._id}}).exec() .then(() => clone)) .then((clone) => { if (_.isEmpty(domainIds)) return Promise.resolve(); return _.reduce(domainIds, (start, domainId) => start.then(() => { return domainsModel.findOne({_id: domainId}).lean().exec() .then((domain) => { delete domain._id; const newDomain = _.clone(domain); newDomain.caches = [clone._id]; newDomain.clusters = [cluster]; return domainsModel.create(newDomain) .catch((err) => { if (err.code === DUPLICATE_KEY_ERROR) { const retryWith = makeDup(newDomain.valueType); error(`Failed to clone domain, will change type name and retry [cache=${newCache.name}, valueType=${newDomain.valueType}, retryWith=${retryWith}]`); newDomain.valueType = retryWith; return domainsModel.create(newDomain); } }) .then((createdDomain) => { return clustersModel.updateOne({_id: cluster}, {$addToSet: {models: createdDomain._id}}).exec() .then(() => cachesModel.updateOne({_id: clone.id}, {$addToSet: {domains: createdDomain._id}})); }) .catch((err) => error('Failed to clone domain during cache clone', err)); 
}) .catch((err) => error(`Failed to duplicate domain model[domain=${domainId}], cache=${clone.name}]`, err)); }), Promise.resolve()); }) .catch((err) => error(`Failed to clone cache[id=${cacheId}, name=${cache.name}]`, err)); } return cachesModel.updateOne({_id: cacheId}, {clusters: [cluster]}).exec() .then(() => clustersModel.updateOne({_id: cluster}, {$addToSet: {models: {$each: cache.domains}}}).exec()); }), Promise.resolve()); } function migrateCache(clustersModel, cachesModel, domainsModel, cache) { const clustersCnt = _.size(cache.clusters); if (clustersCnt < 1) { if (_debug) log(`Found cache not linked to cluster [cache=${cache.name}]`); return getClusterForMigration(clustersModel, cache.space) .then((clusterLostFound) => linkCacheToCluster(clustersModel, clusterLostFound, cachesModel, cache, domainsModel)); } if (clustersCnt > 1) { if (_debug) log(`Found cache linked to many clusters [cache=${cache.name}, clustersCnt=${clustersCnt}]`); return cloneCache(clustersModel, cachesModel, domainsModel, cache); } // Nothing to migrate, cache linked to cluster 1-to-1. 
return Promise.resolve(); } function migrateCaches(clustersModel, cachesModel, domainsModel) { return cachesModel.find({}).lean().exec() .then((caches) => { const cachesCnt = _.size(caches); if (cachesCnt > 0) { log(`Caches to migrate: ${cachesCnt}`); return _.reduce(caches, (start, cache) => start.then(() => migrateCache(clustersModel, cachesModel, domainsModel, cache)), Promise.resolve()) .then(() => log('Caches migration finished.')); } return Promise.resolve(); }) .catch((err) => error('Caches migration failed', err)); } function linkIgfsToCluster(clustersModel, cluster, igfsModel, igfs) { return clustersModel.updateOne({_id: cluster._id}, {$addToSet: {igfss: igfs._id}}).exec() .then(() => igfsModel.updateOne({_id: igfs._id}, {clusters: [cluster._id]}).exec()) .catch((err) => error(`Failed link IGFS to cluster [IGFS=${igfs.name}, cluster=${cluster.name}]`, err)); } function cloneIgfs(clustersModel, igfsModel, igfs) { const igfsId = igfs._id; const clusters = igfs.clusters; delete igfs._id; igfs.clusters = []; return _.reduce(clusters, (start, cluster, idx) => start.then(() => { const newIgfs = _.clone(igfs); newIgfs.clusters = [cluster]; if (idx > 0) { return clustersModel.updateMany({_id: {$in: newIgfs.clusters}}, {$pull: {igfss: igfsId}}).exec() .then(() => igfsModel.create(newIgfs)) .then((clone) => clustersModel.updateMany({_id: {$in: newIgfs.clusters}}, {$addToSet: {igfss: clone._id}}).exec()) .catch((err) => error(`Failed to clone IGFS: id=${igfsId}, name=${igfs.name}]`, err)); } return igfsModel.updateOne({_id: igfsId}, {clusters: [cluster]}).exec(); }), Promise.resolve()); } function migrateIgfs(clustersModel, igfsModel, igfs) { const clustersCnt = _.size(igfs.clusters); if (clustersCnt < 1) { if (_debug) log(`Found IGFS not linked to cluster [IGFS=${igfs.name}]`); return getClusterForMigration(clustersModel, igfs.space) .then((clusterLostFound) => linkIgfsToCluster(clustersModel, clusterLostFound, igfsModel, igfs)); } if (clustersCnt > 1) { if (_debug) 
log(`Found IGFS linked to many clusters [IGFS=${igfs.name}, clustersCnt=${clustersCnt}]`); return cloneIgfs(clustersModel, igfsModel, igfs); } // Nothing to migrate, IGFS linked to cluster 1-to-1. return Promise.resolve(); } function migrateIgfss(clustersModel, igfsModel) { return igfsModel.find({}).lean().exec() .then((igfss) => { const igfsCnt = _.size(igfss); if (igfsCnt > 0) { log(`IGFS to migrate: ${igfsCnt}`); return _.reduce(igfss, (start, igfs) => start.then(() => migrateIgfs(clustersModel, igfsModel, igfs)), Promise.resolve()) .then(() => log('IGFS migration finished.')); } return Promise.resolve(); }) .catch((err) => error('IGFS migration failed', err)); } function linkDomainToCluster(clustersModel, cluster, domainsModel, domain) { return clustersModel.updateOne({_id: cluster._id}, {$addToSet: {models: domain._id}}).exec() .then(() => domainsModel.updateOne({_id: domain._id}, {clusters: [cluster._id]}).exec()) .catch((err) => error(`Failed link domain model to cluster [domain=${domain._id}, cluster=${cluster.name}]`, err)); } function linkDomainToCache(cachesModel, cache, domainsModel, domain) { return cachesModel.updateOne({_id: cache._id}, {$addToSet: {domains: domain._id}}).exec() .then(() => domainsModel.updateOne({_id: domain._id}, {caches: [cache._id]}).exec()) .catch((err) => error(`Failed link domain model to cache [cache=${cache.name}, domain=${domain._id}]`, err)); } function migrateDomain(clustersModel, cachesModel, domainsModel, domain) { const cachesCnt = _.size(domain.caches); if (cachesCnt < 1) { if (_debug) log(`Found domain model not linked to cache [domain=${domain._id}]`); return getClusterForMigration(clustersModel, domain.space) .then((clusterLostFound) => linkDomainToCluster(clustersModel, clusterLostFound, domainsModel, domain)) .then(() => getCacheForMigration(clustersModel, cachesModel, domain.space)) .then((cacheLostFound) => linkDomainToCache(cachesModel, cacheLostFound, domainsModel, domain)) .catch((err) => error(`Failed to 
migrate not linked domain [domain=${domain._id}]`, err)); } if (_.isEmpty(domain.clusters)) { const cachesCnt = _.size(domain.caches); if (_debug) log(`Found domain model without cluster: [domain=${domain._id}, cachesCnt=${cachesCnt}]`); const grpByClusters = {}; return cachesModel.find({_id: {$in: domain.caches}}).lean().exec() .then((caches) => { if (caches) { _.forEach(caches, (cache) => { const c = _.get(grpByClusters, cache.clusters[0]); if (c) c.push(cache._id); else grpByClusters[cache.clusters[0]] = [cache._id]; }); return _.reduce(_.keys(grpByClusters), (start, cluster, idx) => start.then(() => { const domainId = domain._id; const clusterCaches = grpByClusters[cluster]; if (idx > 0) { delete domain._id; domain.caches = clusterCaches; return domainsModel.create(domain) .then((clonedDomain) => { return cachesModel.updateOne({_id: {$in: clusterCaches}}, {$addToSet: {domains: clonedDomain._id}}).exec() .then(() => clonedDomain); }) .then((clonedDomain) => linkDomainToCluster(clustersModel, {_id: cluster, name: `stub${idx}`}, domainsModel, clonedDomain)) .then(() => { return cachesModel.updateMany({_id: {$in: clusterCaches}}, {$pull: {domains: domainId}}).exec(); }); } return domainsModel.updateOne({_id: domainId}, {caches: clusterCaches}).exec() .then(() => linkDomainToCluster(clustersModel, {_id: cluster, name: `stub${idx}`}, domainsModel, domain)); }), Promise.resolve()); } error(`Found domain with orphaned caches: [domain=${domain._id}, caches=${domain.caches}]`); return Promise.resolve(); }) .catch((err) => error(`Failed to migrate domain [domain=${domain._id}]`, err)); } // Nothing to migrate, other domains will be migrated with caches. 
return Promise.resolve(); } function migrateDomains(clustersModel, cachesModel, domainsModel) { return domainsModel.find({}).lean().exec() .then((domains) => { const domainsCnt = _.size(domains); if (domainsCnt > 0) { log(`Domain models to migrate: ${domainsCnt}`); return _.reduce(domains, (start, domain) => start.then(() => migrateDomain(clustersModel, cachesModel, domainsModel, domain)), Promise.resolve()) .then(() => log('Domain models migration finished.')); } return Promise.resolve(); }) .catch((err) => error('Domain models migration failed', err)); } function deduplicate(title, model, name) { return model.find({}).lean().exec() .then((items) => { const sz = _.size(items); if (sz > 0) { log(`Deduplication of ${title} started...`); let cnt = 0; return _.reduce(items, (start, item) => start.then(() => { const data = item[name]; const dataSz = _.size(data); if (dataSz < 2) return Promise.resolve(); const deduped = _.uniqWith(data, _.isEqual); if (dataSz !== _.size(deduped)) { return model.updateOne({_id: item._id}, {$set: {[name]: deduped}}) .then(() => cnt++); } return Promise.resolve(); }), Promise.resolve()) .then(() => log(`Deduplication of ${title} finished: ${cnt}.`)); } return Promise.resolve(); }); } exports.up = function up(done) { const clustersModel = this('Cluster'); const cachesModel = this('Cache'); const domainsModel = this('DomainModel'); const igfsModel = this('Igfs'); process.on('unhandledRejection', function(reason, p) { console.log('Unhandled rejection at:', p, 'reason:', reason); }); Promise.resolve() .then(() => deduplicate('Cluster caches', clustersModel, 'caches')) .then(() => deduplicate('Cluster IGFS', clustersModel, 'igfss')) .then(() => deduplicate('Cache clusters', cachesModel, 'clusters')) .then(() => deduplicate('Cache domains', cachesModel, 'domains')) .then(() => deduplicate('IGFS clusters', igfsModel, 'clusters')) .then(() => deduplicate('Domain model caches', domainsModel, 'caches')) .then(() => migrateCaches(clustersModel, 
cachesModel, domainsModel)) .then(() => migrateIgfss(clustersModel, igfsModel)) .then(() => migrateDomains(clustersModel, cachesModel, domainsModel)) .then(() => log(`Duplicates counter: ${dup}`)) .then(() => done()) .catch(done); }; exports.down = function down(done) { log('Model migration can not be reverted'); done(); };