in cid/plugin.py [0:0]
def __init__(self, name):
    """Load the resource definitions shipped in the plugin's ``data`` directory.

    Every non-directory entry listed under ``data`` for ``name`` is parsed
    according to its extension (the text after the last dot): ``json`` via
    ``json.loads``, ``yaml``/``yml`` via ``yaml.load`` with ``SafeLoader``.
    Files with any other extension, and files that parse to ``None``, are
    skipped.

    The text before the FIRST dot of the file name selects how the content
    is stored: if it is ``dashboards``, ``views`` or ``datasets`` the content
    is stored under that key of ``self.resources`` (replacing any previous
    value for the key); otherwise the content is merged into
    ``self.resources`` at the top level.

    Finally every dict entry inside every dict-valued resource section gets
    a ``providedBy`` key set to the plugin name.

    Args:
        name: Plugin name; stored as ``self.name`` and passed as the first
            argument to the ``resource_*`` helpers.
    """
    logger.info(f'Initializing plugin {name}')
    self.resources = {}
    self.name = name
    pkg_resources_db_directory = 'data'
    supported_resource_kinds = ('dashboards', 'views', 'datasets')
    for pkg_resource in resource_listdir(self.name, pkg_resources_db_directory):
        resource_path = f'{pkg_resources_db_directory}/{pkg_resource}'
        if resource_isdir(self.name, resource_path):
            continue
        logger.info(f'Located data file: {pkg_resource}')
        # Splitting on every dot: last part is the extension, first part
        # is the resource kind (so 'views.extra.yaml' -> kind 'views').
        name_parts = pkg_resource.split('.')
        ext = name_parts[-1].lower()
        if ext == 'json':
            content = json.loads(resource_string(self.name, resource_path))
            logger.info(f'Loaded {pkg_resource} as JSON')
        elif ext in ('yaml', 'yml'):
            with resource_stream(self.name, resource_path) as yaml_stream:
                content = yaml.load(yaml_stream, Loader=yaml.SafeLoader)
            logger.info(f'Loaded {pkg_resource} as YAML')
        else:
            # Skip explicitly: without this branch `content` would be
            # unbound on the first file, or would still hold the previous
            # file's data and be merged a second time.
            logger.info(f'Unsupported file type: {pkg_resource}')
            continue
        if content is None:
            # Empty YAML document or JSON `null`: nothing to register.
            logger.info(f'Data file is empty: {pkg_resource}')
            continue
        resource_kind = name_parts[0]
        if resource_kind in supported_resource_kinds:
            # File is dedicated to a single resource kind.
            self.resources[resource_kind] = content
            logger.info(f'Loaded {resource_kind} from {pkg_resource}')
        else:
            # File holds several kinds at once; merge its top-level keys.
            self.resources.update(content)
    # Add plugin name to every resource
    for kind_resources in self.resources.values():
        if not isinstance(kind_resources, dict):
            # e.g. an empty `views:` section parsed as None
            continue
        for item in kind_resources.values():
            if isinstance(item, dict):
                item['providedBy'] = self.name
    logger.info(f'Plugin {self.name} initialized')