ees_microsoft_teams/base_command.py (149 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""Module contains a base command interface.
Connector can run multiple commands such as full-sync, incremental-sync,
etc. This module provides convenience interface defining the shared
objects and methods that can be used by commands.
"""
import csv
import functools
import logging
import os
try:
from functools import cached_property
except ImportError:
from cached_property import cached_property
from .enterprise_search_wrapper import EnterpriseSearchWrapper
from concurrent.futures import ThreadPoolExecutor, as_completed
from elastic_enterprise_search import __version__
from packaging import version
from .configuration import Configuration
from .local_storage import LocalStorage
from .microsoft_teams_calendars import MSTeamsCalendar
from .microsoft_teams_channels import MSTeamsChannels
from .microsoft_teams_user_messages import MSTeamsUserMessage
from .msal_access_token import MSALAccessToken
from .permission_sync_command import PermissionSyncCommand
# Version threshold compared against the installed elastic_enterprise_search
# client version to pick the payload shape used when removing permissions.
ENTERPRISE_V8 = version.parse("8.0")
class BaseCommand:
"""Base interface for all module commands.

Inherit from it and implement 'execute' method, then add
code to cli.py to register this command.

The parsed command-line arguments are stored on the instance, while the
configuration, logger, Workplace Search client and local storage are
exposed as cached properties that are built on first access.
"""
def __init__(self, args):
    """Remember the command-line arguments and the client library version.

    :param args: Parsed command-line arguments of the running command
    """
    # Parsed once so that later version comparisons are plain object compares.
    self.version = version.parse(__version__)
    self.args = args
def execute(self):
    """Run the command.

    Every concrete command overrides this method with its own logic; the
    base implementation only signals that nothing was provided.

    :raises NotImplementedError: always, unless overridden by a subclass
    """
    raise NotImplementedError
@cached_property
def logger(self):
    """Get the logger instance for the running command.

    The log level is taken from the configuration setting ``log_level``.

    The underlying ``logging.Logger`` is shared by every command in the
    process (it is looked up by module name), so the stream handler is
    attached only once; creating several commands no longer results in the
    same record being printed several times.

    :returns: The configured logger
    """
    log_level = self.config.get_value('log_level')
    log = logging.getLogger(__name__)
    log.propagate = True
    log.setLevel(log_level)
    if not log.handlers:
        handler = logging.StreamHandler()
        formatter = logging.Formatter("%(asctime)s %(levelname)s Thread[%(thread)s]: %(message)s")
        handler.setFormatter(formatter)
        # Uncomment the following lines to output logs in ECS-compatible format
        # formatter = ecs_logging.StdlibFormatter()
        # handler.setFormatter(formatter)
        log.addHandler(handler)
    # Keep the handler threshold in step with the most recent configuration,
    # including when the handler was attached by an earlier command.
    for handler in log.handlers:
        handler.setLevel(log_level)
    return log
@cached_property
def workplace_search_custom_client(self):
    """Build the Workplace Search client wrapper used by the running command.

    :returns: EnterpriseSearchWrapper created from the logger, configuration
        and command-line arguments of this command
    """
    client = EnterpriseSearchWrapper(self.logger, self.config, self.args)
    return client
@cached_property
def config(self):
    """Load the connector configuration for the running command.

    :returns: Configuration built from the ``config_file`` command-line argument
    """
    return Configuration(self.args.config_file)
def create_and_execute_jobs(self, thread_count, func, args, iterable_list):
    """Run ``func`` on a thread pool and gather everything it returns.

    One job is created per element of ``iterable_list``; each job calls
    ``func(*args, element)``. When ``iterable_list`` is empty a single job is
    created that calls ``func`` with no arguments at all (``args`` is not
    used in that case).

    A job that raises is logged and skipped, so the remaining jobs still
    contribute their results.

    :param thread_count: Total number of threads to be spawned
    :param func: The target function on which the async calls would be made
    :param args: Arguments for the targeted function
    :param iterable_list: List to iterate over and create thread
    :returns: Flat list with the items of every truthy job result, in
        completion order
    """
    if iterable_list:
        jobs = [functools.partial(func, *args, element) for element in iterable_list]
    else:
        jobs = [func]

    documents = []
    with ThreadPoolExecutor(max_workers=thread_count) as executor:
        pending = [executor.submit(job) for job in jobs]
        for finished in as_completed(pending):
            try:
                outcome = finished.result()
                if outcome:
                    documents.extend(outcome)
            except Exception as exception:
                self.logger.exception(
                    f"Error while fetching the data from Microsoft Teams. Error {exception}"
                )
    return documents
@cached_property
def local_storage(self):
    """Build the local storage helper used to fetch and update ids stored locally.

    :returns: LocalStorage created with the logger of this command
    """
    storage = LocalStorage(self.logger)
    return storage
def get_access_token(self, is_acquire_for_client=False):
    """Request an access token for fetching the data.

    A new MSALAccessToken helper is created on every call.

    :param is_acquire_for_client: Flag for fetching the access token; it is
        forwarded unchanged to ``MSALAccessToken.get_token``
    :returns: Whatever ``MSALAccessToken.get_token`` returns
    """
    token_provider = MSALAccessToken(self.logger, self.config)
    return token_provider.get_token(is_acquire_for_client)
def microsoft_team_channel_object(self, access_token):
    """Create the object used for fetching the teams and their children.

    :param access_token: Access token handed to the created object
    :returns: A new MSTeamsChannels instance
    """
    dependencies = (access_token, self.logger, self.config, self.local_storage)
    return MSTeamsChannels(*dependencies)
def microsoft_user_chats_object(self, access_token):
    """Create the object used for fetching the user chats related data.

    :param access_token: Access token handed to the created object
    :returns: A new MSTeamsUserMessage instance
    """
    dependencies = (access_token, self.logger, self.config, self.local_storage)
    return MSTeamsUserMessage(*dependencies)
def microsoft_calendar_object(self, access_token):
    """Create the object used for fetching the calendar related data.

    :param access_token: Access token handed to the created object
    :returns: A new MSTeamsCalendar instance
    """
    dependencies = (access_token, self.logger, self.config, self.local_storage)
    return MSTeamsCalendar(*dependencies)
def get_mapped_users(self):
    """Returns mapped users from the CSV file

    The file location is read from the ``microsoft_teams.user_mapping``
    configuration value. The first column of each row becomes the key and the
    second column the value. A missing setting, a missing file or an empty
    file yields an empty mapping.

    Blank lines are ignored and rows with fewer than two columns are skipped
    with a warning instead of aborting the whole run with an IndexError.

    :returns: dict mapping the first CSV column to the second one
    """
    mapped_users = {}
    mapping_sheet_path = self.config.get_value("microsoft_teams.user_mapping")
    if not (
        mapping_sheet_path
        and os.path.exists(mapping_sheet_path)
        and os.path.getsize(mapping_sheet_path) > 0
    ):
        return mapped_users

    # newline="" leaves newline handling to the csv module, as its
    # documentation requires for files passed to csv.reader.
    with open(mapping_sheet_path, encoding="UTF-8", newline="") as file:
        for row in csv.reader(file):
            if len(row) < 2:
                if row:
                    self.logger.warning(
                        "Skipping malformed row %s in the user mapping file %s",
                        row,
                        mapping_sheet_path,
                    )
                continue
            mapped_users[row[0]] = row[1]
    return mapped_users
def manage_permissions(self, object_permissions, ws_user, ws_permissions, mapped_users=None):
    """Returns the permissions differs from Workplace Search

    Every Microsoft Teams user in ``object_permissions`` is first translated
    through the user mapping (falling back to the unmapped name) and then
    compared case-insensitively with ``ws_user``. For each match, the
    permissions of that Microsoft Teams user are removed from
    ``ws_permissions``.

    :param object_permissions: Permissions of the Microsoft Teams Object
    :param ws_user: Workplace Search user
    :param ws_permissions: Workplace Search permissions of a user
    :param mapped_users: Optional, already loaded user mapping. When omitted
        the mapping is read through ``get_mapped_users``; callers that invoke
        this method in a loop can pass it to avoid re-reading the CSV file
        on every call.
    :returns: List of the Workplace Search permissions that were not matched.
        Once a match has occurred the list is built from a set, so it holds
        no duplicates and its order is not guaranteed.
    """
    if mapped_users is None:
        mapped_users = self.get_mapped_users()
    for ms_team_user, permissions in object_permissions.items():
        ms_team_user = mapped_users.get(ms_team_user, ms_team_user)
        if ms_team_user.lower() == ws_user.lower():
            ws_permissions = set(ws_permissions).difference(permissions)
    return list(ws_permissions)
def remove_object_permissions(self, end_time):
"""Remove the permissions of the users removed from the Microsoft Teams objects

The permissions returned for teams, user chats and calendars are compared
with the permissions every Workplace Search user holds. Whatever a user
holds that is not matched by any of the three sources is removed from
Workplace Search.

:param end_time: End time to fetch the permissions
"""
deleted_permissions_list = []
# Teams and user chats request their token with the default flag, while
# calendars request it with is_acquire_for_client=True.
microsoft_teams_object = self.microsoft_team_channel_object(
self.get_access_token()
)
user_chat_object = self.microsoft_user_chats_object(
self.get_access_token()
)
calendar_object = self.microsoft_calendar_object(
self.get_access_token(is_acquire_for_client=True)
)
teams_permissions = microsoft_teams_object.get_team_members()
# Only the first element returned by get_user_chats/get_calendars is used here.
user_chats_permissions, _ = user_chat_object.get_user_chats([])
calendar_permissions, _ = calendar_object.get_calendars([], self.config.get_value('start_time'), end_time)
ws_user_permissions = PermissionSyncCommand(
self.logger, self.config, self.workplace_search_custom_client
).list_user_permissions()
# Pass the permissions of each user through the three sources in turn; what
# is left afterwards is recorded as "deleted_permissions".
for ws_user, ws_permissions in ws_user_permissions.items():
actual_permissions = ws_permissions
ws_permissions = self.manage_permissions(teams_permissions, ws_user, ws_permissions)
ws_permissions = self.manage_permissions(user_chats_permissions, ws_user, ws_permissions)
ws_permissions = self.manage_permissions(calendar_permissions, ws_user, ws_permissions)
deleted_permissions_list.append({"user": ws_user, "actual_permissions": actual_permissions,
"deleted_permissions": ws_permissions})
for permission_dict in deleted_permissions_list:
if permission_dict["deleted_permissions"]:
# From version 8.0 on the user is identified through
# "external_user_properties"; older versions use the plain "user" field.
if self.version >= ENTERPRISE_V8:
self.workplace_search_custom_client.remove_permissions(
{
"external_user_properties": [{'attribute_value': permission_dict['user']}],
"permissions": permission_dict['deleted_permissions']
}
)
else:
self.workplace_search_custom_client.remove_permissions(
{"user": permission_dict['user'], "permissions": permission_dict['deleted_permissions']}
)
# NOTE(review): confirm whether this add_permissions call is meant to run
# only for pre-8.0 versions or for every version.
# It re-adds the permissions the user held minus the deleted ones.
self.workplace_search_custom_client.add_permissions(
permission_dict['user'],
list(set(permission_dict['actual_permissions']) - set(permission_dict['deleted_permissions']))
)
self.logger.debug(
f"Removed permissions for {permission_dict['user']} from the Workplace Search"
)
else:
self.logger.debug(
f"No permission found for {permission_dict['user']} to remove from Workplace Search"
)