jupyter-gcs-contents-manager/gcs_contents_manager.py (535 lines of code) (raw):

# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Setup:
#   First, install the GCS Python client library using the
#   command: `pip install google-cloud-storage`
#
#   Then, copy this library into a directory in your PYTHONPATH.
#
#   Finally, make sure you have application default credentials
#   set up by running: `gcloud auth application-default login`
#
# Usage: Add the following lines to your Jupyter config file
# (e.g. jupyter_notebook_config.py):
#
#   from gcs_contents_manager import CombinedContentsManager, GCSContentsManager
#   c.NotebookApp.contents_manager_class = CombinedContentsManager
#   c.GCSContentsManager.bucket_name = '${NOTEBOOK_BUCKET}'
#   c.GCSContentsManager.bucket_notebooks_path = '${NOTEBOOK_PATH}'
#   c.GCSContentsManager.project = '${NOTEBOOK_BUCKET_PROJECT}'
#   c.FileContentsManager.root_dir = '${LOCAL_DISK_NOTEBOOK_DIR}'
#
# For '${NOTEBOOK_BUCKET}' specify the name of the GCS bucket where
# you want to store your notebooks, and for '${NOTEBOOK_PATH}',
# specify the name of the directory within that bucket that will be
# treated as your root directory by Jupyter. For
# '${NOTEBOOK_BUCKET_PROJECT}', specify the ID of the GCP project
# that owns the GCS bucket.
#
# If you run JupyterLab with widgets that assume the current file
# browser path is a location on your local disk (e.g. the
# jupyterlab-git extension), then you will also need to set up a
# link somewhere on your local disk for those widgets to use.
#
# For example, you could run the following:
#
#   mkdir -p ~/.jupyter/symlinks_for_jupyterlab_widgets
#   ln -s ${LOCAL_DISK_NOTEBOOK_DIR} ~/.jupyter/symlinks_for_jupyterlab_widgets/Local\ Disk
#
# And then add the following snippet to your Jupyter config:
#
#   c.CombinedContentsManager.root_dir = '~/.jupyter/symlinks_for_jupyterlab_widgets'

import base64
import errno
import json
import logging
import mimetypes
import posixpath
import re

import nbformat
from notebook.services.contents.filecheckpoints import GenericFileCheckpoints
from notebook.services.contents.filemanager import FileContentsManager
from notebook.services.contents.manager import ContentsManager
from notebook.services.contents.checkpoints import Checkpoints, GenericCheckpointsMixin
from tornado.web import HTTPError
from traitlets import Unicode, default

from google.cloud import storage

utf8_encoding = 'utf-8'

# Matches the trailing "/cryptoKeyVersions/<n>" component of a KMS key name.
_KMS_KEY_VERSION_SUFFIX = re.compile(r'/cryptoKeyVersions/\d+$')


def _drop_kms_key_version(blob):
    """Strip the key-version suffix from the blob's KMS key name, if any.

    GCS doesn't allow specifying the key version on upload, so a blob whose
    `kms_key_name` ends in "/cryptoKeyVersions/<n>" has that suffix removed
    from its properties before it is re-uploaded.
    """
    if blob.kms_key_name:
        blob._properties['kmsKeyName'] = _KMS_KEY_VERSION_SUFFIX.split(
            blob.kms_key_name)[0]


def _dir_prefix(gcs_path):
    """Return the `list_blobs` prefix selecting only blobs *under* gcs_path.

    A bare prefix of "foo" would also match "foobar/x"; appending the slash
    restricts the match to real descendants. The empty (root) path maps to
    the empty prefix.
    """
    return gcs_path + '/' if gcs_path else ''


def _internal_server_error(ex):
    """Build the 500 error used by the combined manager for unexpected errors.

    Kept as a module-level function so that the builtin `type` is used even
    when the calling method has a parameter named `type`.
    """
    return HTTPError(
        500, 'Internal server error: [{}] {}'.format(type(ex), str(ex)))


class GCSCheckpointManager(GenericCheckpointsMixin, Checkpoints):
    """Stores checkpoints as blobs in the parent manager's GCS bucket.

    Each checkpoint lives at "<checkpoints_dir>/<path>/<checkpoint_id>".
    """

    checkpoints_dir = '.ipynb_checkpoints'

    def __init__(self, **kwargs):
        # NOTE(review): the base-class constructor is not invoked here;
        # confirm that is intentional.
        self._kwargs = kwargs
        self._parent = kwargs['parent']

    @property
    def bucket(self):
        """The GCS bucket of the parent contents manager."""
        return self._parent.bucket

    def checkpoint_path(self, checkpoint_id, path):
        """Return the blob name for the given checkpoint of `path`."""
        path = (path or '').strip('/')
        return posixpath.join(self.checkpoints_dir, path, checkpoint_id)

    def checkpoint_blob(self, checkpoint_id, path, create_if_missing=False):
        """Return the checkpoint's blob, or None if it does not exist.

        When `create_if_missing` is true a new (not yet uploaded) blob
        object is returned instead of None.
        """
        blob_name = self.checkpoint_path(checkpoint_id, path)
        blob = self.bucket.get_blob(blob_name)
        if not blob and create_if_missing:
            blob = self.bucket.blob(blob_name)
        return blob

    def create_file_checkpoint(self, content, format, path):
        """Upload `content` as the single checkpoint named 'checkpoint'.

        Returns a dict with the checkpoint 'id' and 'last_modified'.
        """
        checkpoint_id = 'checkpoint'
        blob = self.checkpoint_blob(checkpoint_id, path, create_if_missing=True)
        content_type = 'text/plain' if format == 'text' else 'application/octet-stream'
        # GCS doesn't allow specifying the key version, so drop it if present
        _drop_kms_key_version(blob)
        blob.upload_from_string(content, content_type=content_type)
        return {
            'id': checkpoint_id,
            'last_modified': blob.updated,
        }

    def create_notebook_checkpoint(self, nb, path):
        """Serialize the notebook and store it as a text checkpoint."""
        content = nbformat.writes(nb)
        return self.create_file_checkpoint(content, 'text', path)

    def _checkpoint_contents(self, checkpoint_id, path):
        """Return (bytes, content_type) of a checkpoint; 404 if missing."""
        blob = self.checkpoint_blob(checkpoint_id, path)
        if not blob:
            raise HTTPError(
                404, 'No such checkpoint for "{}": {}'.format(path, checkpoint_id))
        return blob.download_as_string(), blob.content_type

    def get_file_checkpoint(self, checkpoint_id, path):
        """Return a 'file' checkpoint model with 'content' and 'format'."""
        contents, content_type = self._checkpoint_contents(checkpoint_id, path)
        return {
            'type': 'file',
            'content': contents.decode(utf8_encoding),
            'format': 'text' if content_type == 'text/plain' else 'base64',
        }

    def get_notebook_checkpoint(self, checkpoint_id, path):
        """Return a 'notebook' checkpoint model parsed as nbformat v4."""
        contents, _ = self._checkpoint_contents(checkpoint_id, path)
        return {
            'type': 'notebook',
            'content': nbformat.reads(contents, as_version=4),
        }

    def delete_checkpoint(self, checkpoint_id, path):
        """Delete the checkpoint blob if it exists; otherwise do nothing."""
        blob = self.checkpoint_blob(checkpoint_id, path)
        if blob:
            blob.delete()
        return None

    def list_checkpoints(self, path):
        """Return the checkpoints stored directly under `path`.

        The prefix is built with `checkpoint_path` so that `path` is
        normalized exactly as it is when checkpoints are written (a leading
        '/' would otherwise make `posixpath.join` discard the checkpoints
        directory), and it ends in '/' so sibling paths sharing a name
        prefix are not matched.
        """
        prefix = self.checkpoint_path('', path)
        checkpoints = []
        for b in self.bucket.list_blobs(prefix=prefix):
            checkpoint_id = b.name[len(prefix):]
            # Skip the placeholder itself and checkpoints of nested paths.
            if not checkpoint_id or '/' in checkpoint_id:
                continue
            checkpoints.append({
                'id': checkpoint_id,
                'last_modified': b.updated,
            })
        return checkpoints

    def rename_checkpoint(self, checkpoint_id, old_path, new_path):
        """Move a checkpoint blob from `old_path` to `new_path`, if present."""
        blob = self.checkpoint_blob(checkpoint_id, old_path)
        if not blob:
            return None
        new_blob_name = self.checkpoint_path(checkpoint_id, new_path)
        self.bucket.rename_blob(blob, new_blob_name)
        return None


class GCSContentsManager(ContentsManager):
    """Jupyter contents manager that stores files as blobs in a GCS bucket.

    Directories are logical: a path is a directory when at least one blob
    name starts with "<path>/". `_mkdir` creates an empty placeholder blob
    named "<path>/" so that empty directories can exist.
    """

    bucket_name = Unicode(config=True)
    bucket_notebooks_path = Unicode(config=True)
    project = Unicode(config=True)

    @default('checkpoints_class')
    def _checkpoints_class_default(self):
        return GCSCheckpointManager

    @default('bucket_notebooks_path')
    def _bucket_notebooks_path_default(self):
        return ''

    def __init__(self, **kwargs):
        super(GCSContentsManager, self).__init__(**kwargs)
        self._bucket = None

    @property
    def bucket(self):
        """The configured bucket, fetched on first access and then cached."""
        if not self._bucket:
            if self.project:
                storage_client = storage.Client(project=self.project)
            else:
                storage_client = storage.Client()
            self._bucket = storage_client.get_bucket(self.bucket_name)
        return self._bucket

    def _normalize_path(self, path):
        """Return `path` without leading/trailing slashes ('' for None)."""
        path = path or ''
        return path.strip('/')

    def _gcs_path(self, normalized_path):
        """Map a normalized API path to a blob name inside the bucket."""
        if not self.bucket_notebooks_path:
            return normalized_path
        if not normalized_path:
            return self.bucket_notebooks_path
        return posixpath.join(self.bucket_notebooks_path, normalized_path)

    def is_hidden(self, path):
        """Return True when the last component of `path` starts with '.'."""
        try:
            path = self._normalize_path(path)
            return posixpath.basename(path).startswith('.')
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def file_exists(self, path):
        """Return True when a blob exists at exactly `path`."""
        try:
            path = self._normalize_path(path)
            if not path:
                return False
            blob_name = self._gcs_path(path)
            blob = self.bucket.get_blob(blob_name)
            return blob is not None
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def dir_exists(self, path):
        """Return True when `path` is a (logical) directory.

        The root exists when the bucket exists. Any other path is a
        directory when no regular blob has that exact name and at least one
        blob lives underneath it.
        """
        try:
            path = self._normalize_path(path)
            if not path:
                return self.bucket.exists()

            dir_gcs_path = self._gcs_path(path)
            if self.bucket.get_blob(dir_gcs_path):
                # There is a regular file matching the specified directory.
                #
                # We could have both a blob matching a directory path
                # and other blobs under that path. In that case, we cannot
                # treat the path as both a directory and a regular file,
                # so we treat the regular file as overriding the logical
                # directory.
                return False

            # The trailing slash keeps "foo" from matching "foobar/...".
            dir_contents = self.bucket.list_blobs(prefix=_dir_prefix(dir_gcs_path))
            for _ in dir_contents:
                return True

            return False
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def _blob_model(self, normalized_path, blob, content=True):
        """Build the contents-API model for a regular file or notebook.

        With `content=False` only metadata is filled in. Otherwise the blob
        is downloaded and returned as parsed notebook JSON, as text (for
        "text/*" content types), or as a base64 string.
        """
        blob_obj = {}
        blob_obj['path'] = normalized_path
        blob_obj['name'] = posixpath.basename(normalized_path)
        blob_obj['last_modified'] = blob.updated
        blob_obj['created'] = blob.time_created
        blob_obj['writable'] = True
        blob_obj['type'] = 'notebook' if blob_obj['name'].endswith(
            '.ipynb') else 'file'
        if not content:
            blob_obj['mimetype'] = None
            blob_obj['format'] = None
            blob_obj['content'] = None
            return blob_obj

        content_str = blob.download_as_string()
        # A blob may have no content type recorded; treat that as binary.
        blob_content_type = blob.content_type or ''
        if blob_obj['type'] == 'notebook':
            blob_obj['mimetype'] = None
            blob_obj['format'] = 'json'
            blob_obj['content'] = nbformat.reads(content_str, as_version=4)
        elif blob_content_type.startswith('text/'):
            blob_obj['mimetype'] = 'text/plain'
            blob_obj['format'] = 'text'
            blob_obj['content'] = content_str.decode(utf8_encoding)
        else:
            blob_obj['mimetype'] = 'application/octet-stream'
            blob_obj['format'] = 'base64'
            # b64encode returns bytes; the model must hold a str so that it
            # can be serialized to JSON.
            blob_obj['content'] = base64.b64encode(content_str).decode('ascii')
        return blob_obj

    def _empty_dir_model(self, normalized_path, content=True):
        """Build a directory model with no children.

        The bucket's creation time is used for both timestamps.
        """
        dir_obj = {}
        dir_obj['path'] = normalized_path
        dir_obj['name'] = posixpath.basename(normalized_path)
        dir_obj['type'] = 'directory'
        dir_obj['mimetype'] = None
        dir_obj['writable'] = True
        dir_obj['last_modified'] = self.bucket.time_created
        dir_obj['created'] = self.bucket.time_created
        dir_obj['format'] = None
        dir_obj['content'] = None
        if content:
            dir_obj['format'] = 'json'
            dir_obj['content'] = []
        return dir_obj

    def _list_dir(self, normalized_path, content=True):
        """Build the directory model for `normalized_path`.

        With `content=True` the model lists the immediate children
        (metadata only, no file contents).
        """
        dir_obj = self._empty_dir_model(normalized_path, content=content)
        if not content:
            return dir_obj

        # We have to convert a list of GCS blobs, which may include multiple
        # entries corresponding to a single sub-directory, into a list of
        # immediate directory contents with no duplicates.
        #
        # To do that, we keep a dictionary of immediate children, and then
        # convert that dictionary into a list once it is fully populated.
        children = {}

        def add_child(name, model, override_existing=False):
            """Add the given child model (for either a regular file or
            directory), to the list of children for the current directory
            model being built.

            It is possible that we will encounter a GCS blob corresponding
            to a regular file after we encounter blobs indicating that name
            should be a directory. For example, if we have the following
            blobs:
                some/dir/path/
                some/dir/path/with/child
                some/dir/path
            ... then the first two entries tell us that 'path' is a
            subdirectory of 'dir', but the third one tells us that it is a
            regular file.

            In this case, we treat the regular file as shadowing the
            directory. The 'override_existing' keyword argument handles that
            by letting the caller specify that the child being added should
            override (i.e. hide) any pre-existing children with the same
            name.
            """
            if self.is_hidden(model['path']) and not self.allow_hidden:
                return
            if (name in children) and not override_existing:
                return
            children[name] = model

        dir_gcs_path = self._gcs_path(normalized_path)
        # List with a trailing slash so that a sibling such as "dirfoo/x"
        # is not mistaken for a child of "dir".
        prefix = _dir_prefix(dir_gcs_path)
        for b in self.bucket.list_blobs(prefix=prefix):
            # For each nested blob, identify the corresponding immediate
            # child of the directory, and then add that child to the
            # directory model.
            suffix = b.name[len(prefix):]
            if not suffix:
                # Ignore the place-holder blob for the directory itself
                continue
            first_slash = suffix.find('/')
            if first_slash < 0:
                child_path = posixpath.join(normalized_path, suffix)
                add_child(suffix,
                          self._blob_model(child_path, b, content=False),
                          override_existing=True)
            else:
                subdir = suffix[0:first_slash]
                if subdir:
                    child_path = posixpath.join(normalized_path, subdir)
                    add_child(subdir,
                              self._empty_dir_model(child_path, content=False))

        dir_obj['content'].extend(children.values())
        return dir_obj

    def get(self, path, content=True, type=None, format=None):
        """Return the model for the file or directory at `path`.

        Raises:
            HTTPError: 404 when no blob exists at a (non-directory) path,
                500 for any unexpected error.
        """
        try:
            path = self._normalize_path(path)
            if not type and self.dir_exists(path):
                type = 'directory'
            if type == 'directory':
                return self._list_dir(path, content=content)

            gcs_path = self._gcs_path(path)
            blob = self.bucket.get_blob(gcs_path)
            if blob is None:
                raise HTTPError(404, 'No such file or directory: {}'.format(path))
            return self._blob_model(path, blob, content=content)
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def _mkdir(self, normalized_path):
        """Create the empty "<path>/" placeholder blob for a directory."""
        gcs_path = self._gcs_path(normalized_path) + '/'
        blob = self.bucket.blob(gcs_path)
        blob.upload_from_string('', content_type='text/plain')
        return self._empty_dir_model(normalized_path, content=False)

    def save(self, model, path):
        """Write `model` to `path` and return its metadata-only model.

        Raises:
            HTTPError: 400 when the model lacks a 'type' or (for
                non-directories) 'content', 500 for unexpected errors.
        """
        try:
            if 'type' not in model:
                raise HTTPError(400, 'No file type provided')
            if 'content' not in model and model['type'] != 'directory':
                raise HTTPError(400, 'No file content provided')

            self.run_pre_save_hook(model=model, path=path)

            normalized_path = self._normalize_path(path)
            if model['type'] == 'directory':
                return self._mkdir(normalized_path)

            gcs_path = self._gcs_path(normalized_path)
            blob = self.bucket.get_blob(gcs_path)
            if not blob:
                blob = self.bucket.blob(gcs_path)

            content_type = model.get('mimetype', None)
            if not content_type:
                content_type, _ = mimetypes.guess_type(normalized_path)
            contents = model['content']
            if model['type'] == 'notebook':
                contents = nbformat.writes(nbformat.from_dict(contents))
            elif model['type'] == 'file' and model.get('format') == 'base64':
                b64_bytes = contents.encode('ascii')
                contents = base64.decodebytes(b64_bytes)

            # GCS doesn't allow specifying the key version, so drop it if
            # present
            _drop_kms_key_version(blob)

            blob.upload_from_string(contents, content_type=content_type)
            return self.get(path, type=model['type'], content=False)
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def delete_file(self, path):
        """Delete the file at `path`, or every blob under that directory.

        Raises:
            HTTPError: 400 when `path` is the root (an empty prefix would
                match every blob in the bucket), 500 for unexpected errors.
        """
        try:
            normalized_path = self._normalize_path(path)
            if not normalized_path:
                raise HTTPError(400, 'Cannot delete the root directory')
            gcs_path = self._gcs_path(normalized_path)
            blob = self.bucket.get_blob(gcs_path)
            if blob:
                # The path corresponds to a regular file; just delete it.
                blob.delete()
                return None

            # The path (possibly) corresponds to a directory. Delete
            # every file underneath it. The trailing slash keeps siblings
            # that merely share the name prefix from being deleted, and the
            # listing is materialized before anything is removed.
            for blob in list(self.bucket.list_blobs(prefix=_dir_prefix(gcs_path))):
                blob.delete()
            return None
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))

    def rename_file(self, old_path, new_path):
        """Rename a file, or every blob under a directory.

        Raises:
            HTTPError: 400 when either path is the root, 500 for unexpected
                errors.
        """
        try:
            old_normalized_path = self._normalize_path(old_path)
            new_normalized_path = self._normalize_path(new_path)
            if not old_normalized_path or not new_normalized_path:
                raise HTTPError(400, 'Cannot rename the root directory')
            old_gcs_path = self._gcs_path(old_normalized_path)
            new_gcs_path = self._gcs_path(new_normalized_path)
            blob = self.bucket.get_blob(old_gcs_path)
            if blob:
                # The path corresponds to a regular file.
                self.bucket.rename_blob(blob, new_gcs_path)
                return None

            # The path (possibly) corresponds to a directory. Rename
            # every file underneath it. Only the leading directory prefix
            # is rewritten (not later occurrences of the same text), and the
            # listing is materialized first because renaming adds blobs.
            old_prefix = _dir_prefix(old_gcs_path)
            for b in list(self.bucket.list_blobs(prefix=old_prefix)):
                self.bucket.rename_blob(
                    b, new_gcs_path + b.name[len(old_gcs_path):])
            return None
        except HTTPError:
            raise
        except Exception as ex:
            raise HTTPError(500, 'Internal server error: {}'.format(str(ex)))


class CombinedCheckpointsManager(GenericCheckpointsMixin, Checkpoints):
    """Dispatches checkpoint calls to the manager owning the path prefix.

    `content_managers` maps a top-level directory name to the contents
    manager whose `checkpoints` object handles paths under that name.
    """

    def __init__(self, content_managers):
        # NOTE(review): the base-class constructor is not invoked here;
        # confirm that is intentional.
        self._content_managers = content_managers

    def _checkpoint_manager_for_path(self, path):
        """Return (checkpoints manager, path relative to its prefix).

        Raises:
            HTTPError: 400 when no registered prefix matches `path`.
        """
        path = path or ''
        path = path.strip('/')
        for path_prefix in self._content_managers:
            if path == path_prefix or path.startswith(path_prefix + '/'):
                relative_path = path[len(path_prefix):]
                return self._content_managers[path_prefix].checkpoints, relative_path
        raise HTTPError(400, 'Unsupported checkpoint path: {}'.format(path))

    def checkpoint_path(self, checkpoint_id, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.checkpoint_path(checkpoint_id, relative_path)

    def checkpoint_blob(self, checkpoint_id, path, create_if_missing=False):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.checkpoint_blob(
            checkpoint_id, relative_path, create_if_missing=create_if_missing)

    def create_file_checkpoint(self, content, format, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.create_file_checkpoint(content, format,
                                                         relative_path)

    def create_notebook_checkpoint(self, nb, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.create_notebook_checkpoint(nb, relative_path)

    def get_file_checkpoint(self, checkpoint_id, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.get_file_checkpoint(checkpoint_id,
                                                      relative_path)

    def get_notebook_checkpoint(self, checkpoint_id, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.get_notebook_checkpoint(checkpoint_id,
                                                          relative_path)

    def delete_checkpoint(self, checkpoint_id, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.delete_checkpoint(checkpoint_id, relative_path)

    def list_checkpoints(self, path):
        checkpoint_manager, relative_path = self._checkpoint_manager_for_path(path)
        return checkpoint_manager.list_checkpoints(relative_path)

    def rename_checkpoint(self, checkpoint_id, old_path, new_path):
        """Rename a checkpoint; both paths must share one manager (else 400)."""
        checkpoint_manager, old_relative_path = self._checkpoint_manager_for_path(
            old_path)
        new_checkpoint_manager, new_relative_path = self._checkpoint_manager_for_path(
            new_path)
        if new_checkpoint_manager != checkpoint_manager:
            raise HTTPError(
                400, 'Unsupported rename across file systems: {}->{}'.format(
                    old_path, new_path))
        return checkpoint_manager.rename_checkpoint(checkpoint_id,
                                                    old_relative_path,
                                                    new_relative_path)


class CombinedContentsManager(ContentsManager):
    """Exposes local disk and GCS side by side under a read-only root.

    The root directory contains exactly two entries, 'Local Disk' and
    'GCS'; every other path is forwarded to the matching manager with the
    prefix removed.
    """

    root_dir = Unicode(config=True)

    preferred_dir = Unicode("", config=True)

    @default('checkpoints')
    def _default_checkpoints(self):
        return CombinedCheckpointsManager(self._content_managers)

    def __init__(self, **kwargs):
        print('Creating the combined contents manager...')
        super(CombinedContentsManager, self).__init__(**kwargs)

        file_cm = FileContentsManager(**kwargs)
        file_cm.checkpoints = GenericFileCheckpoints(**file_cm.checkpoints_kwargs)
        gcs_cm = GCSContentsManager(**kwargs)
        self._content_managers = {
            'Local Disk': file_cm,
            'GCS': gcs_cm,
        }

    def _content_manager_for_path(self, path):
        """Return (manager, relative path, prefix) for `path`.

        The manager is None when no registered prefix matches; in that case
        the path is still split at its first '/' into prefix and remainder.
        """
        path = path or ''
        path = path.strip('/')
        for path_prefix in self._content_managers:
            if path == path_prefix or path.startswith(path_prefix + '/'):
                relative_path = path[len(path_prefix):]
                return self._content_managers[path_prefix], relative_path, path_prefix
        if '/' in path:
            path_parts = path.split('/', 1)
            return None, path_parts[1], path_parts[0]
        return None, path, ''

    def is_hidden(self, path):
        """Delegate to the owning manager; unknown prefixes are not hidden."""
        try:
            cm, relative_path, unused_path_prefix = self._content_manager_for_path(
                path)
            if not cm:
                return False
            return cm.is_hidden(relative_path)
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)

    def file_exists(self, path):
        """Delegate to the owning manager; False for unknown prefixes."""
        try:
            cm, relative_path, unused_path_prefix = self._content_manager_for_path(
                path)
            if not cm:
                return False
            return cm.file_exists(relative_path)
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)

    def dir_exists(self, path):
        """True for the root; otherwise delegate to the owning manager."""
        if path in ['', '/']:
            return True
        try:
            cm, relative_path, unused_path_prefix = self._content_manager_for_path(
                path)
            if not cm:
                return False
            return cm.dir_exists(relative_path)
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)

    def _make_model_relative(self, model, path_prefix):
        """Prepend `path_prefix` to the model's path (and its children's)."""
        if 'path' in model:
            model['path'] = '{}/{}'.format(path_prefix, model['path'])
        if model.get('type', None) == 'directory':
            self._make_children_relative(model, path_prefix)

    def _make_children_relative(self, model, path_prefix):
        """Apply `_make_model_relative` to each child of a directory model."""
        children = model.get('content', None)
        if children:
            for child in children:
                self._make_model_relative(child, path_prefix)

    def get(self, path, content=True, type=None, format=None):
        """Return the model for `path`.

        The root is synthesized from the registered managers; anything else
        is fetched from the owning manager and re-prefixed.

        Raises:
            HTTPError: 404 when no manager owns `path`, 500 for unexpected
                errors.
        """
        if path in ['', '/']:
            dir_obj = {}
            dir_obj['path'] = ''
            dir_obj['name'] = ''
            dir_obj['type'] = 'directory'
            dir_obj['mimetype'] = None
            dir_obj['writable'] = False
            dir_obj['format'] = 'json'
            contents = []
            for path_prefix in self._content_managers:
                child_obj = self._content_managers[path_prefix].get('', content=False)
                child_obj['path'] = path_prefix
                child_obj['name'] = path_prefix
                child_obj['writable'] = False
                contents.append(child_obj)
            dir_obj['content'] = contents
            dir_obj['created'] = contents[0]['created']
            dir_obj['last_modified'] = contents[0]['last_modified']
            return dir_obj
        try:
            cm, relative_path, path_prefix = self._content_manager_for_path(path)
            if not cm:
                raise HTTPError(404, 'No content manager defined for "{}"'.format(path))
            model = cm.get(relative_path, content=content, type=type, format=format)
            self._make_model_relative(model, path_prefix)
            return model
        except HTTPError:
            raise
        except Exception as ex:
            # The `type` parameter shadows the builtin in this method, so
            # the error is formatted by a helper outside this scope.
            raise _internal_server_error(ex)

    def save(self, model, path):
        """Save `model` through the owning manager.

        Raises:
            HTTPError: 403 for the root or a top-level entry, 404 when no
                manager owns `path`, 500 for unexpected errors.
        """
        if path in ['', '/']:
            raise HTTPError(403, 'The top-level directory is read-only')
        try:
            self.run_pre_save_hook(model=model, path=path)

            cm, relative_path, path_prefix = self._content_manager_for_path(path)
            if (relative_path in ['', '/']) or (path_prefix in ['', '/']):
                raise HTTPError(403, 'The top-level directory contents are read-only')
            if not cm:
                raise HTTPError(404, 'No content manager defined for "{}"'.format(path))

            if 'path' in model:
                model['path'] = relative_path

            model = cm.save(model, relative_path)
            if 'path' in model:
                model['path'] = path
            return model
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)

    def delete_file(self, path):
        """Delete `path` through the owning manager.

        Raises:
            HTTPError: 403 for the root, a top-level entry, or a permission
                error; 404 when no manager owns `path`; 500 otherwise.
        """
        if path in ['', '/']:
            raise HTTPError(403, 'The top-level directory is read-only')
        try:
            cm, relative_path, path_prefix = self._content_manager_for_path(path)
            if (relative_path in ['', '/']) or (path_prefix in ['', '/']):
                raise HTTPError(403, 'The top-level directory contents are read-only')
            if not cm:
                raise HTTPError(404, 'No content manager defined for "{}"'.format(path))
            return cm.delete_file(relative_path)
        except OSError as err:
            # The built-in file contents manager will not attempt to wrap
            # permissions errors when deleting files if they occur while
            # trying to move the to-be-deleted file to the trash, because
            # the underlying send2trash library does not set the errno
            # attribute of the raised OSError.
            #
            # To work around this we explicitly catch such errors, check if
            # they start with the magic text "Permission denied", and then
            # wrap them in an HTTPError.
            if str(err).startswith('Permission denied'):
                raise HTTPError(403, str(err))
            raise HTTPError(
                500, 'Internal server error: [{}] {}'.format(err.errno, str(err)))
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)

    def rename_file(self, old_path, new_path):
        """Rename within a single underlying manager.

        Raises:
            HTTPError: 403 for the root or a top-level entry, 404 when no
                manager owns one of the paths, 400 when the two paths belong
                to different managers, 500 for unexpected errors.
        """
        if (old_path in ['', '/']) or (new_path in ['', '/']):
            raise HTTPError(403, 'The top-level directory is read-only')
        try:
            old_cm, old_relative_path, old_prefix = self._content_manager_for_path(
                old_path)
            if (old_relative_path in ['', '/']) or (old_prefix in ['', '/']):
                raise HTTPError(403, 'The top-level directory contents are read-only')
            if not old_cm:
                raise HTTPError(404,
                                'No content manager defined for "{}"'.format(old_path))

            new_cm, new_relative_path, new_prefix = self._content_manager_for_path(
                new_path)
            if (new_relative_path in ['', '/']) or (new_prefix in ['', '/']):
                raise HTTPError(403, 'The top-level directory contents are read-only')
            if not new_cm:
                raise HTTPError(404,
                                'No content manager defined for "{}"'.format(new_path))

            if old_cm != new_cm:
                raise HTTPError(400, 'Unsupported rename across file systems')
            return old_cm.rename_file(old_relative_path, new_relative_path)
        except HTTPError:
            raise
        except Exception as ex:
            raise _internal_server_error(ex)