def run()

in servicecatalog_puppet/workflow/portfolio/portfolio_management/copy_into_spoke_local_portfolio_task.py [0:0]


    def run(self):
        """Copy hub products and their versions into the spoke local portfolio.

        Reads the spoke portfolio (``create_spoke_local_portfolio`` input) and
        the hub product listing (``products_and_provisioning_artifacts``
        input). For every hub product it:

        1. collects the hub versions of type ``CLOUD_FORMATION_TEMPLATE``;
        2. looks the product up by name in the spoke portfolio and drops the
           versions whose name already exists there;
        3. copies the remaining versions with ``copy_product``, polls until
           the copy finishes, associates the product with the portfolio and
           polls until the product is listed in the portfolio;
        4. sets ``Active`` on each spoke version to the hub value of the
           version with the same name.

        The version dictionaries are rebuilt for every product so that
        versions of one product are never copied into, or used to update,
        another product.

        Writes a JSON document to the task output with the keys
        ``portfolio``, ``product_versions_that_should_be_copied`` (versions
        selected for copying, keyed by version name, merged over all
        products) and ``products`` (product name -> spoke product id).

        Raises:
            Exception: when ``describe_copy_product_status`` reports
                ``FAILED`` for a product copy.
        """
        with self.input().get("create_spoke_local_portfolio").open("r") as f:
            spoke_portfolio = json.loads(f.read())
        portfolio_id = spoke_portfolio.get("Id")

        # Merged over all products; only used for the task output.
        all_product_versions_copied = {}
        product_name_to_id_dict = {}

        with self.input().get("products_and_provisioning_artifacts").open("r") as f:
            products_and_provisioning_artifacts = json.loads(f.read())

        for product_view_summary in products_and_provisioning_artifacts:
            spoke_product_id = False
            target_product_id = False
            hub_product_name = product_view_summary.get("Name")

            # Per-product state: hub versions keyed by (stringified) name.
            product_versions_that_should_be_updated = {
                f"{hub_provisioning_artifact_detail.get('Name')}": hub_provisioning_artifact_detail
                for hub_provisioning_artifact_detail in product_view_summary.get(
                    "provisioning_artifact_details", []
                )
                if hub_provisioning_artifact_detail.get("Type")
                == "CLOUD_FORMATION_TEMPLATE"
            }
            # Starts as every hub version; versions already present in the
            # spoke are removed below.
            product_versions_that_should_be_copied = dict(
                product_versions_that_should_be_updated
            )

            self.info(f"Copying {hub_product_name}")
            hub_product_arn = product_view_summary.get("ProductARN")
            copy_args = {
                "SourceProductArn": hub_product_arn,
                "CopyOptions": ["CopyTags",],
            }

            with self.spoke_regional_client(
                "servicecatalog"
            ) as spoke_service_catalog:
                p = None
                try:
                    p = spoke_service_catalog.search_products_as_admin_single_page(
                        PortfolioId=portfolio_id,
                        Filters={"FullTextSearch": [hub_product_name]},
                    )
                except spoke_service_catalog.exceptions.ResourceNotFoundException as e:
                    # Deliberate best-effort: the product is then treated as
                    # not yet present in the spoke.
                    self.info(f"swallowing exception: {str(e)}")

                if p is not None:
                    for spoke_product_view_details in p.get("ProductViewDetails", []):
                        spoke_product_view = spoke_product_view_details.get(
                            "ProductViewSummary"
                        )
                        # Full text search is fuzzy; require an exact name match.
                        if spoke_product_view.get("Name") != hub_product_name:
                            continue

                        spoke_product_id = spoke_product_view.get("ProductId")
                        product_name_to_id_dict[hub_product_name] = spoke_product_id
                        # Copy into the existing spoke product instead of
                        # creating a new one.
                        copy_args["TargetProductId"] = spoke_product_id
                        spoke_provisioning_artifact_details = spoke_service_catalog.list_provisioning_artifacts(
                            ProductId=spoke_product_id
                        ).get(
                            "ProvisioningArtifactDetails", []
                        )
                        for (
                            provisioning_artifact_detail
                        ) in spoke_provisioning_artifact_details:
                            id_to_delete = (
                                f"{provisioning_artifact_detail.get('Name')}"
                            )
                            if id_to_delete in product_versions_that_should_be_copied:
                                self.info(
                                    f"{hub_product_name} :: Going to skip {spoke_product_id} {provisioning_artifact_detail.get('Name')}"
                                )
                                del product_versions_that_should_be_copied[
                                    id_to_delete
                                ]

                if not product_versions_that_should_be_copied:
                    self.info("no versions to copy")
                else:
                    self.info("about to copy product")

                    copy_args["SourceProvisioningArtifactIdentifiers"] = [
                        {"Id": a.get("Id")}
                        for a in product_versions_that_should_be_copied.values()
                    ]

                    self.info(f"about to copy product with args: {copy_args}")
                    copy_product_token = spoke_service_catalog.copy_product(
                        **copy_args
                    ).get("CopyProductToken")
                    # Poll until the copy reaches a terminal state (no timeout).
                    while True:
                        time.sleep(5)
                        r = spoke_service_catalog.describe_copy_product_status(
                            CopyProductToken=copy_product_token
                        )
                        target_product_id = r.get("TargetProductId")
                        copy_product_status = r.get("CopyProductStatus")
                        self.info(f"{hub_product_name} status: {copy_product_status}")
                        if copy_product_status == "FAILED":
                            raise Exception(
                                f"Copying "
                                f"{hub_product_name} failed: {r.get('StatusDetail')}"
                            )
                        elif copy_product_status == "SUCCEEDED":
                            break

                    all_product_versions_copied.update(
                        product_versions_that_should_be_copied
                    )

                    self.info(
                        f"adding {target_product_id} to portfolio {portfolio_id}"
                    )
                    spoke_service_catalog.associate_product_with_portfolio(
                        ProductId=target_product_id, PortfolioId=portfolio_id,
                    )

                    # associate_product_with_portfolio is not a synchronous request
                    self.info(
                        f"waiting for adding of {target_product_id} to portfolio {portfolio_id}"
                    )
                    # Poll until the product is listed in the portfolio (no timeout).
                    while True:
                        time.sleep(2)
                        response = spoke_service_catalog.search_products_as_admin_single_page(
                            PortfolioId=portfolio_id,
                        )
                        products_ids = [
                            product_view_detail.get("ProductViewSummary").get(
                                "ProductId"
                            )
                            for product_view_detail in response.get(
                                "ProductViewDetails", []
                            )
                        ]
                        self.info(
                            f"Looking for {target_product_id} in {products_ids}"
                        )

                        if target_product_id in products_ids:
                            break

                    product_name_to_id_dict[hub_product_name] = target_product_id

                product_id_in_spoke = spoke_product_id or target_product_id
                if not product_id_in_spoke:
                    # Product is not in the spoke and nothing was copied, so
                    # there are no spoke versions whose Active flag could be
                    # synchronised.
                    self.info(
                        f"{hub_product_name} :: not present in the spoke, skipping version updates"
                    )
                    continue

                spoke_provisioning_artifact_details = spoke_service_catalog.list_provisioning_artifacts(
                    ProductId=product_id_in_spoke
                ).get(
                    "ProvisioningArtifactDetails", []
                )
                for (
                    version_name,
                    version_details,
                ) in product_versions_that_should_be_updated.items():
                    self.info(
                        f"{version_name} is active: {version_details.get('Active')} in hub"
                    )
                    for (
                        spoke_provisioning_artifact_detail
                    ) in spoke_provisioning_artifact_details:
                        if (
                            spoke_provisioning_artifact_detail.get("Name")
                            == version_name
                        ):
                            self.info(
                                f"Updating active of {version_name}/{spoke_provisioning_artifact_detail.get('Id')} "
                                f"in the spoke to {version_details.get('Active')}"
                            )
                            spoke_service_catalog.update_provisioning_artifact(
                                ProductId=product_id_in_spoke,
                                ProvisioningArtifactId=spoke_provisioning_artifact_detail.get(
                                    "Id"
                                ),
                                Active=version_details.get("Active"),
                            )

        with self.output().open("w") as f:
            f.write(
                json.dumps(
                    {
                        "portfolio": spoke_portfolio,
                        "product_versions_that_should_be_copied": all_product_versions_copied,
                        "products": product_name_to_id_dict,
                    },
                    indent=4,
                    default=str,
                )
            )