def _extract_triggers()

in awsglue/blueprint/workflow.py [0:0]


    def _extract_triggers(self, crawlers, jobs, workflow_name, starting_trigger_schedule, name_suffix):
        triggers = []
        starting_trigger = {
            'WorkflowName': workflow_name,
            'Name': "{}_starting_trigger".format(workflow_name),
            'Actions': []
        }
        if starting_trigger_schedule:
            starting_trigger['Type'] = "SCHEDULED"
            starting_trigger['Schedule'] = starting_trigger_schedule
            starting_trigger['StartOnCreation'] = True
        else:
            starting_trigger['Type'] = "ON_DEMAND"

        for crawler in crawlers:
            if 'DependsOn' in crawler:
                trigger = {'WorkflowName': workflow_name,
                           'Type': 'CONDITIONAL',
                           'Name': "{}_{}_trigger".format(workflow_name, crawler['Name']),
                           'Predicate': {'Conditions': []},
                           'StartOnCreation': True,
                           'Actions': [{'CrawlerName': crawler['Name']}]}
                for dependency, state in crawler['DependsOn'].items():
                    if isinstance(dependency, Crawler):
                        trigger['Predicate']['Conditions'].append({
                            'LogicalOperator': 'EQUALS',
                            'CrawlerName': "{}_{}".format(dependency.Name, name_suffix),
                            'CrawlState': state
                        })
                    elif isinstance(dependency, Job):
                        trigger['Predicate']['Conditions'].append({
                            'LogicalOperator': 'EQUALS',
                            'JobName': "{}_{}".format(dependency.Name, name_suffix),
                            'State': state
                        })
                    else:
                        raise TypeError("Dependencies must be of type Job or Crawler, but found {} instead".format(
                            type(dependency)))
                crawler.pop('DependsOn')
                if 'WaitForDependencies' in crawler:
                    trigger['Predicate']['Logical'] = crawler['WaitForDependencies']
                    crawler.pop('WaitForDependencies')
                triggers.append(trigger)
            else:
                starting_trigger['Actions'].append({'CrawlerName': crawler['Name']})

        for job in jobs:
            if 'DependsOn' in job:
                trigger = {'WorkflowName': workflow_name,
                           'Type': 'CONDITIONAL',
                           'Name': "{}_{}_trigger".format(workflow_name, job['Name']),
                           'Predicate': {'Conditions': []},
                           'StartOnCreation': True,
                           'Actions': [{'JobName': job['Name']}]}
                for dependency, state in job['DependsOn'].items():
                    if isinstance(dependency, Crawler):
                        trigger['Predicate']['Conditions'].append({
                            'LogicalOperator': 'EQUALS',
                            'CrawlerName': "{}_{}".format(dependency.Name, name_suffix),
                            'CrawlState': state
                        })
                    elif isinstance(dependency, Job):
                        trigger['Predicate']['Conditions'].append({
                            'LogicalOperator': 'EQUALS',
                            'JobName': "{}_{}".format(dependency.Name, name_suffix),
                            'State': state
                        })
                    else:
                        raise TypeError("Dependencies must be of type Job or Crawler, but found {} instead".format(
                            type(dependency)))
                job.pop('DependsOn')
                if 'WaitForDependencies' in job:
                    trigger['Predicate']['Logical'] = job['WaitForDependencies']
                    job.pop('WaitForDependencies')
                triggers.append(trigger)
            else:
                starting_trigger['Actions'].append({'JobName': job['Name']})
        triggers.append(starting_trigger)
        return triggers