in events/code_coverage_events/workflow.py [0:0]
def __init__(self):
    """Wire together the components of the code coverage events workflow.

    A single MessageBus is created and handed to the CodeCoverage workflow.
    A Monitoring instance and a PulseListener are then built and each one
    is registered on that same bus. Hook identifiers, admin contacts and
    pulse credentials are all read from ``taskcluster_config.secrets``.
    """
    # One message bus, shared amongst every process created below
    self.bus = MessageBus()

    # All secret lookups below go through this one mapping
    secrets = taskcluster_config.secrets

    # Code coverage workflow, driven by the configured hook
    self.workflow = CodeCoverage(
        secrets["hook_id"],
        secrets["hook_group_id"],
        self.bus,
    )

    # Monitoring for newly created tasks.
    # NOTE(review): 7 * 3600 is presumably a seven-hour duration expressed
    # in seconds -- confirm against the Monitoring constructor.
    self.monitoring = Monitoring(
        taskcluster_config,
        QUEUE_MONITORING,
        secrets["admins"],
        7 * 3600,
    )
    self.monitoring.register(self.bus)

    # Pulse listener for code coverage: bind the pulse queue to the
    # task-group-resolved exchange with the "#" routing pattern
    task_group_resolved = (
        "exchange/taskcluster-queue/v1/task-group-resolved",
        ["#"],
    )
    pulse_bindings = {QUEUE_PULSE: [task_group_resolved]}
    self.pulse = PulseListener(
        pulse_bindings,
        secrets["pulse_user"],
        secrets["pulse_password"],
    )
    self.pulse.register(self.bus)