in src/dispatch/signal/flows.py [0:0]
def engage_signal_identity(db_session: Session, signal_instance: SignalInstance) -> None:
    """Reach out to the users referenced by a signal instance's entities.

    Every engagement configured on the instance's signal is compared against
    every entity on the instance. Where the entity types match, the entity
    value is validated as an email address (syntax only, no deliverability
    check) and queued for engagement together with that engagement.

    If nothing was queued, or if no active "conversation" plugin is found for
    the case's project, a warning is logged and the function returns early.

    For each queued user the function then:

    1. adds or reactivates the user as a participant of the instance's case,
    2. gets or creates the user in the case's organization,
    3. asks the conversation plugin to create a threaded engagement, and
    4. stores the returned "timestamp" on the signal instance and commits.

    Args:
        db_session: Database session passed to the service/flow calls and
            committed after each engagement.
        signal_instance: The signal instance whose entities and case are used.
    """
    pending_engagements = []

    for engagement in signal_instance.signal.engagements:
        for entity in signal_instance.entities:
            # Only entities whose type matches the engagement are candidates.
            if engagement.entity_type_id != entity.entity_type_id:
                continue

            try:
                validated_email = validate_email(entity.value, check_deliverability=False)
            except EmailNotValidError as e:
                log.warning(
                    f"A user subject included in a signal for {signal_instance.signal.name} (id: {signal_instance.signal.id}) contains an invalid email address: {e}. Investigate why this detection included a user subject with an invalid email in the signal."
                )
                continue

            # NOTE(review): entries are not de-duplicated, so an address that
            # matches more than once is queued once per match.
            pending_engagements.append(
                {
                    "user": validated_email.email,
                    "engagement": engagement,
                }
            )

    if not pending_engagements:
        log.warning(
            f"Engagement configured for signal {signal_instance.signal.name} (id: {signal_instance.signal.id}), but no users found in instance with id {signal_instance.id}."
        )
        return

    conversation_plugin = plugin_service.get_active_instance(
        db_session=db_session,
        project_id=signal_instance.case.project.id,
        plugin_type="conversation",
    )
    if not conversation_plugin:
        log.warning("No conversation plugin is active.")
        return

    for pending in pending_engagements:
        user_email = pending["user"]

        # The user has to be a case participant (and in the conversation)
        # before the engagement message is created for them.
        case_flows.case_add_or_reactivate_participant_flow(
            db_session=db_session,
            user_email=user_email,
            case_id=signal_instance.case.id,
            add_to_conversation=True,
        )

        engaged_user = user_service.get_or_create(
            db_session=db_session,
            organization=signal_instance.case.project.organization.slug,
            user_in=UserRegister(email=user_email),
        )

        conversation = signal_instance.case.conversation
        engagement_response = conversation_plugin.instance.create_engagement_threaded(
            signal_instance=signal_instance,
            case=signal_instance.case,
            conversation_id=conversation.channel_id,
            thread_id=conversation.thread_id,
            user=engaged_user,
            engagement=pending["engagement"],
            engagement_status=SignalEngagementStatus.new,
        )

        # Each iteration overwrites the stored thread timestamp, so after the
        # loop it holds the value from the last engagement created.
        signal_instance.engagement_thread_ts = engagement_response.get("timestamp")
        db_session.commit()