def __init__()

in src/lambda_codebase/initial_commit/bootstrap_repository/adf-build/shared/cdk/cdk_constructs/adf_events.py [0:0]


    def __init__(self, scope: core.Construct, id: str, params: dict, **kwargs):  # pylint: disable=W0622
        """Create the event rules that trigger a pipeline and report on it.

        Depending on the contents of ``params`` up to four kinds of rules are
        created, each one independent of the others:

        * a rule that starts the pipeline when the configured branch of the
          source CodeCommit repository is created or updated,
        * a rule that publishes pipeline state changes (FAILED, STARTED,
          SUCCEEDED) to an SNS topic,
        * one rule per completion-trigger pipeline, starting that pipeline
          when this pipeline reaches the SUCCEEDED state,
        * a rule that starts the pipeline on a schedule expression.

        Args:
            scope: Parent construct this construct is attached to.
            id: Construct id, passed through to the parent constructor.
            params: Configuration dictionary. Keys read by this method:

                * ``pipeline`` (required): ARN of the pipeline to trigger.
                * ``name``: pipeline name, used in rule ids, descriptions and
                  (prefixed with ``ADF_PIPELINE_PREFIX``) in event patterns.
                * ``source``: optional dict with ``provider``, ``account_id``,
                  ``repo_name``, ``branch``, ``trigger_on_changes`` and
                  ``poll_for_changes``.
                * ``topic_arn``: optional ARN of the notification SNS topic.
                * ``completion_trigger``: optional dict whose ``pipelines``
                  list names the pipelines to start on completion.
                * ``schedule``: optional schedule expression string.
            **kwargs: Forwarded unchanged to the parent constructor.

        Raises:
            KeyError: If ``pipeline`` is missing, or if the CodeCommit trigger
                is enabled but ``source`` lacks ``repo_name`` or ``branch``.
        """
        super().__init__(scope, id, **kwargs)
        # pylint: disable=no-value-for-parameter
        stack = core.Stack.of(self)
        _pipeline = _codepipeline.Pipeline.from_pipeline_arn(self, 'pipeline', params["pipeline"])

        # 'source' is optional: look it up once and fall back to an empty dict
        # so that every later read can use plain .get() calls.
        source = params.get('source', {})
        _source_account = source.get('account_id')
        _provider = source.get('provider')
        # The change trigger is only wanted for CodeCommit sources with a known
        # account, and only when event-based triggering is requested instead
        # of polling.
        _add_trigger_on_changes = (
            _provider == 'codecommit'
            and _source_account
            and source.get('trigger_on_changes')
            and not source.get('poll_for_changes')
        )

        name = params.get('name')

        if _add_trigger_on_changes:
            # The repository details are read only here, where they are
            # actually needed, so that a configuration without a (complete)
            # source can still get its notification/completion/schedule rules.
            repo_name = source['repo_name']
            _event = _events.Rule(
                self,
                f'trigger_{name}',
                description=f'Triggers {name} on changes in source CodeCommit repository',
                event_pattern=_events.EventPattern(
                    resources=[
                        f'arn:{stack.partition}:codecommit:{ADF_DEPLOYMENT_REGION}:{_source_account}:{repo_name}'
                    ],
                    source=["aws.codecommit"],
                    detail_type=[
                        'CodeCommit Repository State Change'
                    ],
                    detail={
                        "event": [
                            "referenceCreated",
                            "referenceUpdated"
                        ],
                        "referenceType": [
                            "branch"
                        ],
                        "referenceName": [
                            source['branch']
                        ]
                    }
                )
            )
            _event.add_target(
                _targets.CodePipeline(
                    pipeline=_pipeline
                )
            )
        if params.get('topic_arn'):
            # pylint: disable=no-value-for-parameter
            _topic = _sns.Topic.from_topic_arn(self, 'topic_arn', params["topic_arn"])
            _event = _events.Rule(
                self,
                f'pipeline_state_{name}',
                description=f"{name} | Trigger notifications based on pipeline state changes",
                enabled=True,
                event_pattern=_events.EventPattern(
                    detail={
                        "state": [
                            "FAILED",
                            "STARTED",
                            "SUCCEEDED"
                        ],
                        "pipeline": [
                            f"{ADF_PIPELINE_PREFIX}{name}",
                        ]
                    },
                    detail_type=[
                        "CodePipeline Pipeline Execution State Change"
                    ],
                    source=["aws.codepipeline"]
                )
            )
            _event.add_target(
                _targets.SnsTopic(
                    topic=_topic,
                    message=_events.RuleTargetInput.from_text(
                        # The EventField references are placeholders for values
                        # of the matched event ("$.detail.pipeline",
                        # "$.detail.state", account and time).
                        f"The pipeline {_events.EventField.from_path('$.detail.pipeline')} "
                        f"from account {_events.EventField.account} "
                        f"has {_events.EventField.from_path('$.detail.state')} "
                        f"at {_events.EventField.time}."
                    )
                )
            )
        if params.get('completion_trigger'):
            # There might be other types of completion triggers later, eg lambda..
            for index, pipeline in enumerate(params['completion_trigger'].get('pipelines', [])):
                _event = _events.Rule(
                    self,
                    f'completion_{pipeline}',
                    # Must be an f-string: without the prefix the rule would be
                    # described by the raw, un-substituted template text.
                    description=f"Triggers {pipeline} on completion of {params['pipeline']}",
                    enabled=True,
                    event_pattern=_events.EventPattern(
                        detail={
                            "state": [
                                "SUCCEEDED"
                            ],
                            "pipeline": [
                                f"{ADF_PIPELINE_PREFIX}{name}",
                            ]
                        },
                        detail_type=[
                            "CodePipeline Pipeline Execution State Change"
                        ],
                        source=["aws.codepipeline"]
                    )
                )
                # The pipeline to start is referenced by an ARN built from the
                # deployment region/account and the prefixed pipeline name.
                # pylint: disable=no-value-for-parameter
                _completion_pipeline = _codepipeline.Pipeline.from_pipeline_arn(
                    self,
                    f'pipeline-{index}',
                    f'arn:{stack.partition}:codepipeline:'
                    f'{ADF_DEPLOYMENT_REGION}:{ADF_DEPLOYMENT_ACCOUNT_ID}:'
                    f'{ADF_PIPELINE_PREFIX}{pipeline}'
                )
                _event.add_target(
                    _targets.CodePipeline(
                        pipeline=_completion_pipeline
                    )
                )
        if params.get('schedule'):
            _event = _events.Rule(
                self,
                f'schedule_{params["name"]}',
                description=f"Triggers {params['name']} on a schedule of {params['schedule']}",
                enabled=True,
                # pylint: disable=no-value-for-parameter
                schedule=_events.Schedule.expression(params['schedule'])
            )
            _target_pipeline = _targets.CodePipeline(
                pipeline=_pipeline
            )
            _event.add_target(
                _target_pipeline
            )