in awsglue/blueprint/workflow.py [0:0]
def _extract_triggers(self, crawlers, jobs, workflow_name, starting_trigger_schedule, name_suffix):
triggers = []
starting_trigger = {
'WorkflowName': workflow_name,
'Name': "{}_starting_trigger".format(workflow_name),
'Actions': []
}
if starting_trigger_schedule:
starting_trigger['Type'] = "SCHEDULED"
starting_trigger['Schedule'] = starting_trigger_schedule
starting_trigger['StartOnCreation'] = True
else:
starting_trigger['Type'] = "ON_DEMAND"
for crawler in crawlers:
if 'DependsOn' in crawler:
trigger = {'WorkflowName': workflow_name,
'Type': 'CONDITIONAL',
'Name': "{}_{}_trigger".format(workflow_name, crawler['Name']),
'Predicate': {'Conditions': []},
'StartOnCreation': True,
'Actions': [{'CrawlerName': crawler['Name']}]}
for dependency, state in crawler['DependsOn'].items():
if isinstance(dependency, Crawler):
trigger['Predicate']['Conditions'].append({
'LogicalOperator': 'EQUALS',
'CrawlerName': "{}_{}".format(dependency.Name, name_suffix),
'CrawlState': state
})
elif isinstance(dependency, Job):
trigger['Predicate']['Conditions'].append({
'LogicalOperator': 'EQUALS',
'JobName': "{}_{}".format(dependency.Name, name_suffix),
'State': state
})
else:
raise TypeError("Dependencies must be of type Job or Crawler, but found {} instead".format(
type(dependency)))
crawler.pop('DependsOn')
if 'WaitForDependencies' in crawler:
trigger['Predicate']['Logical'] = crawler['WaitForDependencies']
crawler.pop('WaitForDependencies')
triggers.append(trigger)
else:
starting_trigger['Actions'].append({'CrawlerName': crawler['Name']})
for job in jobs:
if 'DependsOn' in job:
trigger = {'WorkflowName': workflow_name,
'Type': 'CONDITIONAL',
'Name': "{}_{}_trigger".format(workflow_name, job['Name']),
'Predicate': {'Conditions': []},
'StartOnCreation': True,
'Actions': [{'JobName': job['Name']}]}
for dependency, state in job['DependsOn'].items():
if isinstance(dependency, Crawler):
trigger['Predicate']['Conditions'].append({
'LogicalOperator': 'EQUALS',
'CrawlerName': "{}_{}".format(dependency.Name, name_suffix),
'CrawlState': state
})
elif isinstance(dependency, Job):
trigger['Predicate']['Conditions'].append({
'LogicalOperator': 'EQUALS',
'JobName': "{}_{}".format(dependency.Name, name_suffix),
'State': state
})
else:
raise TypeError("Dependencies must be of type Job or Crawler, but found {} instead".format(
type(dependency)))
job.pop('DependsOn')
if 'WaitForDependencies' in job:
trigger['Predicate']['Logical'] = job['WaitForDependencies']
job.pop('WaitForDependencies')
triggers.append(trigger)
else:
starting_trigger['Actions'].append({'JobName': job['Name']})
triggers.append(starting_trigger)
return triggers