ForgeFiles/forgefiles/files_main.py (539 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. '''This is the main controller module for the Files Plugin.''' import logging from urllib.parse import unquote from tg import config, redirect, expose, flash from tg.decorators import with_trailing_slash, without_trailing_slash from tg import tmpl_context as c, app_globals as g from tg import request from jinja2.exceptions import TemplateNotFound from allura.app import Application from allura.controllers import BaseController from allura.lib.decorators import require_post from allura.lib.widgets.subscriptions import SubscribeForm from allura.lib.security import require_access from allura import model as M from allura.controllers import attachments as att from allura import version from allura.model.timeline import TransientActor from bson import ObjectId from webob import exc # local imports ## from forgefiles.model.files import UploadFolder, UploadFiles, Upload log = logging.getLogger(__name__) class FilesApp(Application): """Files plugin for the Allura platform""" __version__ = version.__version__ tool_label = 'Files' tool_description = """Upload executables for your project. You may maintain version specific executables as well.""" default_mount_label = 'Files' default_mount_point = 'files' uninstallable = True ordinal = 9 max_instances = 1 def __init__(self, project, config): Application.__init__(self, project, config) self.root = FilesController() def install(self, project): 'Set up any default permissions and roles here' self.config.options['project_name'] = project.name super().install(project) role_anon = M.ProjectRole.by_name('*anonymous')._id self.config.acl = [ M.ACE.allow(role_anon, 'read'), ] def uninstall(self, project): "Remove all the tool's artifacts from the database" app_config_id = {'app_config_id': c.app.config._id} Upload.query.remove(app_config_id) UploadFolder.query.remove(app_config_id) file_objects = UploadFiles.query.find(app_config_id).all() for file_object in file_objects: file_object.delete() super().uninstall(project) def has_linked_download(self): return UploadFiles.query.find({ 'app_config_id': c.app.config._id, 'linked_to_download': True, 'disabled': False}).count() def get_parent_folders(linked_file_object=None): '''Returns the list of the parent folders for the current file or folder''' parent_folder = linked_file_object.parent_folder if linked_file_object else None parent_folders_list = [] while parent_folder: parent_folders_list.append(str(parent_folder._id)) parent_folder = parent_folder.parent_folder parent_folders_list = list(set(parent_folders_list)) return parent_folders_list class FilesController(BaseController): """Root controller for the Files Application""" def _check_security(self): require_access(c.app, 'read') @expose('jinja:forgefiles:templates/files.html') def index(self): '''Index method for the Root controller''' require_access(c.app, 'read') folder_object = None file_object = None upload_object = Upload.query.get(app_config_id=c.app.config._id) self.attachment = AttachmentsController(upload_object) file_objects = UploadFiles.query.find({'app_config_id': c.app.config._id, 'parent_folder_id': None}) file_objects = file_objects.sort([('created_date', -1)]).all() folder_objects = UploadFolder.query.find({'app_config_id': c.app.config._id, 'parent_folder_id': None}) folder_objects = folder_objects.sort([('created_date', -1)]).all() if c.user in c.project.admins(): M.Mailbox.subscribe(type='direct') c.subscribe_form = SubscribeForm(thing='files') tool_subscribed = M.Mailbox.subscribed() if tool_subscribed: subscribed = M.Mailbox.subscribed() else: subscribed = False file_object = UploadFiles.query.get(app_config_id=c.app.config._id, linked_to_download=True) parents = get_parent_folders(linked_file_object=file_object) return dict(file_objects=file_objects, folder_objects=folder_objects, folder_object=folder_object, file_object=file_object, subscribed=subscribed, parents=parents) def get_parent_folder_url(self, parent_folder_id): ''' Returns the url,parent_folder and id of parent_folder object if object is there''' if (parent_folder_id == 'None') or (not parent_folder_id): parent_folder_id = None parent_folder = None url = c.app.url else: parent_folder = UploadFolder.query.get(_id=ObjectId(parent_folder_id), app_config_id=c.app.config._id) parent_folder_id = ObjectId(parent_folder._id) url = parent_folder.url() return parent_folder_id, parent_folder, url @require_post() @expose() def create_folder(self, parent_folder_id=None, folder_name=None): '''Controller method for creating a folder. The folder is stored in UploadFolder collection''' require_access(c.app, 'create') parent_folder_id, parent_folder, url = self.get_parent_folder_url(parent_folder_id) if folder_name: folder_object = UploadFolder.query.find({ 'app_config_id': c.app.config._id, 'folder_name': folder_name, 'parent_folder_id': parent_folder_id}).first() if folder_object: flash('Folder with the same name already exists!') else: folder_object = UploadFolder(folder_name=folder_name) folder_object.parent_folder_id = parent_folder_id parent = parent_folder while parent: parent.folder_ids.append(str(folder_object._id)) parent = parent.parent_folder flash('Folder is created successfully') g.director.create_activity(c.user, 'created', folder_object, related_nodes=[c.project]) else: flash('Folder is not created successfully') return redirect(url) @require_post() @expose() def upload_file(self, parent_folder_id=None, file_upload=None, filename=None): '''Controller method for creating a folder. The folder is stored in UploadFolder collection''' require_access(c.app, 'create') parent_folder_id, parent_folder, url = self.get_parent_folder_url(parent_folder_id) if file_upload is not None: file_object = UploadFiles.query.find({ 'app_config_id': c.app.config._id, 'filename': filename, 'parent_folder_id': parent_folder_id}).first() if file_object: flash('File with the same name already exists!') else: upload_object = Upload( app_config_id=c.app.config._id, filename=filename, filetype=file_upload.type) attach_object = upload_object.attach( filename, file_upload.file, parent_folder_id=parent_folder_id) if attach_object.parent_folder: upload_object.file_url = attach_object.parent_folder.url() else: upload_object.file_url = c.app.url parent = parent_folder while parent: parent.file_ids.append(str(attach_object._id)) parent = parent.parent_folder flash('File is uploaded successfully') g.director.create_activity(c.user, 'uploaded', upload_object, related_nodes=[c.project]) else: flash('File is not uploaded successfully') return redirect(url) @require_post() @expose() def delete_file(self, file_id=None): '''Controller method to delete a file''' file_object = UploadFiles.query.get(_id=ObjectId(file_id), app_config_id=c.app.config._id) upload_object = Upload.query.get(_id=file_object.artifact_id, app_config_id=c.app.config._id) file_name = file_object.filename transient_actor = TransientActor(activity_name=file_name) url = c.app.url if file_id is not None: require_access(upload_object, 'delete') self.delete_file_from_db(file_id=file_id) parent_folder = file_object.parent_folder if parent_folder: url = parent_folder.url() flash('File is successfully deleted') g.director.create_activity(c.user, 'deleted the file', transient_actor, related_nodes=[c.project]) else: flash('File is not deleted') return redirect(url) def delete_file_from_db(self, file_id=None): '''Method to delete a file from db''' file_object = UploadFiles.query.get(_id=ObjectId(file_id), app_config_id=c.app.config._id) Upload.query.remove({'_id': file_object.artifact_id, 'app_config_id': c.app.config._id}) file_object.delete() def delete_folder_recursively(self, folder_id): '''This method is called recursively to delete folder in a hierarchy''' sub_file_objects = UploadFiles.query.find(dict({ 'app_config_id': c.app.config._id, 'parent_folder_id': ObjectId(folder_id)})).all() for file_object in sub_file_objects: self.delete_file_from_db(file_id=file_object._id) sub_folder_objects = UploadFolder.query.find({ 'app_config_id': c.app.config._id, 'parent_folder_id': ObjectId(folder_id)}).all() for folder_object in sub_folder_objects: self.delete_folder_recursively(folder_object._id) UploadFolder.query.remove({'_id': ObjectId(folder_id), 'app_config_id': c.app.config._id}) @without_trailing_slash @require_post() @expose('jinja:forgefiles:templates/files.html') def delete_folder(self, folder_id=None): '''Controller method to delete a folder''' folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) folder_name = folder_object.folder_name transient_actor = TransientActor(activity_name=folder_name) url = c.app.url if folder_id is not None: require_access(folder_object, 'delete') self.delete_folder_recursively(folder_id) if folder_object.parent_folder: url = folder_object.parent_folder.url() flash('Folder is deleted Successfully') g.director.create_activity(c.user, 'deleted the folder', transient_actor, related_nodes=[c.project]) else: flash('Folder is not deleted') return redirect(url) @without_trailing_slash @require_post() @expose() def link_file(self, file_id=None, status=None): '''Controller method to link a file to the download button''' linkable_file_object = UploadFiles.query.get(_id=ObjectId(file_id), app_config_id=c.app.config._id) upload_object = Upload.query.get(_id=linkable_file_object.artifact_id, app_config_id=c.app.config._id) require_access(upload_object, 'link') if status == 'False': linkable_file_object.linked_to_download = False else: file_objects = UploadFiles.query.find({'app_config_id': c.app.config._id}).all() for file_object in file_objects: if file_object.linked_to_download: file_object.linked_to_download = False linkable_file_object.linked_to_download = True @expose() def download_file(self, filename=None): '''Controller method to download a file''' if filename: request_path = request.path.split(c.app.url)[-1].rstrip('/') request_path = unquote(request_path) linked_file_object = UploadFiles.query.find({ 'app_config_id': c.app.config._id, 'filename': filename, 'path': request_path, 'disabled': False, }).first() else: linked_file_object = UploadFiles.query.find({ 'app_config_id': c.app.config._id, 'linked_to_download': True, 'disabled': False, }).first() if linked_file_object: try: if not c.user.is_anonymous(): M.Mailbox.subscribe(type='direct') return linked_file_object.serve(embed=False) except Exception as e: log.exception('%s error to download the file', e) else: flash('No artifact available') return redirect(c.app.url) @require_post() @expose() def edit_folder(self, folder_id=None, folder_name=None): '''Controller method to edit the folder name''' url = c.app.url folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) if folder_object: require_access(folder_object, 'update') folder_object.folder_name = folder_name flash("Folder name edited successfully") if folder_object.parent_folder: url = folder_object.parent_folder.url() else: flash("Folder name not edited") redirect(url) @require_post() @expose() def edit_file(self, file_id=None, file_name=None): '''Controller method to edit the file name''' url = c.app.url file_object = UploadFiles.query.get(_id=ObjectId(file_id), app_config_id=c.app.config._id) upload_object = Upload.query.get(_id=file_object.artifact_id, app_config_id=c.app.config._id) if file_object: require_access(upload_object, 'update') upload_object.filename = file_name file_object.filename = file_name flash("File name edited successfully") if file_object.parent_folder: url = file_object.parent_folder.url() else: flash("File not edited") return redirect(url) @require_post() @expose() def publish_folder(self, folder_id=None, remarks=None): '''Controller which publishes the folder. It send update about the publishing of the folder.''' folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) url = c.app.url if folder_object: require_access(folder_object, 'publish') folder_object.published = True folder_object.remarks = remarks mailbox_object = M.Mailbox.query.find({'app_config_id': c.app.config._id}).all() user_ids = [i.user_id for i in mailbox_object] admins = [i._id for i in c.project.admins()] user_ids += admins user_ids = list(set(user_ids)) from allura.tasks import mail_tasks from allura.lib import helpers as h template_name = '' try: for i in user_ids: user_object = M.User.query.get(_id=i) template_name = 'forgefiles:/templates/mail.html' text = g.jinja2_env.get_template(template_name).render(dict( base_url=config.get('base_url'), user_object=user_object, project=c.project, remarks=remarks, folder_object=folder_object, project_owner=c.user, domain=config.get('domain') )) email_addr = user_object.get_pref('email_address') if email_addr: mail_tasks.sendsimplemail.post( fromaddr=g.noreply, reply_to=g.noreply, toaddr=email_addr, subject='{} - {} Release Update'.format(config.get('site_name'), c.project.name), message_id=h.gen_message_id(), text=text) if folder_object.parent_folder: url = folder_object.parent_folder.url() flash('Successfully Published') except TemplateNotFound: log.exception('%s Template not found' % (template_name)) log.info('Folder %s is not published successfully' % (folder_object.folder_name)) flash('Folder is not published successfully') return redirect(url) @require_post() @expose() def disable_folder(self, folder_id=None, status=None): '''Controller method to disable the folder.''' folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) if status == 'True': disable_status = True text = 'disabled' else: disable_status = False text = 'enabled' if folder_object: require_access(folder_object, 'disable') folder_object.disabled = disable_status '''Disabling Child folders & files of the current folder ''' for child_folder_id in folder_object.folder_ids: child_folder_object = UploadFolder.query.get( _id=ObjectId(child_folder_id), app_config_id=c.app.config._id) if child_folder_object: child_folder_object.disabled = disable_status for child_file_id in folder_object.file_ids: child_file_object = UploadFiles.query.get(_id=ObjectId(child_file_id), app_config_id=c.app.config._id) if child_file_object: child_file_object.disabled = disable_status flash('Folder %s successfully' % (text)) else: flash('No folder exists') @require_post() @expose() def disable_file(self, file_id=None, status=None): '''Controller method to disable the file.''' file_object = UploadFiles.query.get(_id=ObjectId(file_id), app_config_id=c.app.config._id) upload_object = Upload.query.get(_id=file_object.artifact_id, app_config_id=c.app.config._id) if status == 'True': disable_status = True text = 'disabled' else: disable_status = False text = 'enabled' if file_object: require_access(upload_object, 'disable') file_object.disabled = disable_status flash('File %s successfully' % (text)) else: flash('No file exists') @expose('json:') @require_post() def subscribe(self, subscribe=None, unsubscribe=None): '''Controller method that subscribes an user to the files plugin.''' if subscribe: M.Mailbox.subscribe(type='direct') elif unsubscribe: M.Mailbox.unsubscribe() return { 'status': 'ok', 'subscribed': M.Mailbox.subscribed(), } def get_folder_object(self, folder_id=None): '''Returns the folder object for input folder id''' folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) return folder_object @expose('jinja:forgefiles:templates/create_folder.html') def get_parent_for_create_folder(self, folder_id=None): '''Returns the parent object of the input folder id''' folder_object = self.get_folder_object(folder_id) return dict(folder_object=folder_object) @expose('jinja:forgefiles:templates/upload_file.html') def get_parent_for_upload_file(self, folder_id=None): '''Returns the parent object of the input folder id''' folder_object = self.get_folder_object(folder_id) return dict(folder_object=folder_object) def get_folder_file_object(self, object_id=None): '''Returns corresponding file or folder object for the input id ''' folder_object = UploadFolder.query.get(_id=ObjectId(object_id), app_config_id=c.app.config._id) file_object = UploadFiles.query.get(_id=ObjectId(object_id), app_config_id=c.app.config._id) return dict(folder_object=folder_object, file_object=file_object) @expose('jinja:forgefiles:templates/edit.html') def get_editable_object(self, object_id=None): '''Returns object id of the folder or file to be edited''' object_dict = self.get_folder_file_object(object_id) return object_dict @expose('jinja:forgefiles:templates/delete.html') def get_deletable_object(self, object_id=None): '''Returns object id of the folder or file to be deleted''' object_dict = self.get_folder_file_object(object_id) return object_dict @expose('jinja:forgefiles:templates/publish_folder.html') def get_publishable_folder(self, folder_id=None): '''Returns the status and folder object if the folder can be published or not''' linked_file_object = UploadFiles.query.get( app_config_id=c.app.config._id, linked_to_download=True, disabled=False) parent_folders = get_parent_folders(linked_file_object=linked_file_object) if folder_id: folder_object = UploadFolder.query.get(_id=ObjectId(folder_id), app_config_id=c.app.config._id) status = str(folder_object._id) in parent_folders else: folder_object = None status = False return dict(folder_object=folder_object, status=status) @expose() def _lookup(self, name, *remainder): ''' Class method which is used to call individual files controller class''' if not remainder: argument = name else: argument = remainder[-1] if argument == 'createFolder': argument = None return IndividualFilesController(argument), remainder def folder_breadcrumbs(folder_object=None): ''' Function to create a breadcrumbs for folders ''' list_object = folder_object.path.split('/') second_list = [] length = 0 urls = {} for i in list_object: length += len(i) folder_object = UploadFolder.query.get(folder_name=i) urls[str(i)] = str(folder_object.url()) if length in range(1, (61-len(list_object[-1])+1)): second_list.append(i) second_list.append('...') second_list.append(list_object[-1]) string = '/'.join(second_list) if length > 61: return string, urls else: return folder_object.path, urls # handle requests for individual folder,file objects class IndividualFilesController(BaseController): """Handle requests for a specific folder/file objects""" def __init__(self, arg): path = request.path.split(c.app.url)[-1].rstrip('/') if path == arg: path = arg path = unquote(path) arg = unquote(arg) self.folder_object = UploadFolder.query.find({ 'app_config_id': ObjectId(c.app.config._id), 'folder_name': arg, 'path': path}).first() self.file_object = UploadFiles.query.find({ 'app_config_id': ObjectId(c.app.config._id), 'filename': arg, 'path': path}).first() methods = ('create_folder', 'upload_file', 'delete_file', 'delete_folder', 'subscribe') if (not self.folder_object) and (not self.file_object) and (arg not in methods): log.exception('No Folder/File object found') raise exc.HTTPNotFound() else: pass def _check_security(self): require_access(c.app, 'read') @expose('jinja:forgefiles:templates/files.html') @with_trailing_slash def index(self): ''' Index method of individual folder/file objects''' require_access(c.app, 'read') folder_objects = None file_objects = None folder_path, urls = '', '' if self.folder_object: folder_objects = UploadFolder.query.find({ 'app_config_id': c.app.config._id, 'parent_folder_id': self.folder_object._id}) folder_objects = folder_objects.sort([('created_date', -1)]).all() file_objects = UploadFiles.query.find({ 'app_config_id': c.app.config._id, 'parent_folder_id': self.folder_object._id}) file_objects = file_objects.sort([('created_date', -1)]).all() folder_path, urls = folder_breadcrumbs(folder_object=self.folder_object) elif self.file_object: return FilesController().download_file(filename=self.file_object.filename) if c.user in c.project.admins(): M.Mailbox.subscribe(type='direct') c.subscribe_form = SubscribeForm(thing='files') tool_subscribed = M.Mailbox.subscribed() if tool_subscribed: subscribed = M.Mailbox.subscribed() else: subscribed = False file_object = UploadFiles.query.get(app_config_id=c.app.config._id, linked_to_download=True) parents = get_parent_folders(linked_file_object=file_object) return dict(folder_objects=folder_objects, file_objects=file_objects, folder_object=self.folder_object, file_object=self.file_object, subscribed=subscribed, parents=parents, folder_path=folder_path, urls=urls) @require_post() @expose() def create_folder(self, parent_folder_id=None, folder_name=None): return FilesController().create_folder(parent_folder_id=parent_folder_id, folder_name=folder_name) @require_post() @expose() def upload_file(self, parent_folder_id=None, filename=None, file_upload=None): return FilesController().upload_file( parent_folder_id=parent_folder_id, filename=filename, file_upload=file_upload) @require_post() @expose() def delete_file(self, file_id=None): return FilesController().delete_file(file_id=file_id) @expose('json:') @require_post() def subscribe(self, subscribe=None, unsubscribe=None): if subscribe: M.Mailbox.subscribe(type='direct') elif unsubscribe: M.Mailbox.unsubscribe() return { 'status': 'ok', 'subscribed': M.Mailbox.subscribed(), } @expose() def _lookup(self, name, *remainder): if not remainder: argument = name else: argument = remainder[-1] return IndividualFilesController(argument), remainder class AttachmentController(att.AttachmentController): AttachmentClass = UploadFiles edit_perm = 'update' class AttachmentsController(att.AttachmentsController): AttachmentControllerClass = AttachmentController