lib/metric-config-parser/metric_config_parser/config.py (713 lines of code) (raw):
import datetime as dt
import shutil
import tempfile
from copy import deepcopy
from datetime import datetime
from pathlib import Path
from typing import List, Optional, Union
import attr
import jinja2
import toml
from git import Repo
from git.exc import InvalidGitRepositoryError
from git.objects.commit import Commit
from jinja2 import StrictUndefined
from pytz import UTC
from metric_config_parser.data_source import DataSourceDefinition
from metric_config_parser.definition import DefinitionSpec, DefinitionSpecSub
from metric_config_parser.function import FunctionsSpec
from metric_config_parser.monitoring import MonitoringSpec
from metric_config_parser.segment import SegmentDataSourceDefinition, SegmentDefinition
from .analysis import AnalysisSpec
from .errors import UnexpectedKeyConfigurationException
from .experiment import Channel, Experiment
from .metric import MetricDefinition
from .outcome import OutcomeSpec
from .sql import generate_data_source_sql, generate_metrics_sql
from .util import TemporaryDirectory
# Directory holding outcome snippets; files are looked up as `<OUTCOMES_DIR>/<platform>/<slug>.toml`.
OUTCOMES_DIR = "outcomes"
# Directory holding default configs; files are looked up as `<DEFAULTS_DIR>/<slug>.toml`.
DEFAULTS_DIR = "defaults"
# Directory holding definition configs; files are looked up as `<DEFINITIONS_DIR>/<slug>.toml`.
DEFINITIONS_DIR = "definitions"
# File name of the functions spec, looked up inside DEFINITIONS_DIR.
FUNCTIONS_FILE = "functions.toml"
# NOTE(review): not referenced anywhere in the visible part of this module.
JETSTREAM_CONFIG_URL = "https://github.com/mozilla/jetstream-config"
@attr.s(auto_attribs=True)
class Config:
    """An external config file: its slug, parsed spec and modification time."""

    slug: str
    spec: DefinitionSpecSub
    last_modified: dt.datetime
    is_private: bool = False

    def validate(self, configs: "ConfigCollection", experiment: Experiment) -> None:
        """Merge this config onto the matching defaults and resolve the result.

        Analysis specs are merged onto the experiment defaults; monitoring
        specs are merged onto the platform defaults (plus the rollout defaults
        for Firefox Desktop rollouts). Specs of any other type are not checked.

        Raises:
            ValueError: If the config is private and the resolved analysis
                spec has no explicit ``dataset_id``.
        """
        own_spec = self.spec

        if isinstance(own_spec, AnalysisSpec):
            merged = AnalysisSpec.default_for_experiment(experiment, configs)
            merged.merge(own_spec)
            resolved = merged.resolve(experiment, configs)

            # private configs need to override the default dataset
            if self.is_private and resolved.experiment.dataset_id is None:
                raise ValueError("dataset_id needs to be explicitly set for private experiments")
            return

        if not isinstance(own_spec, MonitoringSpec):
            return

        # prefer the experiment's app, then the project's platform, then desktop
        if experiment:
            platform = experiment.app_name
        else:
            platform = own_spec.project.platform
        platform = platform or "firefox_desktop"

        merged_monitoring = MonitoringSpec.default_for_platform_or_type(platform, configs)

        if experiment and experiment.is_rollout and platform == "firefox_desktop":
            rollout_defaults = MonitoringSpec.default_for_platform_or_type("rollout", configs)
            merged_monitoring.merge(rollout_defaults)

        merged_monitoring.merge(own_spec)
        merged_monitoring.resolve(experiment, configs)
def validate_config_settings(config_file: Path) -> None:
    """Reject config files that contain unknown top-level keys.

    Implemented to resolve a Github issue:
        - https://github.com/mozilla/jetstream/issues/843

    The file is parsed as TOML and its top-level keys are compared against the
    known core config sections. All known sections are optional, so a missing
    section is never reported; unexpected top-level keys are the only thing
    this function checks.

    Args:
        config_file: Path of the TOML config file to check.

    Raises:
        UnexpectedKeyConfigurationException: If the file contains at least one
            top-level key that is not a known core config section.
    """
    optional_core_config_keys = {
        "project",
        "population",
        "metrics",
        "experiment",
        "segments",
        "data_sources",
        "friendly_name",
        "description",
        "parameters",
        "alerts",
        "dimensions",
        "functions",
    }

    config = toml.loads(config_file.read_text())

    # checks for unexpected core configuration keys
    unexpected_config_keys = set(config) - optional_core_config_keys
    if unexpected_config_keys:
        # `Path.name` yields the file name on every platform, unlike splitting on "/"
        err_msg = (
            f"Unexpected config key[s] found: {unexpected_config_keys}. "
            f"config_file: {config_file.name}"
        )
        raise UnexpectedKeyConfigurationException(err_msg)
@attr.s(auto_attribs=True)
class DefaultConfig(Config):
    """
    An external config file holding defaults.

    The slug of the config is used as the app name when validating, and no
    real experiment is involved in the validation.
    """

    def validate(self, configs: "ConfigCollection", _experiment: Experiment = None) -> None:
        """Resolve these defaults against a placeholder experiment.

        `_experiment` is ignored; a placeholder experiment whose `app_name`
        is this config's slug is used instead.
        """
        placeholder_fields = dict(
            experimenter_slug="dummy-experiment",
            normandy_slug="dummy_experiment",
            type="v6",
            status="Live",
            branches=[],
            end_date=None,
            reference_branch="control",
            is_high_population=False,
            start_date=dt.datetime.now(UTC),
            proposed_enrollment=14,
            app_name=self.slug,
            channel=Channel.NIGHTLY,
        )
        placeholder = Experiment(**placeholder_fields)

        merged = AnalysisSpec.default_for_experiment(placeholder, configs)
        merged.merge(self.spec)
        merged.resolve(placeholder, configs)
@attr.s(auto_attribs=True)
class Outcome:
    """An external outcome snippet for one platform."""

    slug: str
    spec: OutcomeSpec
    platform: str
    commit_hash: Optional[str]
    is_private: bool = False

    def validate(self, configs: "ConfigCollection") -> None:
        """Resolve the outcome on top of the defaults of a placeholder experiment.

        The placeholder experiment uses this outcome's platform as its
        `app_name`; the outcome and its parameters are merged into the default
        analysis spec before resolving.
        """
        placeholder_fields = dict(
            experimenter_slug="dummy-experiment",
            normandy_slug="dummy_experiment",
            type="v6",
            status="Live",
            branches=[],
            end_date=None,
            reference_branch="control",
            is_high_population=False,
            start_date=dt.datetime.now(UTC),
            proposed_enrollment=14,
            app_name=self.platform,
            channel=Channel.NIGHTLY,
        )
        placeholder = Experiment(**placeholder_fields)

        merged = AnalysisSpec.default_for_experiment(placeholder, configs)
        merged.merge_outcome(self.spec)
        merged.merge_parameters(self.spec.parameters)
        merged.resolve(placeholder, configs)
@attr.s(auto_attribs=True)
class DefinitionConfig(Config):
    """
    A definition config file whose definitions can be referenced in other configs.
    """

    platform: str = "firefox_desktop"

    def validate(self, configs: "ConfigCollection", _experiment: Experiment = None) -> None:
        """Resolve the definitions against a placeholder experiment.

        `_experiment` is ignored; a placeholder experiment whose `app_name`
        is this config's platform is used instead.

        Raises:
            ValueError: If the parsed spec is not a `DefinitionSpec`.
        """
        placeholder_fields = dict(
            experimenter_slug="dummy-experiment",
            normandy_slug="dummy_experiment",
            type="v6",
            status="Live",
            branches=[],
            end_date=None,
            reference_branch="control",
            is_high_population=False,
            start_date=dt.datetime.now(UTC),
            proposed_enrollment=14,
            app_name=self.platform,
            channel=Channel.NIGHTLY,
        )
        placeholder = Experiment(**placeholder_fields)

        definition_spec = self.spec
        if not isinstance(definition_spec, DefinitionSpec):
            # this should not happen
            raise ValueError("Incorrect result type when parsing definition config")

        AnalysisSpec.from_definition_spec(definition_spec).resolve(placeholder, configs)
def entity_from_path(
    path: Path, is_private: bool = False
) -> Union[Config, Outcome, DefaultConfig, DefinitionConfig, FunctionsSpec]:
    """Parse a single TOML file into the entity implied by its location.

    The kind of entity is derived from the directory layout:
        - `<OUTCOMES_DIR>/<platform>/<slug>.toml` creates an `Outcome`
        - `<DEFAULTS_DIR>/<slug>.toml` creates a `DefaultConfig`
        - `<DEFINITIONS_DIR>/<FUNCTIONS_FILE>` creates a `FunctionsSpec`
        - `<DEFINITIONS_DIR>/<slug>.toml` creates a `DefinitionConfig`
        - anything else creates a `Config`

    For `Config` and `DefaultConfig` a top-level "project" key selects a
    monitoring spec, otherwise an analysis spec is parsed. The file is checked
    with `validate_config_settings` before it is parsed.
    """
    parent_dir = path.parent
    slug = path.stem

    validate_config_settings(path)
    config_dict = toml.loads(path.read_text())

    def parse_spec():
        if "project" in config_dict:
            # config is from opmon
            return MonitoringSpec.from_dict(config_dict)
        return AnalysisSpec.from_dict(config_dict)

    def modified_at() -> dt.datetime:
        # the file's mtime, as a UTC-aware datetime
        return dt.datetime.fromtimestamp(path.stat().st_mtime, UTC)

    if parent_dir.parent.name == OUTCOMES_DIR:
        platform = parent_dir.name
        outcome_spec = OutcomeSpec.from_dict(config_dict)
        return Outcome(
            slug=slug,
            spec=outcome_spec,
            platform=platform,
            commit_hash=None,
            is_private=is_private,
        )

    if parent_dir.name == DEFAULTS_DIR:
        return DefaultConfig(
            slug=slug,
            spec=parse_spec(),
            last_modified=modified_at(),
            is_private=is_private,
        )

    if parent_dir.name == DEFINITIONS_DIR:
        if path.name == FUNCTIONS_FILE:
            return FunctionsSpec.from_dict(config_dict)
        return DefinitionConfig(
            slug=slug,
            spec=DefinitionSpec.from_dict(config_dict),
            last_modified=modified_at(),
            platform=slug,
            is_private=is_private,
        )

    return Config(
        slug=slug,
        spec=parse_spec(),
        last_modified=modified_at(),
        is_private=is_private,
    )
@attr.s(auto_attribs=True)
class Repository:
    """A local git repository that config files are loaded from."""

    path: Path
    repo: Repo
    main_branch: str
    # set when the repository lives in a temporary directory that should be removed on del
    is_tmp_repo: bool = False
    commit_hash: str = "HEAD"

    def __del__(self):
        """Remove the working tree if the repository is flagged as temporary."""
        # repos that come from local directories are never deleted
        if not self.is_tmp_repo:
            return

        # the working tree is the directory that contains the git dir
        checkout_root = Path(self.repo.git_dir).parent
        if checkout_root.exists():
            shutil.rmtree(checkout_root)
@attr.s(auto_attribs=True)
class ConfigCollection:
    """
    Collection of experiment-specific configurations pulled in
    from an external GitHub repository.
    """

    configs: List[Config] = attr.Factory(list)
    outcomes: List[Outcome] = attr.Factory(list)
    defaults: List[DefaultConfig] = attr.Factory(list)
    definitions: List[DefinitionConfig] = attr.Factory(list)
    functions: Optional[FunctionsSpec] = None
    # repos configs were loaded from; built by a factory because a plain `[]`
    # default would be a single list object shared by every instance
    repos: List[Repository] = attr.Factory(list)
    is_private: bool = False

    # repository that gets cloned when no `repo_url` is passed in
    repo_url = "https://github.com/mozilla/metric-hub"

    @classmethod
    def from_github_repo(
        cls,
        repo_url: Optional[str] = None,
        is_private: bool = False,
        path: Optional[str] = None,
        depth: Optional[int] = None,
    ) -> "ConfigCollection":
        """Pull in external config files.

        Args:
            repo_url: Either an existing local directory or a URL to clone.
                A URL may contain `/tree/<branch>/<path>` to select a branch
                and a sub-directory. Defaults to `cls.repo_url`.
            is_private: Whether the loaded configs are private.
            path: Sub-directory of the repository that contains the configs.
                It is overwritten when `repo_url` contains `/tree/`.
            depth: How many directory levels (starting at `repo_url`) are
                tried when looking for the git repository of a local
                directory without `path`. Defaults to 3.

        Raises:
            InvalidGitRepositoryError: If no repository is found for a local
                directory within the search depth.
        """
        is_tmp_repo = False

        if repo_url is not None and Path(repo_url).is_dir():
            if path:
                repo = Repo(repo_url)
            else:
                repo_path = Path(repo_url)
                dir_path = Path()
                depth = depth or 3

                # check if parent folder is a repository until search depth is exceeded
                while depth > 0:
                    try:
                        Repo(repo_path)
                    except InvalidGitRepositoryError:
                        dir_path = Path(repo_path.name) / dir_path
                        repo_path = repo_path.parent
                        depth -= 1
                    else:
                        depth = 0

                # raises InvalidGitRepositoryError if the search was unsuccessful
                repo = Repo(repo_path)
                path = str(dir_path)
        else:
            branch = None
            if repo_url is not None and "/tree/" in repo_url:
                if not repo_url.endswith("/"):
                    repo_url += "/"
                # split on the first "/tree/" only, the path may contain another one
                repo_url, tree = repo_url.split("/tree/", 1)
                branch, path = tree.split("/", 1)

            # download files to a persisted tmp directory
            tmp_dir = Path(tempfile.mkdtemp())
            is_tmp_repo = True
            try:
                repo = Repo.clone_from(repo_url or cls.repo_url, tmp_dir)
                if branch is not None:
                    repo.git.checkout(branch)
            except Exception:
                # nothing references the directory yet, so remove it before failing
                shutil.rmtree(tmp_dir, ignore_errors=True)
                raise

        return ConfigCollection.from_local_repo(
            repo=repo,
            path=path,
            is_private=is_private,
            main_branch=repo.active_branch.name,
            is_tmp_repo=is_tmp_repo,
        )

    @classmethod
    def from_github_repos(
        cls, repo_urls: Optional[List[str]] = None, is_private: bool = False
    ) -> "ConfigCollection":
        """Load configs from the provided repos.

        The repos are loaded in order and merged into the collection of the
        first one, so later repos take precedence. Without any URL the default
        repository is loaded.
        """
        if not repo_urls:
            return ConfigCollection.from_github_repo()

        first_url, *remaining_urls = repo_urls
        configs = ConfigCollection.from_github_repo(first_url, is_private=is_private)
        for url in remaining_urls:
            configs.merge(ConfigCollection.from_github_repo(url, is_private=is_private))
        return configs

    @classmethod
    def from_local_repo(
        cls, repo, path, is_private, main_branch, is_tmp_repo=False
    ) -> "ConfigCollection":
        """Load configs from a local repository.

        TOML files are looked up below `path` (relative to the working tree of
        `repo`, or the working tree itself when `path` is empty):
            - `*.toml` creates `Config`
            - `<OUTCOMES_DIR>/*/*.toml` creates `Outcome`
            - `<DEFAULTS_DIR>/*.toml` creates `DefaultConfig`
            - `<DEFINITIONS_DIR>/*.toml` creates `DefinitionConfig`
            - `<DEFINITIONS_DIR>/<FUNCTIONS_FILE>` creates the functions spec

        Modification times and outcome commit hashes are taken from the most
        recent commit reachable from HEAD that touched the file.
        """
        repo_root = Path(repo.git_dir).parent
        files_path = repo_root / path if path else repo_root

        def latest_commit(file_path):
            return next(repo.iter_commits("HEAD", paths=file_path))

        def committed_at(file_path) -> dt.datetime:
            # UTC-aware commit time; avoids the deprecated `utcfromtimestamp`
            return dt.datetime.fromtimestamp(latest_commit(file_path).committed_date, UTC)

        def load_spec(toml_file) -> DefinitionSpecSub:
            config_json = toml.load(toml_file)
            if "project" in config_json:
                # opmon spec
                return MonitoringSpec.from_dict(config_json)
            spec = AnalysisSpec.from_dict(config_json)
            spec.experiment.is_private = spec.experiment.is_private or is_private
            return spec

        external_configs = []
        for config_file in files_path.glob("*.toml"):
            last_modified = committed_at(config_file)
            external_configs.append(
                Config(
                    config_file.stem,
                    load_spec(config_file),
                    last_modified,
                    is_private=is_private,
                )
            )

        outcomes = []
        for outcome_file in files_path.glob(f"{OUTCOMES_DIR}/*/*.toml"):
            commit_hash = latest_commit(outcome_file).hexsha
            outcomes.append(
                Outcome(
                    slug=outcome_file.stem,
                    spec=OutcomeSpec.from_dict(toml.load(outcome_file)),
                    platform=outcome_file.parent.name,
                    commit_hash=commit_hash,
                    is_private=is_private,
                )
            )

        default_configs = []
        for default_config_file in files_path.glob(f"{DEFAULTS_DIR}/*.toml"):
            last_modified = committed_at(default_config_file)
            default_configs.append(
                DefaultConfig(
                    default_config_file.stem,
                    load_spec(default_config_file),
                    last_modified,
                    is_private=is_private,
                )
            )

        definitions = []
        for definitions_config_file in files_path.glob(f"{DEFINITIONS_DIR}/*.toml"):
            last_modified = committed_at(definitions_config_file)
            definitions.append(
                DefinitionConfig(
                    definitions_config_file.stem,
                    DefinitionSpec.from_dict(toml.load(definitions_config_file)),
                    last_modified,
                    platform=definitions_config_file.stem,
                    is_private=is_private,
                )
            )

        functions_spec = None
        for functions_file in files_path.glob(f"{DEFINITIONS_DIR}/{FUNCTIONS_FILE}"):
            functions_spec = FunctionsSpec.from_dict(toml.load(functions_file))

        return cls(
            external_configs,
            outcomes,
            default_configs,
            definitions,
            functions_spec,
            repos=[
                Repository(
                    repo=repo,
                    path=path,
                    main_branch=main_branch,
                    is_tmp_repo=is_tmp_repo,
                )
            ],
            is_private=is_private,
        )

    def as_of(self, timestamp: datetime) -> "ConfigCollection":
        """Get configs as they were at the provided timestamp.

        Every repo of this collection is copied to a temporary directory and
        checked out at the most recent commit that is not newer than
        `timestamp` (or at the oldest commit if all commits are newer). If the
        configs of that commit cannot be loaded, the newer commits are tried
        from oldest to newest. The `commit_hash` of each repo is set to the
        revision that was finally loaded.

        Returns:
            The merged collection of all repos; `self` if `timestamp` is None
            or the collection has no repos.

        Raises:
            Exception: The error of the first load attempt, if neither the
                selected commit nor any newer commit can be loaded.
        """
        if timestamp is None:
            return self

        config_collection = None

        # configs can be loaded from multiple different repos
        for repo in self.repos:
            # copy the original repo to a temporary path were we can go back in history
            with TemporaryDirectory() as tmp_dir:
                git_dir = Path(repo.repo.git_dir)
                # copy over repo content, except for lib/*, which doesn't contain metric information
                shutil.copytree(
                    git_dir.parent,
                    tmp_dir,
                    dirs_exist_ok=True,
                    ignore=shutil.ignore_patterns("lib*"),
                )
                tmp_repo = Repo(tmp_dir)

                # find the commit that got added just before the `timestamp`
                rev = "HEAD"  # use the HEAD commit by default if no other commits exist

                # make sure the most recent commit is checked out by checking out the main branch
                tmp_repo.git.checkout(repo.main_branch)

                # keep track of more recent commits (newest first) to go back to in case
                # invalid configs got checked into main that cannot be parsed
                newer_commits: List[Commit] = []
                commit = None  # stays None if there is nothing to iterate over

                # start iterating through all commits starting at HEAD
                for commit in tmp_repo.iter_commits("HEAD"):
                    # check if the commit timestamp is older than the reference timestamp
                    if commit.committed_datetime <= timestamp:
                        break
                    newer_commits.append(commit)

                if commit is not None:
                    # either the matching commit or, if there is no commit that is older
                    # than the reference timestamp, the oldest commit that we could find
                    rev = commit.hexsha

                # checkout that commit
                tmp_repo.git.checkout(rev)

                # load configs as they were at the time of the commit
                try:
                    configs = self.from_local_repo(
                        tmp_repo,
                        repo.path,
                        self.is_private,
                        repo.main_branch,
                        is_tmp_repo=True,
                    )
                except Exception:
                    # iterate through newer commits, oldest first, to find one that can be parsed
                    for newer_commit in reversed(newer_commits):
                        tmp_repo.git.checkout(newer_commit.hexsha)
                        try:
                            configs = self.from_local_repo(
                                tmp_repo,
                                repo.path,
                                self.is_private,
                                repo.main_branch,
                                is_tmp_repo=True,
                            )
                        except Exception:
                            # continue searching
                            continue
                        rev = newer_commit.hexsha
                        break
                    else:
                        # there is no newer commit that works, current state is broken;
                        # re-raise the error of the first attempt
                        raise

                repo.commit_hash = rev
                configs.repos = [repo]  # point to the original repo, instead of the temporary one

            if config_collection is None:
                config_collection = configs
            else:
                config_collection.merge(configs)

        if config_collection is None:
            return self
        return config_collection

    def spec_for_outcome(self, slug: str, platform: str) -> Optional[OutcomeSpec]:
        """Return the spec for a specific outcome"""
        for outcome in self.outcomes:
            if outcome.slug == slug and outcome.platform == platform:
                return outcome.spec
        return None

    def spec_for_experiment(self, slug: str) -> Optional[AnalysisSpec]:
        """Return the spec for a specific experiment."""
        for config in self.configs:
            if config.slug == slug and isinstance(config.spec, AnalysisSpec):
                return config.spec
        return None

    def spec_for_project(self, slug: str) -> Optional[MonitoringSpec]:
        """Return the spec for a specific project."""
        for config in self.configs:
            if config.slug == slug and isinstance(config.spec, MonitoringSpec):
                return config.spec
        return None

    def get_platform_defaults(self, platform: str) -> Optional[DefinitionSpecSub]:
        """Return the spec of the default config whose slug equals `platform`."""
        for default in self.defaults:
            if platform == default.slug:
                return default.spec
        return None

    def get_platform_definitions(self, platform: str) -> Optional[DefinitionSpecSub]:
        """Return the spec of the definition config whose slug equals `platform`."""
        for definition in self.definitions:
            if platform == definition.slug:
                return definition.spec
        return None

    def get_metric_definition(self, slug: str, app_name: str) -> Optional[MetricDefinition]:
        """Return the metric `slug` defined for the platform `app_name`, if any."""
        for definition in self.definitions:
            if app_name == definition.platform:
                for metric_slug, metric in definition.spec.metrics.definitions.items():
                    if metric_slug == slug:
                        return metric
        return None

    def get_data_source_definition(
        self, slug: str, app_name: str
    ) -> Optional[DataSourceDefinition]:
        """Return the data source `slug` defined for the platform `app_name`, if any."""
        for definition in self.definitions:
            if app_name == definition.platform:
                for (
                    data_source_slug,
                    data_source,
                ) in definition.spec.data_sources.definitions.items():
                    if data_source_slug == slug:
                        return data_source
        return None

    def get_segment_data_source_definition(
        self, slug: str, app_name: str
    ) -> Optional[SegmentDataSourceDefinition]:
        """Return the segment data source `slug` for the platform `app_name`, if any.

        Definitions whose spec is a `MonitoringSpec` are skipped.
        """
        for definition in self.definitions:
            if app_name == definition.platform:
                if not isinstance(definition.spec, MonitoringSpec):
                    for (
                        segment_source_slug,
                        segment_source,
                    ) in definition.spec.segments.data_sources.items():
                        if segment_source_slug == slug:
                            return segment_source
        return None

    def get_segment_definition(self, slug: str, app_name: str) -> Optional[SegmentDefinition]:
        """Return the segment `slug` defined for the platform `app_name`, if any.

        Definitions whose spec is a `MonitoringSpec` are skipped.
        """
        for definition in self.definitions:
            if app_name == definition.platform:
                if not isinstance(definition.spec, MonitoringSpec):
                    for (
                        segment_slug,
                        segment,
                    ) in definition.spec.segments.definitions.items():
                        if segment_slug == slug:
                            return segment
        return None

    def get_segments_for_app(self, app_name: str) -> Optional[List[SegmentDefinition]]:
        """Return all segments defined for the platform `app_name`.

        Definitions whose spec is a `MonitoringSpec` are skipped. The result is
        an empty list when nothing matches.
        """
        segments: List[SegmentDefinition] = []
        for definition in self.definitions:
            if app_name == definition.platform and not isinstance(definition.spec, MonitoringSpec):
                segments.extend(definition.spec.segments.definitions.values())
        return segments

    def get_metrics_sql(
        self,
        metrics: List[str],
        platform: str,
        group_by: Optional[List[str]] = None,
        where: Optional[str] = None,
        group_by_client_id: bool = True,
        group_by_submission_date: bool = True,
    ) -> str:
        """Generate a SQL query for the specified metrics.

        `group_by` defaults to an empty list; a fresh list is created per call
        so that the default can never be shared between calls.
        """
        return generate_metrics_sql(
            self,
            metrics=metrics,
            platform=platform,
            group_by=[] if group_by is None else group_by,
            where=where,
            group_by_client_id=group_by_client_id,
            group_by_submission_date=group_by_submission_date,
        )

    def get_data_source_sql(
        self,
        data_source: str,
        platform: str,
        where: Optional[str] = None,
        select_fields: bool = True,
        ignore_joins: bool = False,
    ) -> str:
        """Generate a SQL query for the specified data source."""
        return generate_data_source_sql(
            self,
            data_source=data_source,
            platform=platform,
            where=where,
            select_fields=select_fields,
            ignore_joins=ignore_joins,
        )

    def get_env(self) -> jinja2.Environment:
        """
        Create a Jinja2 environment with the collection's functions as globals.

        Every function of `self.functions` is registered under its slug, with
        the function's `definition` as value. Undefined variables raise
        (`StrictUndefined`) and autoescaping is turned off.
        """
        env = jinja2.Environment(autoescape=False, undefined=StrictUndefined)
        if self.functions is not None:
            for slug, function in self.functions.functions.items():
                env.globals[slug] = function.definition
        return env

    def merge(self, other: "ConfigCollection"):
        """
        Merge this config collection with another.

        Configs in `other` will take precedence. This collection is updated in
        place with copies; `other` is left untouched.
        """
        # merge configs
        other_configs = {config.slug: config for config in deepcopy(other.configs)}
        configs = {config.slug: config for config in deepcopy(self.configs)}
        for slug, config in other_configs.items():
            if slug not in configs:
                configs[slug] = config
            else:
                configs[slug].spec.merge(config.spec)
        self.configs = list(configs.values())

        # merge outcomes; outcomes are identified by slug and platform (just like in
        # `spec_for_outcome`), so same-slug outcomes of other platforms are kept
        outcomes = deepcopy(other.outcomes)
        known_outcomes = {(outcome.slug, outcome.platform) for outcome in outcomes}
        for outcome in self.outcomes:
            if (outcome.slug, outcome.platform) not in known_outcomes:
                outcomes.append(outcome)
        self.outcomes = outcomes

        # merge definitions
        other_definitions = {
            definition.slug: definition for definition in deepcopy(other.definitions)
        }
        definitions = {definition.slug: definition for definition in deepcopy(self.definitions)}
        for slug, definition in other_definitions.items():
            if slug not in definitions:
                definitions[slug] = definition
            else:
                definitions[slug].spec.merge(definition.spec)
        self.definitions = list(definitions.values())

        # merge defaults
        other_defaults = {default.slug: default for default in deepcopy(other.defaults)}
        defaults = {default.slug: default for default in deepcopy(self.defaults)}
        for slug, default in other_defaults.items():
            if slug not in defaults:
                defaults[slug] = default
            else:
                defaults[slug].spec.merge(default.spec)
        self.defaults = list(defaults.values())

        # merge functions
        functions = deepcopy(other.functions.functions) if other.functions else {}
        if self.functions:
            for slug, function in self.functions.functions.items():
                if slug not in functions:
                    functions[slug] = function

        if self.functions is None:
            self.functions = FunctionsSpec(functions=functions)
        else:
            self.functions.functions = functions

        # build a new list instead of extending in place, the old one may be referenced elsewhere
        self.repos = self.repos + other.repos
@attr.s(auto_attribs=True)
class LocalConfigCollection(ConfigCollection):
    """
    Collection of experiment-specific configurations pulled in
    from a local directory (structured like metric-hub).

    Unless you are Experimenter, you likely want to use ConfigCollection instead.
    """

    @classmethod
    def from_local_path(cls, path: str, is_private: bool = False) -> "ConfigCollection":
        """Load configs from a local non-repo copy of a metric-hub-like folder structure.

        All loaded configs share one `last_modified` value: the (UTC-aware)
        time at which this method was called.

        Args:
            path (str): path to the configs. Looks for TOML files in the
                following locations (all of which are optional):
                    - . (root) - creates Config
                    - outcomes - creates Outcome
                    - defaults - creates DefaultConfig
                    - definitions - creates DefinitionConfig
            is_private (bool): whether the configs are private
        """
        files_path = Path(path)

        # timezone-aware, like every other `last_modified` created in this module
        last_modified = dt.datetime.now(UTC)

        def load_spec(toml_file) -> DefinitionSpecSub:
            config_json = toml.load(toml_file)
            if "project" in config_json:
                # opmon spec
                return MonitoringSpec.from_dict(config_json)
            spec = AnalysisSpec.from_dict(config_json)
            spec.experiment.is_private = spec.experiment.is_private or is_private
            return spec

        external_configs = [
            Config(
                config_file.stem,
                load_spec(config_file),
                last_modified,
                is_private=is_private,
            )
            for config_file in files_path.glob("*.toml")
        ]

        outcomes = [
            Outcome(
                slug=outcome_file.stem,
                spec=OutcomeSpec.from_dict(toml.load(outcome_file)),
                platform=outcome_file.parent.name,
                commit_hash=None,
                is_private=is_private,
            )
            for outcome_file in files_path.glob(f"{OUTCOMES_DIR}/*/*.toml")
        ]

        default_configs = [
            DefaultConfig(
                default_config_file.stem,
                load_spec(default_config_file),
                last_modified,
                is_private=is_private,
            )
            for default_config_file in files_path.glob(f"{DEFAULTS_DIR}/*.toml")
        ]

        definitions = [
            DefinitionConfig(
                definitions_config_file.stem,
                DefinitionSpec.from_dict(toml.load(definitions_config_file)),
                last_modified,
                platform=definitions_config_file.stem,
                is_private=is_private,
            )
            for definitions_config_file in files_path.glob(f"{DEFINITIONS_DIR}/*.toml")
        ]

        functions_spec = None
        for functions_file in files_path.glob(f"{DEFINITIONS_DIR}/{FUNCTIONS_FILE}"):
            functions_spec = FunctionsSpec.from_dict(toml.load(functions_file))

        return cls(
            external_configs,
            outcomes,
            default_configs,
            definitions,
            functions_spec,
            repos=[],
            is_private=is_private,
        )

    @classmethod
    def from_github_repo(
        cls,
        repo_url: Optional[str] = None,
        is_private: bool = False,
        path: Optional[str] = None,
        depth: Optional[int] = None,
    ):
        """Not supported, always raises `NotImplementedError`."""
        raise NotImplementedError(
            "`from_github_repo` is not valid for non-repo-based LocalConfigCollection. "
            + "Use ConfigCollection for this feature."
        )

    @classmethod
    def from_github_repos(
        cls, repo_urls: Optional[List[str]] = None, is_private: bool = False
    ) -> "ConfigCollection":
        """Not supported, always raises `NotImplementedError`."""
        raise NotImplementedError(
            "`from_github_repos` is not valid for non-repo-based LocalConfigCollection. "
            + "Use ConfigCollection for this feature."
        )

    @classmethod
    def from_local_repo(
        cls, repo, path, is_private, main_branch, is_tmp_repo=False
    ) -> "ConfigCollection":
        """Not supported, always raises `NotImplementedError`."""
        raise NotImplementedError(
            "`from_local_repo` is not valid for non-repo-based LocalConfigCollection. "
            + "Use ConfigCollection for this feature."
        )

    # an instance method, like the `ConfigCollection.as_of` it overrides
    def as_of(self, timestamp: datetime) -> "ConfigCollection":
        """Not supported, always raises `NotImplementedError`."""
        raise NotImplementedError(
            "`as_of` is not valid for non-repo-based LocalConfigCollection. "
            + "Use ConfigCollection for this feature."
        )