def cluster_syslog_logons_df()

in msticpy/sectools/syslog_utils.py [0:0]


def cluster_syslog_logons_df(logon_events: pd.DataFrame) -> pd.DataFrame:
    """
    Cluster logon sessions in syslog by start/end time based on PAM events.

    PAM "session opened" events are paired with the next "session closed"
    event that does not precede them. "Opened" events that fall inside
    (or exactly at the end of) the previously recorded session are treated
    as nested activity and skipped.

    Parameters
    ----------
    logon_events: pd.DataFrame
        A DataFrame of all syslog logon events
        (can be generated with LinuxSyslog.user_logon query).
        Must contain "TimeGenerated" and "SyslogMessage" columns; a "User"
        (preferred) or "Sudoer" column is used to label sessions if present.

    Returns
    -------
    logon_sessions: pd.DataFrame
        A DataFrame with one row per logon session and the columns
        "User", "Start" and "End". It may be empty if no "opened" event
        could be paired with a later "closed" event.

    Raises
    ------
    MsticpyException
        There are no session opened events or no session closed events
        in the supplied data set

    """
    messages = logon_events["SyslogMessage"]
    # na=False: rows with a null SyslogMessage are simply "not a match";
    # without it the mask can contain NaN and boolean indexing raises.
    opened_mask = messages.str.contains(r"pam_unix.+session opened", na=False)
    closed_mask = messages.str.contains(r"pam_unix.+session closed", na=False)

    # Extract logon session opened and logon session closed data,
    # ordered by event time.
    logons_opened = (
        logon_events[opened_mask]
        .set_index("TimeGenerated")
        .sort_index(ascending=True)
    )
    logons_closed = (
        logon_events[closed_mask]
        .set_index("TimeGenerated")
        .sort_index(ascending=True)
    )
    if logons_opened.empty or logons_closed.empty:
        raise MsticpyException("There are no logon sessions in the supplied data set")

    # Pull the values needed by the pairing loop out of the frames once,
    # rather than materialising a row Series on every iteration.
    open_times = logons_opened.index.tolist()
    close_times = logons_closed.index.tolist()
    # If we can identify a user for the session add this to the details
    if "User" in logons_opened.columns:
        open_users = logons_opened["User"].tolist()
    elif "Sudoer" in logons_opened.columns:
        open_users = logons_opened["Sudoer"].tolist()
    else:
        open_users = ["Unknown"] * len(open_times)

    users = []
    starts = []
    ends = []
    last_close = None  # end time of the most recently recorded session
    open_pos = 0
    close_pos = 0
    # For each session identify the likely start and end times
    while open_pos < len(open_times) and close_pos < len(close_times):
        ses_start = open_times[open_pos]
        ses_end = close_times[close_pos]
        if last_close is not None and ses_start <= last_close:
            # Opened during (or at the close of) the previous session:
            # treat as nested and move to the next "opened" event.
            open_pos += 1
            continue
        if ses_end < ses_start:
            # This "closed" event predates the candidate start, so it
            # cannot terminate it; try the next "closed" event.
            close_pos += 1
            continue
        users.append(open_users[open_pos])
        starts.append(ses_start)
        ends.append(ses_end)
        last_close = ses_end
        close_pos += 1
        open_pos += 1
    return pd.DataFrame({"User": users, "Start": starts, "End": ends})