rostran/core/template.py (153 lines of code) (raw):
import json
import logging
from typing import Optional, Union
from functools import partial
import typer
from ruamel.yaml import YAML
from .exceptions import InvalidRosTemplateFormatVersion
from .format import FileFormat, TargetTemplateFormat
from .parameters import Parameters
from .resources import Resources
from .outputs import Outputs
from .metadata import MetaData
from .rules import Rules
from .conditions import Conditions
from .mappings import Mappings
from .workspace import Workspace
logger = logging.getLogger(__name__)
yaml = YAML()
yaml.preserve_quotes = True
class Template:
def __init__(self, source, *args, **kwargs):
self.source = source
@classmethod
def initialize(cls, path: str, format: FileFormat):
pass
def transform(self):
pass
class RosTemplate:
(
ROS_TEMPLATE_FORMAT_VERSION,
TRANSFORM,
DESCRIPTION,
CONDITIONS,
MAPPINGS,
PARAMETERS,
RESOURCES,
OUTPUTS,
RULES,
METADATA,
WORKSPACE,
) = (
"ROSTemplateFormatVersion",
"Transform",
"Description",
"Conditions",
"Mappings",
"Parameters",
"Resources",
"Outputs",
"Rules",
"Metadata",
"Workspace",
)
def __init__(
self,
description: Optional[Union[str, dict]] = None,
metadata: Optional[MetaData] = None,
mappings: Optional[Mappings] = None,
conditions: Optional[Conditions] = None,
parameters: Optional[Parameters] = None,
resources: Optional[Resources] = None,
outputs: Optional[Outputs] = None,
rules: Optional[Rules] = None,
transform: Optional[str] = None,
workspace: Optional[Workspace] = None,
):
self.description = description
self.metadata = metadata if metadata is not None else MetaData()
self.mappings = mappings if mappings is not None else Mappings()
self.conditions = conditions if conditions is not None else Conditions()
self.parameters = parameters if parameters is not None else Parameters()
self.resources = resources if resources is not None else Resources()
self.outputs = outputs if outputs is not None else Outputs()
self.rules = rules if rules is not None else Rules()
self.transform = transform
self.workspace = workspace
@classmethod
def initialize(cls, data: dict):
if cls.ROS_TEMPLATE_FORMAT_VERSION not in data:
raise InvalidRosTemplateFormatVersion(
reason=f"{cls.ROS_TEMPLATE_FORMAT_VERSION} is required"
)
if data.get(cls.ROS_TEMPLATE_FORMAT_VERSION) != "2015-09-01":
raise InvalidRosTemplateFormatVersion(
reason=f"{cls.ROS_TEMPLATE_FORMAT_VERSION} can only be 2015-09-01"
)
metadata = mappings = conditions = parameters = resources = outputs = rules = (
workspace
) = None
transform = data.get(cls.TRANSFORM)
description = data.get(cls.DESCRIPTION)
if cls.CONDITIONS in data:
conditions = Conditions.initialize(data[cls.CONDITIONS])
if cls.MAPPINGS in data:
mappings = Mappings.initialize(data[cls.MAPPINGS])
if cls.PARAMETERS in data:
parameters = Parameters.initialize(data[cls.PARAMETERS])
if cls.RESOURCES in data:
resources = Resources.initialize(data[cls.RESOURCES])
if cls.OUTPUTS in data:
outputs = Outputs.initialize(data[cls.OUTPUTS])
if cls.RULES in data:
rules = Rules.initialize(data[cls.RULES])
if cls.METADATA in data:
metadata = MetaData.initialize(data[cls.METADATA])
if cls.WORKSPACE in data:
workspace = Workspace.initialize(data[cls.WORKSPACE])
return cls(
description,
metadata,
mappings,
conditions,
parameters,
resources,
outputs,
rules,
transform,
workspace,
)
def as_dict(self, format=False):
data = {self.ROS_TEMPLATE_FORMAT_VERSION: "2015-09-01"}
if self.transform:
data[self.TRANSFORM] = self.transform
if self.description:
data[self.DESCRIPTION] = self.description
if self.conditions:
data[self.CONDITIONS] = self.conditions.as_dict(format)
if self.mappings:
data[self.MAPPINGS] = self.mappings.as_dict(format)
if self.parameters:
data[self.PARAMETERS] = self.parameters.as_dict(format, self.metadata)
if self.resources:
data[self.RESOURCES] = self.resources.as_dict(format)
if self.outputs:
data[self.OUTPUTS] = self.outputs.as_dict(format)
if self.rules:
data[self.RULES] = self.rules.as_dict(format)
if self.metadata:
data[self.METADATA] = self.metadata.as_dict(format)
if self.workspace:
data[self.WORKSPACE] = self.workspace.as_dict(format)
return data
def save(self, target_path, target_format: TargetTemplateFormat):
typer.secho(f"Save template to {target_path}.", fg="green")
if target_format == TargetTemplateFormat.Yaml:
dump = partial(yaml.dump)
kwargs = {}
else:
dump = json.dump
kwargs = {"indent": 2}
data = self.as_dict(format=True)
with open(target_path, "w") as f:
dump(data, f, **kwargs)