#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
"""This module contains un-categorized utility methods.
"""
import csv
import os
import time
import urllib.parse
from datetime import date, datetime
import exchangelib
import pytz
from bs4 import BeautifulSoup
from exchangelib import EWSTimeZone
from tika import parser
from .adapter import SCHEMA
from .constant import DEFAULT_TIME_ZONE, RFC_3339_DATETIME_FORMAT
def extract(content):
"""Extracts the contents
:param content: content to be extracted
Returns:
parsed_test: parsed text
"""
parsed = parser.from_buffer(content)
parsed_text = parsed["content"]
return parsed_text
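
# Illustrative usage (a sketch; `attachment_bytes` is a hypothetical variable,
# and tika-python needs a reachable Apache Tika server, which it starts on
# demand by default):
#   >>> extract(attachment_bytes)   # raw attachment content
#   'Text extracted from the attachment...'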
def url_encode(object_name):
"""Performs encoding on the name of objects
containing special characters in their url, and
replaces single quote with two single quote since quote
is treated as an escape character in data
:param object_name: name that contains special characters
"""
name = urllib.parse.quote(object_name, safe="'")
return name.replace("'", "''")
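
# Illustrative usage:
#   >>> url_encode("John's folder?")
#   "John''s%20folder%3F"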
def retry(exception_list):
"""Decorator for retrying in case of server exceptions.
Retries the wrapped method `times` times if the exceptions listed
in ``exceptions`` are thrown
:param exception_list: Lists of exceptions on which the connector should retry
"""
    def decorator(func):
        """Wraps the given function with the retry logic."""
        def execute(self, *args, **kwargs):
            """Executes the wrapped function, retrying on the listed exceptions."""
retry = 1
while retry <= self.retry_count:
try:
return func(self, *args, **kwargs)
except exception_list as exception:
self.logger.exception(
f"Error while creating a connection. Retry count: {retry} out of {self.retry_count}. "
f"Error: {exception}"
)
time.sleep(2**retry)
retry += 1
return execute
return decorator
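
# Illustrative usage (a sketch; the decorated method's class is assumed to
# define `retry_count` and `logger` attributes, as the connector clients do):
#
#   class ExampleClient:
#       def __init__(self, logger):
#           self.retry_count = 3
#           self.logger = logger
#
#       @retry(exception_list=(ConnectionError, TimeoutError))
#       def connect(self):
#           ...  # any call that may raise a transient server error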
def fetch_users_from_csv_file(user_mapping, logger):
"""This method is used to map sid to username from csv file.
:param user_mapping: path to csv file containing source user to enterprise search mapping
:param logger: logger object
:returns: dictionary of sid and username
"""
rows = {}
if user_mapping and os.path.exists(user_mapping) and os.path.getsize(user_mapping) > 0:
with open(user_mapping, encoding="utf-8") as mapping_file:
try:
csvreader = csv.reader(mapping_file)
for row in csvreader:
rows[row[0]] = row[1]
except csv.Error as e:
logger.exception(f"Error while reading user mapping file at the location: {user_mapping}. Error: {e}")
return rows
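
# Illustrative mapping file (the file name and rows are hypothetical; each row
# is "<source user SID>,<enterprise search username>"):
#
#   user_mapping.csv:
#       S-1-5-21-111,john.doe@example.com
#       S-1-5-21-222,jane.roe@example.com
#
#   >>> fetch_users_from_csv_file("user_mapping.csv", logger)
#   {'S-1-5-21-111': 'john.doe@example.com', 'S-1-5-21-222': 'jane.roe@example.com'}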
def split_list_into_buckets(documents, total_buckets):
"""Divide large number of documents amongst the total buckets
:param documents: list to be partitioned
:param total_buckets: number of buckets to be formed
"""
if documents:
groups = min(total_buckets, len(documents))
group_list = []
for i in range(groups):
group_list.append(documents[i::groups])
return group_list
else:
return []
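
# Illustrative usage: documents are dealt round-robin, so bucket sizes differ
# by at most one element:
#   >>> split_list_into_buckets([1, 2, 3, 4, 5, 6, 7], 3)
#   [[1, 4, 7], [2, 5], [3, 6]]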
def split_documents_into_equal_chunks(documents, chunk_size):
"""This method splits a list or dictionary into equal chunks size
:param documents: List or Dictionary to be partitioned into chunks
:param chunk_size: Maximum size of a chunk
Returns:
list_of_chunks: List containing the chunks
"""
    list_of_chunks = []
    is_dict = isinstance(documents, dict)
    # Materialize dictionary items once instead of on every iteration
    items = list(documents.items()) if is_dict else documents
    for i in range(0, len(documents), chunk_size):
        partitioned_chunk = items[i:i + chunk_size]
        list_of_chunks.append(dict(partitioned_chunk) if is_dict else partitioned_chunk)
    return list_of_chunks
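
# Illustrative usage:
#   >>> split_documents_into_equal_chunks([1, 2, 3, 4, 5], 2)
#   [[1, 2], [3, 4], [5]]
#   >>> split_documents_into_equal_chunks({"a": 1, "b": 2, "c": 3}, 2)
#   [{'a': 1, 'b': 2}, {'c': 3}]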
def get_current_time():
    """Returns the current UTC time in RFC 3339 format"""
    return datetime.utcnow().strftime(RFC_3339_DATETIME_FORMAT)
def html_to_text(content):
"""Convert html content to text format
:param content: HTML content
Returns:
text: Converted Text
"""
if content:
soup = BeautifulSoup(content, "html.parser")
text = soup.get_text().strip()
return text
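
# Illustrative usage:
#   >>> html_to_text("<p>Hello <b>world</b></p>")
#   'Hello world'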
def convert_datetime_to_ews_format(utc_datetime):
"""Change datetime format to EWS timezone
:param utc_datetime: Datetime in UTC format
Returns:
Datetime: Datetime with EWS format
"""
return datetime.strptime(utc_datetime, "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=EWSTimeZone(DEFAULT_TIME_ZONE)
)
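
# Illustrative usage (the repr is approximate and assumes DEFAULT_TIME_ZONE is
# "UTC"; the result is a timezone-aware datetime.datetime):
#   >>> convert_datetime_to_ews_format("2022-04-02T08:20:30Z")
#   datetime.datetime(2022, 4, 2, 8, 20, 30, tzinfo=EWSTimeZone(key='UTC'))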
def change_datetime_format(datetime_obj, timezone):
    """Formats a datetime according to the user account timezone
    :param datetime_obj: datetime, EWSDateTime or EWSDate object in UTC
    :param timezone: user account timezone
    Returns:
        Datetime: date formatted in the user account timezone
    """
    if isinstance(datetime_obj, exchangelib.ewsdatetime.EWSDateTime):
        return datetime_obj.astimezone(pytz.timezone(str(timezone))).strftime(
            "%Y-%m-%dT%H:%M:%SZ"
        )
    elif isinstance(datetime_obj, (exchangelib.ewsdatetime.EWSDate, date)):
        return datetime_obj.strftime("%Y-%m-%d")
    else:
        return datetime_obj
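
# Illustrative usage: plain dates and EWSDates keep the date-only format:
#   >>> change_datetime_format(date(2022, 4, 2), "US/Pacific")
#   '2022-04-02'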
def insert_document_into_doc_id_storage(ids_list, id, parent_id, type, platform):
"""This function is used to prepare item for deletion and insert into global variable.
:param ids_list: Pass "global_keys" of doc ids JSON file
:param id: Pass id of mail, contacts, calendar events, tasks
:param parent_id: Pass parent id of id property
:param type: Pass type of each document for deletion.
:param platform: Pass platform of document like Office365, Microsoft Exchange
Returns:
ids_list: Updated ids_list
"""
new_item = {
"id": str(id),
"parent id": parent_id,
"type": type,
"platform": platform,
}
    if new_item not in ids_list:
        ids_list.append(new_item)
    return ids_list
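
# Illustrative usage:
#   >>> ids = []
#   >>> insert_document_into_doc_id_storage(ids, 101, "root", "mails", "Office365")
#   [{'id': '101', 'parent id': 'root', 'type': 'mails', 'platform': 'Office365'}]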
def get_schema_fields(document_name, objects):
"""Returns the schema of all the include_fields or exclude_fields specified in the configuration file.
:param document_name: Document name from Mails, Calendar, Tasks, Contacts etc.
Returns:
schema: Included and excluded fields schema
"""
fields = objects.get(document_name)
adapter_schema = SCHEMA[document_name]
field_id = adapter_schema["id"]
if fields:
include_fields = fields.get("include_fields")
exclude_fields = fields.get("exclude_fields")
if include_fields:
adapter_schema = {
key: val for key, val in adapter_schema.items() if val in include_fields
}
elif exclude_fields:
adapter_schema = {
key: val
for key, val in adapter_schema.items()
if val not in exclude_fields
}
adapter_schema["id"] = field_id
return adapter_schema
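
# Illustrative usage (the field names are hypothetical; the real mappings live
# in adapter.SCHEMA). With
#   objects = {"mails": {"include_fields": ["Id", "Subject"]}}
# get_schema_fields("mails", objects) keeps only the adapter schema entries
# whose values appear in include_fields, always preserving the "id" field.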
def split_date_range_into_chunks(start_time, end_time, number_of_chunks):
"""Divides the timerange in equal partitions by number of chunks
:param start_time: Start time of the interval
:param end_time: End time of the interval
:param number_of_chunks: Number of threads defined into config file for producer process
"""
start_time = datetime.strptime(start_time, RFC_3339_DATETIME_FORMAT)
end_time = datetime.strptime(end_time, RFC_3339_DATETIME_FORMAT)
diff = (end_time - start_time) / number_of_chunks
datelist = []
for idx in range(number_of_chunks):
date_time = start_time + diff * idx
datelist.append(date_time.strftime(RFC_3339_DATETIME_FORMAT))
formatted_end_time = end_time.strftime(RFC_3339_DATETIME_FORMAT)
datelist.append(formatted_end_time)
return datelist
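
# Illustrative usage (assuming RFC_3339_DATETIME_FORMAT is "%Y-%m-%dT%H:%M:%SZ");
# note the result holds number_of_chunks + 1 boundary timestamps:
#   >>> split_date_range_into_chunks("2022-01-01T00:00:00Z", "2022-01-01T04:00:00Z", 4)
#   ['2022-01-01T00:00:00Z', '2022-01-01T01:00:00Z', '2022-01-01T02:00:00Z',
#    '2022-01-01T03:00:00Z', '2022-01-01T04:00:00Z']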
def split_documents_into_equal_bytes(documents, allowed_size):
"""This method splits a list of dictionary into list based on allowed size limit.
:param documents: List of dictionary to be partitioned into chunks
:param allowed_size: Maximum size allowed for indexing per request.
Returns:
list_of_chunks: List of dictionary array that to be indexed.
"""
list_of_chunks = []
chunk = []
current_size = allowed_size
    for document in documents:
        # Approximate the document size by the length of its string form
        document_size = len(str(document))
        if document_size < current_size:
            chunk.append(document)
            current_size -= document_size
        else:
            # The current chunk is full; flush it and start a new one
            if chunk:
                list_of_chunks.append(chunk)
            if document_size > allowed_size:
                # An oversized document can never fit, so drop its body
                document["body"] = None
                document_size = len(str(document))
            chunk = [document]
            current_size = allowed_size - document_size
    if chunk:
        list_of_chunks.append(chunk)
    return list_of_chunks
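
# Illustrative behavior: with allowed_size=60 and three documents whose string
# forms are ~25 characters each, the first two share a chunk and the third
# starts a new one: [[doc1, doc2], [doc3]]. A single document larger than
# allowed_size has its "body" set to None so the chunk can still be indexed.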