in api/plugins/tasks.py [0:0]
def __init__(self, session, taskid=None, taskrow=None):
    """Load a task from the registry, wrap a fetched row, or init a blank task.

    Args:
        session: Current session object; ``session.DB.sqlite.open`` is
            called to open ``nodetasks.db``.
        taskid: Optional task ID (int or all-digit string). When truthy,
            the matching row is read from the ``tasks`` table.
        taskrow: Optional, already fetched ``tasks`` row (anything that can
            be indexed by column name). Only consulted when ``taskid`` is
            falsy.

    Raises:
        Exception: If ``taskid`` is given but is not purely numeric, or no
            row with that ID exists in the registry.
    """
    self._data = {}
    self.session = session
    self.conn = session.DB.sqlite.open('nodetasks.db')

    # Task variables; these defaults describe a brand new, unsaved task.
    self.id = None
    self.type = None
    self.category = 0
    self.enabled = True
    self.muted = False
    self.payload = {}
    # Always defined, so a fresh task exposes the same attributes as a
    # loaded one.
    self.ipv6 = False  # TODO!
    self.name = None

    # Resolve the source row (if any) first, then populate the fields once.
    doc = None
    if taskid:
        # Load by task ID. fullmatch() rejects input such as "12\n", which
        # a "$"-anchored match() would accept.
        if re.fullmatch(r"[0-9]+", str(taskid)):
            self.id = int(taskid)
            nc = self.conn.cursor()
            nc.execute("SELECT * FROM `tasks` WHERE `id` = ? LIMIT 1", (taskid,))
            doc = nc.fetchone()
        if not doc:
            raise Exception("No such task found in registry")
    elif taskrow:
        # Or load a task from a data row the caller already fetched.
        doc = taskrow

    if doc:
        self.id = doc['id']
        self.type = doc['type']
        self.category = doc['category']
        # Flags are compared against integer 1, as in the stored rows.
        self.enabled = doc['enabled'] == 1
        self.muted = doc['muted'] == 1
        self.payload = json.loads(doc['payload'])
        self.name = doc['name']