in jupyter-gcs-contents-manager/gcs_contents_manager.py [0:0]
def save(self, model, path):
    """Save a file, notebook, or directory described by ``model`` at ``path``.

    The pre-save hook is run first. A model whose ``'type'`` is
    ``'directory'`` is delegated to ``self._mkdir``. Anything else is
    uploaded to the blob at ``self._gcs_path(normalized_path)`` in
    ``self.bucket`` (an existing blob is reused, otherwise a new one is
    created).

    Args:
        model: Mapping with a ``'type'`` key. For non-directories it must
            also carry ``'content'``; ``'format'`` is read when the type is
            ``'file'``; ``'mimetype'`` is optional and, when absent or
            empty, is guessed from the normalized path.
        path: Path to save to; normalized with ``self._normalize_path``.

    Returns:
        The result of ``self._mkdir`` for directories, otherwise the result
        of ``self.get(path, type=model['type'], content=False)``.

    Raises:
        HTTPError: Re-raised unchanged if raised while saving; any other
            exception is wrapped in an ``HTTPError`` with status 500.
    """
    try:
        self.run_pre_save_hook(model=model, path=path)

        normalized_path = self._normalize_path(path)
        if model['type'] == 'directory':
            return self._mkdir(normalized_path)

        gcs_path = self._gcs_path(normalized_path)
        blob = self.bucket.get_blob(gcs_path)
        if not blob:
            # Nothing stored at this path yet: create a fresh blob handle.
            blob = self.bucket.blob(gcs_path)

        content_type = model.get('mimetype', None)
        if not content_type:
            content_type, _ = mimetypes.guess_type(normalized_path)

        contents = model['content']
        if model['type'] == 'notebook':
            # Notebook content arrives as a dict; serialize it to text.
            contents = nbformat.writes(nbformat.from_dict(contents))
        elif model['type'] == 'file' and model['format'] == 'base64':
            b64_bytes = contents.encode('ascii')
            contents = base64.decodebytes(b64_bytes)

        # GCS doesn't allow specifying the key version, so drop it if present
        if blob.kms_key_name:
            # NOTE(review): this writes the blob's private ``_properties``
            # mapping directly -- confirm whether a public setter exists.
            blob._properties['kmsKeyName'] = re.split(
                r'/cryptoKeyVersions/\d+$', blob.kms_key_name)[0]

        blob.upload_from_string(contents, content_type=content_type)
        return self.get(path, type=model['type'], content=False)
    except HTTPError:
        # Already an HTTP-level error: propagate it untouched.
        raise
    except Exception as ex:
        # Chain the original exception so its traceback is preserved.
        raise HTTPError(
            500, 'Internal server error: {}'.format(str(ex))) from ex