in mozci/data/base.py [0:0]
def get(self, name: str, **context: Any) -> Union[Dict[Any, Any], List[Any]]:
    """Fulfil the contract called ``name`` using the first source able to do so.

    Sources in ``self.sources`` are tried in iteration order. A source is
    skipped when it does not list the contract in ``supported_contracts`` or
    when its ``get`` raises ``ContractNotFilled``; the first source that
    returns normally provides the result.

    Args:
        name (str): Name of the contract to run.
        context (dict): Keyword arguments forwarded to the source, validated
            against the contract via ``validate_in`` (see `Contract.schema_in`).

    Returns:
        dict or list: The source's output, validated against the contract via
        ``validate_out`` (see `Contract.schema_out`).

    Raises:
        ContractNotFound: ``name`` is not a key of ``all_contracts``.
        SourcesNotFound: The loop over ``self.sources`` finished without any
            source returning a result.
        validx.exc.SchemaError: The result failed output validation; the
            error is logged and then re-raised unchanged.
    """
    logger.trace(f"Handling contract '{name}'")
    if name not in all_contracts:
        raise ContractNotFound(name)

    # Reject malformed input before touching any source.
    contract = all_contracts[name]
    contract.validate_in(context)

    for source in self.sources:
        if name not in source.supported_contracts:
            continue

        logger.trace(f"Fulfilling with '{source.name}' source")
        try:
            result = source.get(name, **context)
        except ContractNotFilled as err:
            # This source could not deliver; fall through to the next one.
            logger.trace(f"{err.msg}.. trying next source.")
        else:
            # First successful source wins.
            break
    else:
        # Loop ended without a `break`: nothing produced a result.
        raise SourcesNotFound(name, context)

    # Check the result against the contract before handing it back.
    try:
        contract.validate_out(result)
    except validx.exc.SchemaError:
        logger.error(f"Result from contract '{name}' does not conform to schema!")
        raise

    return result