in status_led_pkg/status_led_pkg/status_led_node.py [0:0]
def effect_loop(self):
    """Dispatch queued LED effects until a terminate request is received.

    Blocks on ``self.effect_queue`` for ``(effect, kwargs)`` pairs. For each
    pair, the effect registered for the targeted LED index is stopped via
    ``self.stop_effect`` and the new effect is launched on its own thread,
    with ``kwargs`` passed through as keyword arguments.

    A pair whose first element is ``None`` ends the loop; ``stop_effect`` is
    then called for LED indices 0, 1 and 2 before returning.
    """
    effect, effect_kwargs = self.effect_queue.get()

    # A None effect is the terminate request.
    while effect is not None:
        # Target LED; falls back to the default index when none is given.
        index = effect_kwargs.get("led_index", constants.DEFAULT_LED_INDEX)

        # Whatever is registered for this LED is stopped before replacing it.
        self.stop_effect(index)

        # Reset this LED's stop flag (presumably a threading.Event - confirm
        # against the class initialiser) so the new effect is not told to
        # stop right away.
        self.stop[index].clear()

        # Record the worker for this LED, then launch it.
        worker = threading.Thread(target=effect, kwargs=effect_kwargs)
        self.threads[index] = worker
        worker.start()

        # Wait for the next request.
        effect, effect_kwargs = self.effect_queue.get()

    # Terminating: stop the effect on every LED index.
    for index in range(3):
        self.stop_effect(index)