in ForgeTracker/forgetracker/model/ticket.py [0:0]
def update_tickets(self, **post_data):
from forgetracker.tracker_main import get_change_text, get_label
tickets = Ticket.query.find(dict(
_id={'$in': [ObjectId(id)
for id in aslist(
post_data['__ticket_ids'])]},
app_config_id=self.app_config_id)).all()
fields = {'status', 'private'}
values = {}
labels = post_data.get('labels', [])
for k in fields:
v = post_data.get(k)
if v:
values[k] = v
assigned_to = post_data.get('assigned_to')
if assigned_to == '-':
values['assigned_to_id'] = None
elif assigned_to:
user = c.project.user_in_project(assigned_to)
if user:
values['assigned_to_id'] = user._id
private = post_data.get('private')
if private:
values['private'] = asbool(private)
deleted = post_data.get('deleted')
if deleted:
values['deleted'] = asbool(deleted)
discussion_disabled = post_data.get('discussion_disabled')
if discussion_disabled:
values['disabled_discussion'] = asbool(discussion_disabled)
custom_values = {}
custom_fields = {}
for cf in self.custom_fields or []:
v = post_data.get(cf.name)
if v:
custom_values[cf.name] = v
custom_fields[cf.name] = cf
changes = {}
changed_tickets = {}
for ticket in tickets:
message = ''
if labels:
values['labels'] = self.append_new_labels(ticket.labels, labels.split(','))
for k, v in sorted(values.items()):
if k == 'deleted':
if v:
ticket.soft_delete()
break
elif k == 'assigned_to_id':
new_user = User.query.get(_id=v)
old_user = User.query.get(_id=getattr(ticket, k))
if new_user:
message += get_change_text(
get_label(k),
new_user.display_name,
old_user.display_name)
elif k == 'private' or k == 'discussion_disabled':
def _text(val):
if val:
return 'Yes'
else:
return 'No'
message += get_change_text(
get_label(k),
_text(v),
_text(getattr(ticket, k)))
else:
message += get_change_text(
get_label(k),
v,
getattr(ticket, k))
setattr(ticket, k, v)
for k, v in sorted(custom_values.items()):
def cf_val(cf, ticket=ticket):
return ticket.get_custom_user(cf.name) \
if cf.type == 'user' \
else ticket.custom_fields.get(cf.name)
cf = custom_fields[k]
old_value = cf_val(cf)
if cf.type == 'boolean':
v = asbool(v)
ticket.custom_fields[k] = v
new_value = cf_val(cf)
message += get_change_text(
cf.label,
new_value,
old_value)
if message != '':
changes[ticket._id] = message
changed_tickets[ticket._id] = ticket
ticket.discussion_thread.post(message, notify=False, is_meta=True)
ticket.commit()
filtered_changes = self.filtered_by_subscription(changed_tickets)
users = User.query.find(
{'_id': {'$in': list(filtered_changes.keys())}}).all()
def changes_iter(user):
for t_id in filtered_changes.get(user._id, []):
# mark changes text as safe, thus it wouldn't be escaped in plain-text emails
# html part of email is handled by markdown and it'll be
# properly escaped
yield (changed_tickets[t_id], markupsafe.Markup(changes[t_id])) # noqa: S704
mail = dict(
sender=c.project.app_instance(self.app_config).email_address,
fromaddr=str(c.user._id),
reply_to=tg_config['forgemail.return_path'],
subject='[{}:{}] Mass edit changes by {}'.format(c.project.shortname,
self.app_config.options.mount_point,
c.user.display_name),
)
tmpl = g.jinja2_env.get_template('forgetracker:data/mass_report.html')
head = []
for f, v in sorted(values.items()):
if f == 'assigned_to_id':
user = User.query.get(_id=v)
v = user.display_name if user else v
head.append(f'- **{get_label(f)}**: {v}')
for f, v in sorted(custom_values.items()):
cf = custom_fields[f]
if cf.type == 'user':
user = User.by_username(v)
v = user.display_name if user else v
head.append(f'- **{cf.label}**: {v}')
tmpl_context = {'context': c, 'data':
{'header': markupsafe.Markup('\n'.join(['Mass edit changing:', ''] + head))}} # noqa: S704
for user in users:
tmpl_context['data'].update({'changes': changes_iter(user)})
mail.update(dict(
message_id=h.gen_message_id(),
text=tmpl.render(tmpl_context),
destinations=[str(user._id)]))
mail_tasks.sendmail.post(**mail)
if self.app_config.options.get('TicketMonitoringType') in (
'AllTicketChanges', 'AllPublicTicketChanges'):
monitoring_email = self.app_config.options.get(
'TicketMonitoringEmail')
visible_changes = []
for t_id, t in changed_tickets.items():
if (not t.private or
self.app_config.options.get('TicketMonitoringType') ==
'AllTicketChanges'):
visible_changes.append(
(changed_tickets[t_id], markupsafe.Markup(changes[t_id]))) # noqa: S704
if visible_changes:
tmpl_context['data'].update({'changes': visible_changes})
mail.update(dict(
message_id=h.gen_message_id(),
text=tmpl.render(tmpl_context),
destinations=[monitoring_email]))
mail_tasks.sendmail.post(**mail)
self.invalidate_bin_counts()
ThreadLocalODMSession.flush_all()
app = '{}/{}'.format(c.project.shortname,
self.app_config.options.mount_point)
count = len(tickets)
text = 'Updated {} ticket{} in {}'.format(
count, 's' if count != 1 else '', app)
Notification.post_user(c.user, None, 'flash', text=text)