tools/connector.py (37 lines of code) (raw):

import shutil from pathlib import Path import click import jinja2 ROOT_DIR = (Path(__file__).parent / "..").resolve() TEMPLATES_DIR = ROOT_DIR / "tools" / "templates" CONNECTOR_DIR = ROOT_DIR / "connectors" CI_WORKFLOW_TEMPLATE_NAME = "ci_workflow.yaml" def copy_connector_template(connector_name: str, destination: str): """Copy job template files to jobs directory.""" try: shutil.copytree( src=TEMPLATES_DIR, dst=Path(destination) / connector_name, ) except FileExistsError: raise ValueError(f"Connector with name {connector_name} already exists.") # generate CI config for connector template_loader = jinja2.FileSystemLoader(TEMPLATES_DIR) template_env = jinja2.Environment(loader=template_loader) try: ci_workflow_template = template_env.get_template(CI_WORKFLOW_TEMPLATE_NAME) except jinja2.exceptions.TemplateNotFound: raise FileNotFoundError( f"{CI_WORKFLOW_TEMPLATE_NAME} must be in {TEMPLATES_DIR}" ) ci_workflow_text = ci_workflow_template.render(connector_name=connector_name) with open(Path(destination) / connector_name / CI_WORKFLOW_TEMPLATE_NAME, "w") as f: f.write(ci_workflow_text) @click.group(help="Commands for managing connectors.") def connector(): """Create the CLI group for the connector command.""" pass @connector.command(help="""Create a new custom Fivetran connector.""") @click.argument("connector_name") @click.option( "--destination", "-d", help="Destination directory", default=CONNECTOR_DIR ) def create(connector_name: str, destination: str): copy_connector_template(connector_name, destination)