def show_pipelines()

in servicecatalog_factory/commands/show_pipelines.py [0:0]


def show_pipelines(p, format):
    types_of_file = ["apps", "portfolios", "stacks", "workspaces"]

    pipelines_to_check = dict(
        core={"servicecatalog-factory-pipeline": dict()},
        apps=dict(),
        portfolios=dict(),
        stacks=dict(),
        workspaces=dict(),
    )
    for type_of_file in types_of_file:
        target_dir = os.path.sep.join([p, type_of_file])
        if os.path.exists(target_dir):
            for file_name in os.listdir(target_dir):
                content = yaml.safe_load(
                    open(os.path.sep.join([target_dir, file_name]), "r").read()
                )
                if type_of_file == "apps":
                    for app in content.get("Apps", []):
                        for version in app.get("Versions", []):
                            pipeline_name = (
                                f"app--{app.get('Name')}-{version.get('Name')}-pipeline"
                            )
                            pipelines_to_check["apps"][pipeline_name] = dict()
                elif type_of_file == "stacks":
                    for stack in content.get("Stacks", []):
                        for version in stack.get("Versions", []):
                            pipeline_name = f"stack--{stack.get('Name')}-{version.get('Name')}-pipeline"
                            pipelines_to_check["stacks"][pipeline_name] = dict()
                elif type_of_file == "workspaces":
                    for workspace in content.get("Workspaces", []):
                        for version in workspace.get("Versions", []):
                            pipeline_name = f"workspace--{workspace.get('Name')}-{version.get('Name')}-pipeline"
                            pipelines_to_check["workspaces"][pipeline_name] = dict()
                elif type_of_file == "portfolios":
                    p_name = file_name.split(".")[0]
                    portfolios_file_path = os.path.sep.join([target_dir, file_name])
                    portfolios = generate_portfolios(portfolios_file_path)
                    for portfolio in portfolios.get("Portfolios", []):
                        nested_products = portfolio.get("Products", []) + portfolio.get(
                            "Components", []
                        )
                        for product in nested_products:
                            for version in product.get("Versions", []):
                                pipeline_name = f"{p_name}-{portfolio.get('DisplayName')}-{product.get('Name')}-{version.get('Name')}-pipeline"
                                pipelines_to_check["portfolios"][pipeline_name] = dict()
                    for product in portfolios.get("Products", []):
                        for version in product.get("Versions", []):
                            pipeline_name = (
                                f"{product.get('Name')}-{version.get('Name')}-pipeline"
                            )
                            pipelines_to_check["portfolios"][pipeline_name] = dict()

    fake_date = datetime.datetime.now()
    with betterboto_client.ClientContextManager("codepipeline") as codepipeline:
        for type, pipelines in pipelines_to_check.items():
            for pipeline_name, pipeline_details in pipelines.items():
                try:
                    executions = codepipeline.list_pipeline_executions(
                        pipelineName=pipeline_name, maxResults=5
                    ).get("pipelineExecutionSummaries", [])
                except codepipeline.exceptions.PipelineNotFoundException as e:
                    executions = []

                if len(executions) > 0:
                    last_execution = executions[0]
                else:
                    last_execution = dict(
                        status="N/A",
                        lastUpdateTime=fake_date,
                        startTime=fake_date,
                        sourceRevisions=[dict(revisionId="N/A", revisionSummary="N/A")],
                    )

                trend = list()
                for execution in executions:
                    trend.append(
                        dict(
                            start_time=execution.get("startTime", fake_date),
                            duration=execution.get("lastUpdateTime", fake_date)
                            - execution.get("startTime", fake_date),
                            status=execution.get("status"),
                        )
                    )

                source_revisions = last_execution.get("sourceRevisions")
                if len(source_revisions) == 0:
                    source_revisions.append(
                        dict(revisionId="N/A", revisionSummary="N/A")
                    )

                pipelines[pipeline_name] = {
                    "name": pipeline_name,
                    "pipeline_execution_id": last_execution.get(
                        "pipelineExecutionId", "N/A"
                    ),
                    "start_time": last_execution.get("startTime", fake_date),
                    "status": last_execution.get("status"),
                    "revision_id": source_revisions[0].get("revisionId", "N/A"),
                    "revision_summary": source_revisions[0].get(
                        "revisionSummary", "N/A"
                    ),
                    "duration": last_execution.get("lastUpdateTime")
                    - last_execution.get("startTime"),
                    "trend": trend,
                }

    if format == "json":
        click.echo(json.dumps(pipelines_to_check, indent=4, default=str))
    elif format == "table":
        table_data = [
            [
                "Type",
                "Name",
                "Execution Id",
                "Start Time",
                "Status",
                "Last Commit Id",
                "Last Commit Message",
                "Duration",
                "Trend",
            ],
        ]
        for type, pipelines in pipelines_to_check.items():
            for pipeline_name, pipeline_details in pipelines.items():
                table_data.append(
                    [
                        type,
                        pipeline_details.get("name"),
                        pipeline_details.get("pipeline_execution_id"),
                        pipeline_details.get("start_time"),
                        pipeline_details.get("status"),
                        pipeline_details.get("revision_id"),
                        pipeline_details.get("revision_summary"),
                        pipeline_details.get("duration"),
                        ", ".join(
                            [
                                f"{a.get('status')}"
                                for a in pipeline_details.get("trend", [])
                            ]
                        ),
                    ]
                )
        table = terminaltables.AsciiTable(table_data)
        click.echo(table.table)
    elif format == "html":
        content = ""
        content += "<table class='pipes'>"
        content += "    <tr>"
        content += "        <th>Name</th>"
        content += "        <th>Status</th>"
        content += "        <th>Start Time</th>"
        content += "        <th>Revision Id</th>"
        content += "        <th>Revision Summary</th>"
        content += "        <th>Duration</th>"
        content += "        <th>Trend</th>"
        content += "    </tr>"

        for type, pipelines in pipelines_to_check.items():
            for pipeline_name, pipeline_details in pipelines.items():
                result = result_look_up.get(pipeline_details.get("status"), "amber")
                trend = ", ".join(
                    [f"{a.get('status')[0]}" for a in pipeline_details.get("trend", [])]
                )
                content += f"    <tr class='{result}'>"
                content += f"        <td>{pipeline_details.get('name')}</td>"
                content += f"        <td>{pipeline_details.get('status')}</td>"
                content += f"        <td>{pipeline_details.get('start_time')}</td>"
                content += f"        <td>{pipeline_details.get('revision_id')}</td>"
                content += (
                    f"        <td>{pipeline_details.get('revision_summary')}</td>"
                )
                content += f"        <td>{pipeline_details.get('duration')}</td>"
                content += f"        <td>{trend}</td>"
                content += "    </tr>"

        content += "</table>"
        template = utils.ENV.get_template(constants.STATIC_HTML_PAGE)
        rendered = template.render(title="Factory Pipelines", content=content)
        click.echo(rendered)