experimental/emulated_vault/opt/emulated-vault/storage/fs_adapter.py (94 lines of code) (raw):

# # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # # import os from . adapter_base import AdapterBase class FsAdapter(AdapterBase): """ Fs (file system) adapter class. This class implements the API required for storing and retriving the content kept in the vault. This specific Adapter works using files upon a file-system. Inputs configuration (held under "fs-adapter" section in the config file) include: :param db-base-os-path: The path in which the DB files are stored :param ping-os-path: The path of a variable mimicing the RIAK ping functionality """ def __init__ (self, logger): """ The class constructor. :param logger: logger to send log messages to :type logger: a python logging logger class """ self.logger = logger def init_cfg(self, fullConfig):# -> bool: """ Initialize the class basic parameters. Part of Adapter required API. Read the relevant section in the configuration """ myCfgData = dict(fullConfig.items("fs-adapter")) if fullConfig.has_section("fs-adapter") else {} self.basePath = myCfgData.get("db-base-os-path") if self.basePath is None: self.logger.error("Missing %s/%s configuration", "fs-adapter", "db-base-os-path") return False self.pingStoragePath = myCfgData.get("ping-os-path") if self.pingStoragePath is None: self.logger.error("Missing %s/%s configuration", "fs-adapter", "ping-os-path") return False return True def init(self):# -> bool: """ Initialize the class ability to answer for ping. Part of Adapter required API. """ value = ":)" success = self.write_parameter_by_storage_path(self.pingStoragePath, value) if not success: self.logger.error("Failed to set parameter %s", self.pingStoragePath) return False return True def get_parameter_storage_path(self, parameterUrlPath):# -> bool, str: """ Conversion function - taking a key's path and translate to a file path on the file system """ #verifying the existance of "/" at the beging if not parameterUrlPath.startswith("/"): self.logger.error("Failed to translate url path '%s': no leading '/'", parameterUrlPath) return False, "" # avoiding the use of os.path.join in purpuse. # we do not want the "/" at the begining push us to the root path return True, self.basePath + parameterUrlPath.replace("/", os.path.sep) def get_parameter_url_path_from_storage_path(self, parameterStoragePath):# -> str: """ Conversion function - taking file path on the file system and translate to key's path """ return True, "/"+os.path.relpath(parameterStoragePath, self.basePath).replace(os.path.sep, "/") def ping(self):# -> bool: """ Get value for the ping request by its path. Part of Adapter required API. """ success, value = self.read_parameter_by_storage_path(self.pingStoragePath) if not success or value is None: self.logger.error("no ping response") return False self.logger.debug("ping response: %s", value) return True def read_parameter_by_storage_path(self, parameterStoragePath):# -> (bool, str): """ Reading the value from the provided file name. """ self.logger.debug("Get parameter by os path: %s", parameterStoragePath) try: with open(parameterStoragePath) as fd: value = fd.read() except Exception as e: self.logger.exception("%s parameter os path not found.", parameterStoragePath) return False, None self.logger.debug("Get parameter by os path %s succeed", parameterStoragePath) return True, value def read_parameters_by_storage_path(self, parameterStoragePathPrefix, keyFilters):# -> (bool, dict(str, str)): """ Reading the values of the parameters the provided directory. """ self.logger.debug("Get parameters under os path %s", parameterStoragePathPrefix) parameters = {} fileNames = [] for (dirpath, _, filenames) in os.walk(parameterStoragePathPrefix): fileNames += [os.path.join(dirpath, file) for file in filenames] for fileName in fileNames: filteredOut = False for filterName, filterfunc in keyFilters.items():#items() - supporting python 2&3 if not filterfunc(fileName): self.logger.debug("Parameter os path %s dropped, not matching filter '%s'", self.get_parameter_url_path_from_storage_path(fileName)[1], filterName) filteredOut = True break if filteredOut: continue success_path, parameterUrlPath = self.get_parameter_url_path_from_storage_path(fileName) if not success_path: self.logger.error("%s parameter os path is invalid.", fileName) return False, None success, value = self.read_parameter_by_storage_path(fileName) if not success: self.logger.error("%s parameter os path not found.", fileName) return False, None parameters[parameterUrlPath] = value return True, parameters def write_parameter_by_storage_path(self, parameterStoragePath, value):# -> bool: """ Writing the value to the provided file name. """ self.logger.debug("Set parameter by os path %s", parameterStoragePath) try: dirname = os.path.dirname(parameterStoragePath) if dirname and not os.path.exists(dirname): os.makedirs(dirname) with open(parameterStoragePath, "w") as fd: fd.write(value) except Exception as e: self.logger.exception("could not post parameter os path %s", parameterStoragePath) return False self.logger.debug("Set parameter os path %s done", parameterStoragePath) return True def remove_parameter_by_storage_path(self, parameterStoragePath):# -> bool: """ Deleting the the provided file. """ self.logger.debug("Delete parameter os path %s", parameterStoragePath) try: os.remove(_parameterStoragePath) except Exception as e: self.logger.exception("could not delete parameter os path %s", parameterStoragePath) return False self.logger.debug("Delete parameter os path %s done", parameterStoragePath) return True