in data_annotation_platform/app/admin/routes.py [0:0]
def manage_tasks():
user_list = [(u.id, u.username) for u in User.query.all()]
dataset_list = [
(d.id, d.name) for d in Dataset.query.order_by(Dataset.name).all()
]
form = AdminManageTaskForm()
form.username.choices = user_list
form.dataset.choices = dataset_list
if form.validate_on_submit():
user = User.query.filter_by(id=form.username.data).first()
if user is None:
flash("User does not exist.", "error")
return redirect(url_for("admin.manage_tasks"))
dataset = Dataset.query.filter_by(id=form.dataset.data).first()
if dataset is None:
flash("Dataset does not exist.", "error")
return redirect(url_for("admin.manage_tasks"))
action = None
if form.assign.data:
action = "assign"
elif form.delete.data:
action = "delete"
else:
flash(
"Internal error: no button is true but form was submitted.",
"error",
)
return redirect(url_for("admin.manage_tasks"))
task = Task.query.filter_by(
annotator_id=user.id, dataset_id=dataset.id
).first()
if task is None:
if action == "delete":
flash("Can't delete a task that doesn't exist.", "error")
return redirect(url_for("admin.manage_tasks"))
else:
task = Task(annotator_id=user.id, dataset_id=dataset.id)
task.admin_assigned = True
db.session.add(task)
db.session.commit()
flash("Task registered successfully.", "success")
else:
if action == "assign":
flash("Task assignment already exists.", "error")
return redirect(url_for("admin.manage_tasks"))
else:
# delete annotations too
for ann in Annotation.query.filter_by(task_id=task.id).all():
db.session.delete(ann)
db.session.delete(task)
db.session.commit()
flash("Task deleted successfully.", "success")
tasks = (
Task.query.join(User, Task.user)
.join(Dataset, Task.dataset)
.order_by(Dataset.name, User.username)
.all()
)
return render_template(
"admin/manage_tasks.html", title="Assign Task", form=form, tasks=tasks
)