in msticpy/sectools/syslog_utils.py [0:0]
def _pam_session_events(logon_events: pd.DataFrame, state: str) -> pd.DataFrame:
    """Return pam_unix "session <state>" events indexed and sorted by time."""
    # na=False treats rows with a missing SyslogMessage as non-matching;
    # without it the mask contains NaN and boolean indexing raises ValueError.
    is_match = logon_events["SyslogMessage"].str.contains(
        r"pam_unix.+session " + state, na=False
    )
    return (
        logon_events[is_match].set_index("TimeGenerated").sort_index(ascending=True)
    )


def cluster_syslog_logons_df(logon_events: pd.DataFrame) -> pd.DataFrame:
    """
    Cluster logon sessions in syslog by start/end time based on PAM events.

    Each pam_unix "session opened" event is paired with the first
    "session closed" event that does not precede it. Open events that occur
    at or before the end of the previously recorded session are skipped, so
    the returned sessions never overlap.

    Parameters
    ----------
    logon_events: pd.DataFrame
        A DataFrame of all syslog logon events
        (can be generated with LinuxSyslog.user_logon query).
        Must contain "TimeGenerated" and "SyslogMessage" columns; a "User"
        (or, failing that, "Sudoer") column is used to name the session
        owner when present.

    Returns
    -------
    logon_sessions: pd.DataFrame
        A DataFrame of logon sessions with one row per session and the
        columns "User", "Start" and "End". "User" is "Unknown" when the
        input has neither a "User" nor a "Sudoer" column.

    Raises
    ------
    MsticpyException
        There are no logon sessions in the supplied data set

    """
    # Extract logon session opened and logon session closed data.
    logons_opened = _pam_session_events(logon_events, "opened")
    logons_closed = _pam_session_events(logon_events, "closed")
    if logons_opened.empty or logons_closed.empty:
        raise MsticpyException("There are no logon sessions in the supplied data set")

    # The column holding the user name is the same for every row, so it is
    # resolved once instead of on every loop iteration.
    if "User" in logons_opened.columns:
        open_users = logons_opened["User"]
    elif "Sudoer" in logons_opened.columns:
        open_users = logons_opened["Sudoer"]
    else:
        open_users = None

    open_times = logons_opened.index
    close_times = logons_closed.index
    users = []
    starts = []
    ends = []
    # End time of the most recently recorded session (None until the first
    # session has been recorded).
    last_close = None
    open_idx = 0
    close_idx = 0

    # For each session identify the likely start and end times
    while open_idx < len(open_times) and close_idx < len(close_times):
        ses_start = open_times[open_idx]
        ses_end = close_times[close_idx]

        # This open event falls inside the session already recorded - skip it.
        if last_close is not None and ses_start <= last_close:
            open_idx += 1
            continue
        # This close event precedes the candidate start, so it cannot be the
        # end of this session - try the next close event.
        if ses_end < ses_start:
            close_idx += 1
            continue

        # If we can identify a user for the session add this to the details
        users.append("Unknown" if open_users is None else open_users.iloc[open_idx])
        starts.append(ses_start)
        ends.append(ses_end)
        last_close = ses_end
        close_idx += 1
        open_idx += 1

    return pd.DataFrame({"User": users, "Start": starts, "End": ends})