# faker_datasets/__init__.py
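"""Dataset-backed providers for Faker.

Building blocks of faker-datasets: the ``Provider`` base class, the
``add_dataset`` class decorator that attaches a JSON dataset (and,
optionally, a picker method) to a provider class, and the
``with_datasets`` / ``with_match`` method decorators that hand datasets,
optionally filtered by a predicate, to the decorated methods.
"""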
from functools import partialmethod, wraps
from pathlib import Path

from faker.providers import BaseProvider

__version__ = "0.1.0"


def load_json(fp):
    import json

    return json.load(fp)


def load_dataset(filename):
    suffix = Path(filename).suffix[1:]
    with open(filename) as fp:
        if suffix == "json":
            return load_json(fp)
        else:
            raise ValueError(f"Unsupported format: {suffix}")
def chroot(dataset, common_path, item_path=None):
    for part in common_path.split(".")[1:]:
        dataset = dataset[part]
    if item_path:
        item_path_parts = item_path.split(".")[1:]
        new_dataset = []
        for item in dataset:
            for part in item_path_parts:
                item = item[part]
            new_dataset.append(item)
        dataset = new_dataset
    return dataset
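

# dataset() loads a file and optionally descends into it following `root`:
#   "."               use the whole document
#   ".path.to.list"   descend through nested objects
#   ".path[].field"   descend, then keep only `field` of each list item
# Roots with a trailing dot, a "..", or more than one "[]" are rejected.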
def dataset(filename, root):
    paths = root.split("[]")
    if not all(x.startswith(".") for x in paths if x):
        raise ValueError(f"Malformed root: {root}")
    if (root != "." and root.endswith(".")) or root.find("..") != -1 or len(paths) > 2:
        raise ValueError(f"Malformed root: {root}")
    dataset = load_dataset(filename)
    return dataset if root == "." else chroot(dataset, *paths)
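

# pick() draws a random entry from `dataset`; with a `match` predicate it
# keeps drawing until an entry is accepted or `max_attempts` draws are used up.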
def pick(faker, dataset, *, match=None, max_attempts=1000):
    last = len(dataset) - 1  # highest valid index; avoids shadowing built-in max()
    if not match:
        return dataset[faker.random_int(0, last)]
    while max_attempts:
        entry = dataset[faker.random_int(0, last)]
        if match(entry):
            return entry
        max_attempts -= 1
    raise ValueError("Ran out of attempts")


class Provider(BaseProvider):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # At the first instantiation, likely the only one, propagate
        # the datasets to all the decorated methods needing them.
        if hasattr(self.__class__, "__datasets__"):
            for member in self.__class__.__dict__.values():
                if hasattr(member, "set_datasets"):
                    member.set_datasets(self.__class__.__datasets__)

            # Pickers and decorated methods hold a reference to the datasets of
            # their interest; drop all the unused others to free memory.
            del self.__class__.__datasets__
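

# add_dataset is a class decorator: it registers the loaded dataset on the
# provider class under `name` and, when `picker` is given, adds a method of
# that name which returns random entries from it. A minimal sketch (the file
# and field names are hypothetical):
#
#   @add_dataset("cities", "cities.json", picker="city", root=".data[]")
#   class CityProvider(Provider):
#       pass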
class add_dataset:
    def __init__(self, name, filename, *, picker=None, root=None):
        self.name = str(name)
        self.picker = str(picker or "")
        self.dataset = dataset(filename, str(root or "."))

    def __call__(self, cls):
        if not hasattr(cls, "__pick__"):
            cls.__pick__ = pick
        if not hasattr(cls, "__datasets__"):
            cls.__datasets__ = {}
        cls.__datasets__[self.name] = self.dataset
        if self.picker:
            setattr(cls, self.picker, partialmethod(pick, self.dataset))
        return cls
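

# with_datasets is a method decorator: when the provider is instantiated,
# Provider.__init__ calls set_datasets() on every decorated member, which
# resolves the requested dataset names; the wrapper then prepends those
# datasets to the method's positional arguments on every call.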
class with_datasets:
    def __init__(self, name, *others):
        names = (name,) + others
        self.names = [str(name) for name in names]

    def __call__(self, func):
        @wraps(func)
        def _func(faker, *args, **kwargs):
            args = func.datasets + args
            return func(faker, *args, **kwargs)

        def set_datasets(datasets):
            try:
                func.datasets = tuple(datasets[name] for name in self.names)
            except KeyError as e:
                raise ValueError(f"dataset not found: '{e.args[0]}'") from None
            if hasattr(func, "set_datasets"):
                func.set_datasets(datasets)

        _func.set_datasets = set_datasets
        return _func
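

# with_match filters the datasets resolved by with_datasets through a
# predicate, once, when set_datasets() propagates down the decorator chain;
# it must sit below @with_datasets so the unfiltered datasets are already in
# place when the filter runs.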
class with_match:
    def __init__(self, match):
        self.match = match

    def __call__(self, func):
        @wraps(func)
        def _func(faker, *args, **kwargs):
            args = func.datasets + args[len(func.datasets) :]
            return func(faker, *args, **kwargs)

        def set_datasets(datasets):
            if not hasattr(_func, "datasets"):
                raise ValueError("Use @with_datasets first")
            func.datasets = tuple(
                [x for x in d if self.match(x)] for d in _func.datasets
            )
            if hasattr(func, "set_datasets"):
                func.set_datasets(datasets)

        _func.set_datasets = set_datasets
        return _func
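

# A combined sketch of how the method decorators stack (the dataset name,
# file and predicate below are hypothetical); @with_match goes below
# @with_datasets:
#
#   @add_dataset("cities", "cities.json")
#   class CityProvider(Provider):
#
#       @with_datasets("cities")
#       def city(self, cities):
#           return self.__pick__(cities)
#
#       @with_datasets("cities")
#       @with_match(lambda city: city.get("capital"))
#       def capital(self, cities):
#           return self.__pick__(cities)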