distill/segmentation/segment.py (287 lines of code) (raw):
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
from enum import Enum
import csv
from distill.segmentation.segments import Segments
import copy
class Segment_Type(Enum):
    """
    Labels recording which operation in this module produced a Segment.
    """

    # Segments built from time windows
    CREATE = "create"
    GENERATE = "generate"
    DEADSPACE = "deadspace"
    FIXED_TIME = "fixed_time"
    GENERATE_COLLAPSING_WINDOWS = "generate_collapsing_windows"

    # Segments built by set logic on two existing segments
    UNION = "union"
    INTERSECTION = "intersection"
    DIFFERENCE = "difference"
class Segment():
    """
    Distill's segmentation package. Allows the user to segment User Ale log data.

    A Segment holds the metadata of one segment: its name, its (start, end) window,
    the number of logs it contains and the uids of those logs.
    """

    def __init__(self, segment_name="", start_end_val=None, num_logs=0, uids=None):
        """
        Initializes a Segment object. This object contains metadata for the associated Segment.

        :param segment_name: Name associated with the segment, defaults to an empty string
        :param start_end_val: A tuple (start_time, end_time), where start_time and end_time are Date/Time Objects or integers. Defaults to a None value.
        :param num_logs: Number of logs in the segment. Defaults to 0.
        :param uids: A list of strings representing the associated uids of logs within the segment. Defaults to a new empty list.
        """
        self.segment_name = segment_name
        self.start_end_val = start_end_val
        self.num_logs = num_logs
        # A fresh list per instance: a literal [] default would be shared by every
        # Segment created without uids. A list passed by the caller is stored as-is.
        self.uids = [] if uids is None else uids
        self.generate_field_name = None
        self.generate_matched_values = None
        self.segment_type = None

    def __str__(self):
        """
        Builds a one-line summary of every attribute except the uid list.

        :return: A string of the form "Segment: name=value, ...", with start_end_val reported as separate
                 start and end entries (both None when no start/end value has been set).
        """
        if self.start_end_val is None:
            start, end = None, None
        else:
            start = self.start_end_val[0]
            end = self.start_end_val[1]

        parts = []
        for name, value in vars(self).items():
            if name == "uids":
                # The uid list can be long, so it is left out of the summary
                continue
            if name == "start_end_val":
                parts.append(" start=" + str(start))
                parts.append(" end=" + str(end))
            else:
                parts.append(" " + str(name) + "=" + str(value))
        return "Segment:" + ",".join(parts)

    def get_segment_name(self):
        """
        Gets the name of a given segment.

        :return: The segment name of the given segment.
        """
        return self.segment_name

    def get_start_end_val(self):
        """
        Gets the start and end values of a given segment.

        :return: The start and end values of the given segment.
        """
        return self.start_end_val

    def get_num_logs(self):
        """
        Gets the number of logs within a given segment.

        :return: The number of logs within the given segment.
        """
        return self.num_logs

    def get_segment_uids(self):
        """
        Gets the uid list of a given segment.

        :return: The uid list of the given segment.
        """
        return self.uids

    def get_segment_type(self):
        """
        Gets the segment type of a given segment.

        :return: The segment type of the given segment.
        """
        return self.segment_type

    def get_generate_field_name(self):
        """
        Gets the field name used to create a segment with generate_segment.

        :return: The field name used to create a segment with generate_segment.
        """
        return self.generate_field_name

    def get_generate_matched_values(self):
        """
        Gets the values used to create a segment with generate_segment.

        :return: The values used to create a segment with generate_segment.
        """
        return self.generate_matched_values
#######################
# SET LOGIC FUNCTIONS #
#######################
def union(segment_name, segment1, segment2):
    """
    Builds a new segment whose uids are the union of the uids of two segments.

    :param segment_name: Name associated with the new segment
    :param segment1: First segment involved in union.
    :param segment2: Second segment involved in union.
    :return: A new segment with the given segment_name, a start value equal to the smaller of the two start values,
             an end value equal to the larger of the two end values, and segment1's uids followed by the uids of
             segment2 that were not already present.
    :raises TypeError: If the start or end values of the two segments are of different types.
    """
    first_start, first_end = segment1.start_end_val[0], segment1.start_end_val[1]
    second_start, second_end = segment2.start_end_val[0], segment2.start_end_val[1]

    same_types = isinstance(first_start, type(second_start)) and isinstance(first_end, type(second_end))
    if not same_types:
        raise TypeError("Segment start and end values must be of the same type between segments.")

    # Start from a copy of segment1's uids and add whatever segment2 contributes
    merged_uids = copy.deepcopy(segment1.uids)
    for candidate in segment2.uids:
        if candidate not in merged_uids:
            merged_uids.append(candidate)

    # Widest window covering both segments
    earliest = second_start if first_start > second_start else first_start
    latest = second_end if first_end < second_end else first_end

    result = Segment(segment_name, (earliest, latest), len(merged_uids), merged_uids)
    result.segment_type = Segment_Type.UNION
    result.generate_field_name = None
    result.generate_matched_values = None
    return result
def intersection(segment_name, segment1, segment2):
    """
    Builds a new segment whose uids are the intersection of the uids of two segments.

    :param segment_name: Name associated with the new segment
    :param segment1: First segment involved in intersection.
    :param segment2: Second segment involved in intersection.
    :return: A new segment with the given segment_name, a start value equal to the smaller of the two start values,
             an end value equal to the larger of the two end values, and the uids of segment2 (in segment2's order)
             that also appear in segment1.
    :raises TypeError: If the start or end values of the two segments are of different types.
    """
    first_start, first_end = segment1.start_end_val[0], segment1.start_end_val[1]
    second_start, second_end = segment2.start_end_val[0], segment2.start_end_val[1]

    same_types = isinstance(first_start, type(second_start)) and isinstance(first_end, type(second_end))
    if not same_types:
        raise TypeError("Segment start and end values must be of the same type between segments.")

    # Keep only the uids present in both segments
    common_uids = [candidate for candidate in segment2.uids if candidate in segment1.uids]

    # Widest window covering both segments
    earliest = second_start if first_start > second_start else first_start
    latest = second_end if first_end < second_end else first_end

    result = Segment(segment_name, (earliest, latest), len(common_uids), common_uids)
    result.segment_type = Segment_Type.INTERSECTION
    result.generate_field_name = None
    result.generate_matched_values = None
    return result
def difference(segment_name, segment1, segment2):
    """
    Creates a new segment based on the logical difference of segment2 from segment1.

    :param segment_name: Name associated with the new segment
    :param segment1: Segment from which to subtract segment2's matched UIDs.
    :param segment2: Segment whose matched UIDs are to be subtracted from segment1.
    :return: A new segment with the given segment_name, start and end values based on segment1, and a list of the
             uids of segment1 (in their original order) that do not appear in segment2.
    """
    # Filter rather than call list.remove() once per match: remove() raises ValueError
    # when segment2 repeats a uid and only drops the first occurrence when segment1
    # repeats one. Filtering handles both cases; uids are assumed to be hashable
    # (they are documented as strings).
    excluded = set(segment2.uids)
    uids = [uid for uid in segment1.uids if uid not in excluded]

    # Return new segment
    segment = Segment(segment_name, segment1.start_end_val, len(uids), uids)
    segment.segment_type = Segment_Type.DIFFERENCE
    segment.generate_field_name = None
    segment.generate_matched_values = None
    return segment
####################
# SEGMENT CREATION #
####################
def create_segment(target_dict, segment_names, start_end_vals):
    """
    Builds one Segment object per requested name, each holding the metadata of the logs
    whose clientTime falls inside the matching (start, end) window, bounds included.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers)
    :param segment_names: A list of segment_names ordered in the same way as the start_end_vals
    :param start_end_vals: A list of tuples (i.e [(start_time, end_time)], where start_time and end_time are Date/Time Objects or integers
    :return: A Segments object containing newly created Segment objects.
    :raises TypeError: If a log's clientTime and the window bounds are not all integers or all datetime objects.
    """
    built = []
    for position, name in enumerate(segment_names):
        window = start_end_vals[position]
        window_start = window[0]
        window_end = window[1]

        matched_uids = []
        for uid in target_dict:
            stamp = target_dict[uid]['clientTime']
            all_ints = isinstance(stamp, int) and isinstance(window_start, int) and isinstance(window_end, int)
            all_datetimes = (isinstance(stamp, datetime.datetime)
                             and isinstance(window_start, datetime.datetime)
                             and isinstance(window_end, datetime.datetime))
            if not (all_ints or all_datetimes):
                raise TypeError("clientTime and start/end times must be represented as the same type and must either be a datetime object or integer.")

            # Both window bounds are inclusive
            if stamp >= window_start and stamp <= window_end:
                matched_uids.append(uid)

        new_segment = Segment(name, window, len(matched_uids), matched_uids)
        new_segment.segment_type = Segment_Type.CREATE
        new_segment.generate_field_name = None
        new_segment.generate_matched_values = None
        built.append(new_segment)

    return Segments(built)
def write_segment(target_dict, segment_names, start_end_vals):
    """
    Builds a nested dictionary mapping each segment name to the logs it contains, keyed by uid
    (i.e result['segment_name'][uid] --> log), so that defined segments are easy to iterate over.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers).
    :param segment_names: A list of segment_names ordered in the same way as the start_end_vals.
    :param start_end_vals: A list of tuples (i.e [(start_time, end_time)]), where start_time and end_time are Date/Time Objects or integers.
    :return: A nested dictionary of segment_names to uids to individual logs.
    """
    # create_segment decides which uids belong to which segment
    created = create_segment(target_dict, segment_names, start_end_vals)

    logs_by_segment = {}
    for segment in created:
        logs_by_segment[segment.get_segment_name()] = {
            uid: target_dict[uid] for uid in segment.uids
        }
    return logs_by_segment
def generate_segments(target_dict, field_name, field_values, start_time_limit, end_time_limit, label=""):
    """
    Generates Segment objects for windows of time opened around logs whose field_name value
    matches one of the given field values.

    A matching log that occurs at or before the end of the previously opened window does not open a
    new window, and a new window's start is clipped so that it never begins before the previous window's end.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers).
    :param field_name: A string indicating the field name meant to be matched by the field values.
    :param field_values: A list of field values to be matched in order to start a segment.
    :param start_time_limit: Amount of time (in seconds) prior to a detected event that should be included in the generated segment.
    :param end_time_limit: Amount of time (in seconds) to keep the segment window open after a detected event.
    :param label: An optional string argument that provides a prefix for the generated segment names.
    :return: A Segments object containing newly created Segment objects.
    :raises TypeError: If a matching log's clientTime is neither an integer nor a datetime object.
    """
    windows = []
    names = []
    last_window_end = None

    for log in target_dict.values():
        if field_name not in log:
            continue
        # The field may hold a string or a list; either way test membership of each value
        if not any(item in log[field_name] for item in field_values):
            continue

        event_time = log['clientTime']
        if isinstance(event_time, int):
            # Integer clientTime is treated as milliseconds, the limits as seconds
            window_start = event_time - (start_time_limit * 1000)
            window_end = event_time + (end_time_limit * 1000)
        elif isinstance(event_time, datetime.datetime):
            window_start = event_time - datetime.timedelta(seconds=start_time_limit)
            window_end = event_time + datetime.timedelta(seconds=end_time_limit)
        else:
            raise TypeError('clientTime field is not represented as an integer or datetime object')

        opens_new_window = last_window_end is None or event_time > last_window_end
        if not opens_new_window:
            continue

        # Do not let the new window reach back into the previous one
        if last_window_end is not None and window_start < last_window_end:
            window_start = last_window_end

        windows.append((window_start, window_end))
        names.append(label + str(len(names)))
        last_window_end = window_end

    # Let create_segment collect the logs, then tag the result as generated
    generated = create_segment(target_dict, names, windows)
    for segment in generated:
        segment.segment_type = Segment_Type.GENERATE
        segment.generate_field_name = field_name
        segment.generate_matched_values = field_values
    return generated
def detect_deadspace(target_dict, deadspace_limit, start_time_limit, end_time_limit, label=""):
    """
    Finds gaps between consecutive User Ale logs that are longer than deadspace_limit and captures
    each gap in a Segment whose window is widened by the given start and end time limits.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers).
    :param deadspace_limit: An integer representing the amount of time (in seconds) considered to be 'deadspace'.
    :param start_time_limit: Amount of time (in seconds) prior to a detected deadspace event that should be included in the deadspace segment.
    :param end_time_limit: Amount of time (in seconds) to keep the segment window open after a detected deadspace event.
    :param label: An optional string argument that provides a prefix for the generated segment names.
    :return: A Segments object containing newly created Segment objects.
    :raises TypeError: If two consecutive logs do not both use integers or both use datetime objects for clientTime.
    """
    windows = []
    names = []

    ordered_logs = list(target_dict.values())
    # Walk over each pair of neighbouring logs
    for current_log, following_log in zip(ordered_logs, ordered_logs[1:]):
        curr_time = current_log['clientTime']
        next_time = following_log['clientTime']

        if isinstance(curr_time, int) and isinstance(next_time, int):
            # Integer clientTime is treated as milliseconds, the limits as seconds
            if next_time - curr_time > deadspace_limit * 1000:
                windows.append((curr_time - (start_time_limit * 1000),
                                next_time + (end_time_limit * 1000)))
                names.append(label + str(len(names)))
        elif isinstance(curr_time, datetime.datetime) and isinstance(next_time, datetime.datetime):
            if next_time - curr_time > datetime.timedelta(seconds=deadspace_limit):
                windows.append((curr_time - datetime.timedelta(seconds=start_time_limit),
                                next_time + datetime.timedelta(seconds=end_time_limit)))
                names.append(label + str(len(names)))
        else:
            raise TypeError('clientTime field is not consistently represented as an integer or datetime object')

    # Let create_segment collect the logs, then tag the result as deadspace
    detected = create_segment(target_dict, names, windows)
    for segment in detected:
        segment.segment_type = Segment_Type.DEADSPACE
        segment.generate_field_name = None
        segment.generate_matched_values = None
    return detected
def generate_fixed_time_segments(target_dict, time, trim=False, label=""):
    """
    Generates segments based on fixed time intervals.

    Windows of length `time` are laid end to end starting at the clientTime of the first log.
    With trim=True only windows that fit entirely before the last log's clientTime are created;
    otherwise windows are created for as long as the window start is before the last log's clientTime.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers).
    :param time: The fixed time from which the Segment start and end times are based (seconds). Must be positive.
    :param trim: An optional boolean indicating whether the logs that don't fit into the fixed windows should be trimmed.
    :param label: An optional string argument that provides a prefix for the generated segment names.
    :return: A Segments object containing newly created Segment objects (empty when target_dict is empty).
    :raises ValueError: If time is not positive.
    :raises TypeError: If clientTime is neither an integer nor a datetime object.
    """
    # A zero or negative step would never advance past the end time and loop forever
    if time <= 0:
        raise ValueError("time must be a positive number of seconds.")

    start_end_vals = []
    segment_names = []

    # With no logs there is no overall start/end time, so no windows are produced
    if target_dict:
        key_list = list(target_dict.keys())

        # Get overall start and end time
        start = target_dict[key_list[0]]['clientTime']
        end = target_dict[key_list[-1]]['clientTime']

        if isinstance(start, int) and isinstance(end, int):
            # Integer clientTime is treated as milliseconds, `time` as seconds
            step = time * 1000
        elif isinstance(start, datetime.datetime) and isinstance(end, datetime.datetime):
            step = datetime.timedelta(seconds=time)
        else:
            raise TypeError("clientTime must be represented as either an integer or datetime object.")

        # trim: only full windows that end by `end`; otherwise: any window starting before `end`
        while (start + step <= end) if trim else (start < end):
            start_end_vals.append((start, start + step))
            segment_names.append(label + str(len(segment_names)))
            start += step

    # Create segment dictionary with create_segment
    segments = create_segment(target_dict, segment_names, start_end_vals)
    for segment in segments:
        segment.segment_type = Segment_Type.FIXED_TIME
        segment.generate_field_name = None
        segment.generate_matched_values = None
    return segments
def generate_collapsing_window_segments(target_dict, field_name, field_values_of_interest, label=""):
    """
    Generates segments based on a window of time in which the given field name has a value matching one of the values
    indicated by the field_values_of_interest list.

    A window opens at the first matching log and closes at the last log of that consecutive run of
    matching logs; a run that reaches the end of the dictionary closes at the last log.

    :param target_dict: A dictionary of User Ale logs assumed to be ordered by clientTime (Date/Time Objects or integers).
    :param field_name: A string indicating the field name meant to be matched by the field values.
    :param field_values_of_interest: A list of field values to be matched in order to start/end a segment.
    :param label: An optional string argument that provides a prefix for the generated segment names.
    :return: A Segments object containing newly created Segment objects.
    """
    start_end_vals = []
    segment_names = []

    window_open = False
    window_start = None
    previous_log = None

    for log in target_dict.values():
        matches = field_name in log and any(item in log[field_name] for item in field_values_of_interest)
        if matches:
            if not window_open:
                # Start a new Segment
                window_start = log['clientTime']
                segment_names.append(label + str(len(segment_names)))
                window_open = True
        elif window_open:
            # The run of matching logs ended on the previous log
            start_end_vals.append((window_start, previous_log['clientTime']))
            window_open = False
        previous_log = log

    # A window still open here ran to the end of the dictionary. Always close it, even
    # when it is the very first window, so that every segment name has a matching
    # start/end tuple for create_segment.
    if window_open:
        start_end_vals.append((window_start, previous_log['clientTime']))

    # Create Segments object with create_segment
    segments = create_segment(target_dict, segment_names, start_end_vals)
    for segment in segments:
        segment.segment_type = Segment_Type.GENERATE_COLLAPSING_WINDOWS
        segment.generate_field_name = field_name
        segment.generate_matched_values = field_values_of_interest
    return segments
######################
# EXPORTING SEGMENTS #
######################
def export_segments(path, segments):
    """
    Writes segment metadata into a csv file. Csv will be saved at the indicated path.

    The file starts with a header row, followed by one row per segment holding its name, start value,
    end value, number of logs, generate field name, generate matched values and segment type.

    :param path: Represents the path of the new file.
    :param segments: A Segments object containing Segment objects.
    """
    # TODO: Make sure this is the right format
    header_row = ['Segment Name', 'Start Time', 'End Time', 'Number of Logs', 'Generate Field Name',
                  'Generate Matched Values', 'Segment Type']

    # The context manager closes the file even if a row fails to serialize.
    # newline='' lets the csv module control line endings itself, as its
    # documentation requires; otherwise extra blank rows appear on Windows.
    with open(path, 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(header_row)

        # Populate the csv row by row
        for segment in segments:
            writer.writerow([
                segment.segment_name,
                str(segment.start_end_val[0]),
                str(segment.start_end_val[1]),
                segment.num_logs,
                segment.generate_field_name,
                segment.generate_matched_values,
                segment.segment_type,
            ])