ManagedkdbInsights/boto/managed_kx.py [13:801]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
pd.set_option('display.max_colwidth', None)

#
# Functions
# ----------------------------------------------------------------------------

def get_kx_environment_id(client):
    l = list_kx_environments(client)
    envs = []

    for e in l:
        if e['status'] == "CREATED":
            envs.append(e)

    if len(envs) != 1:
        print(f"Environment count: {len(envs)} cannot infer which to use")
        return None

    # determined list len == 1, return that environmentId
    return envs[0].get('environmentId')


def list_kx_environments(client):
    envs = []

    envs.extend(client.list_kx_environments()['environments'])

    return envs


def list_kx_databases(client, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = client.list_kx_databases(environmentId=environmentId)

    results = resp['kxDatabases']

    while "nextToken" in resp:
        resp = client.list_kx_databases(environmentId=environmentId, nextToken=resp['nextToken'])
        results.extend(resp['kxDatabases'])

    return results


def list_kx_clusters(client, clusterType=None, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = None

    if clusterType is not None:
        resp = client.list_kx_clusters(environmentId=environmentId, clusterType=clusterType)
    else:
        resp = client.list_kx_clusters(environmentId=environmentId)

    results = resp['kxClusterSummaries']

    while "nextToken" in resp:
        if clusterType is not None:
            resp = client.list_kx_clusters(environmentId=environmentId, clusterType=clusterType, nextToken=resp['nextToken'])
        else:
            resp = client.list_kx_clusters(environmentId=environmentId, nextToken=resp['nextToken'])

        results.extend(resp['kxClusterSummaries'])

    return results


def list_kx_changesets(client, databaseName, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName)

    results = resp.get('kxChangesets', [])

    while "nextToken" in resp:
        resp = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName, nextToken=resp['nextToken'])
        results.extend(resp['kxChangesets'])

    return results

def get_kx_changeset(client, environmentId:str, databaseName: str, changesetId:str):
    resp = None
    try:
        resp = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=changesetId)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def get_all_kx_changesets(client, databaseName, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    # get all changesets
    c_list=list_kx_changesets(client, environmentId=environmentId, databaseName=databaseName)

    # get details on each changeset in the database
    cd_list = []

    for c in c_list:
        c_id = c['changesetId']
        cd = get_kx_changeset(client=client, environmentId=environmentId, databaseName=databaseName, changesetId=c_id)

        cd_list.append(cd)

    # For each changeset, extract/flattend changeRequest
    part_list = []

    for cs in cd_list:
        for cr in cs.get('changeRequests', None):

            # anything?
            if cr is None:
                continue

            item = {}

            # get the core items
            item['status'] = cs['status']
            item['changesetId'] = cs['changesetId']
            item['createdTimestamp'] = cs['createdTimestamp']
            item['activeFromTimestamp'] = cs['activeFromTimestamp']
            item['lastModifiedTimestamp'] = cs['lastModifiedTimestamp']

            # get the request items
            item['changeType'] = cr['changeType']
            item['dbPath'] = cr['dbPath']

            # get the date partition from dbPath
            dt = None
            p = cr.get('dbPath', None)

            # has a specific date to parse
            if p is not None and p[0] == '/' and len(p) == 10:
                dt = datetime.datetime.strptime(p[1:11],"%Y.%m.%d")

            item['date'] = dt

            part_list.append(item)

    return part_list

def has_database(client, databaseName, environmentId: str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    has_database = True

    try:
        resp = client.get_kx_database(environmentId=environmentId, databaseName=databaseName)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        has_database = False

    return has_database


def has_cluster(client, clusterName, environmentId: str = None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    has_cluster = True

    try:
        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=clusterName)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        has_cluster = False

    return has_cluster


def get_kx_environment(client, environmentId: str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    try:
        resp = client.get_kx_environment(environmentId=environmentId)
    except client.exceptions.ResourceNotFoundException:
        return None

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
    else:
        resp.pop('ResponseMetadata', None)

    resp = resp.get('environment', resp)    

    return resp


def wait_for_environment_status(client, environmentId:str, status: str='CREATED', sleep_sec=10, max_wait_sec=1200, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until environment is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        resp = get_kx_environment(client, environmentId=environmentId)

        if resp is None:
            print(f"Environment: {environmentId} does not exist")
            continue

        a_status = resp.get('status')

        if a_status is None:
            print("status is None, returning")
            return resp

        if a_status.upper() == "FAILED_CREATION":
            print(f"Failed, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif a_status.upper() != status.upper():
            if show_wait: 
                print(f"Status is {a_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")

            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            return resp

    return None


def wait_for_changeset_status(client, environmentId:str, databaseName: str, changesetId: str, status: str='COMPLETED', sleep_sec=10, max_wait_sec=1200, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until changeset is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        try:
            resp = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=changesetId)
        except client.exceptions.ResourceNotFoundException:
            resp = None
            continue

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        a_status = resp.get('status')

        if a_status is not None and a_status.upper() == "FAILED":
            print(f"Failed, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif a_status is not None and a_status.upper() != status.upper():
            if show_wait: 
                print(f"Status is {a_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")

            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            return resp

    return None


def wait_for_cluster_status(client, environmentId:str, clusterName: str, status: str='RUNNING', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until cluster is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        try:
            resp = client.get_kx_cluster(environmentId = environmentId, clusterName=clusterName)

            if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                    sys.stderr.write("Error:\n {resp}")
            else:
                resp.pop('ResponseMetadata', None)

            this_cluster = resp
        except client.exceptions.ResourceNotFoundException:
            return None

        if this_cluster is None:
            print(f"cluster: {clusterName} not found")
            return None

        this_status = this_cluster['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Cluster: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Cluster: {clusterName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Cluster: {clusterName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Cluster: {clusterName} status is {this_status}")
                    
            return this_cluster

    print(f"No Cluster after {datetime.timedelta(seconds=total_wait)}")


def get_kx_volume(client, environmentId:str, volumeName: str):
    resp = None
    try:
        resp = client.get_kx_volume(environmentId = environmentId, volumeName=volumeName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_volume_status(client, environmentId:str, volumeName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until volume is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        this_volume = get_kx_volume(client, environmentId = environmentId, volumeName=volumeName)

        if this_volume is None:
            print(f"Volume: {volumeName} not found")
            return None

        this_status = this_volume['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Volume: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Volume: {volumeName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait>0:
                    print(f"Volume: {volumeName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Volume: {volumeName} status is {this_status}")
            return this_volume

    print(f"No Volume after {datetime.timedelta(seconds=total_wait)}")


def get_kx_scaling_group(client, environmentId:str, scalingGroupName: str):
    resp = None

    try:
        resp = client.get_kx_scaling_group(environmentId = environmentId, scalingGroupName=scalingGroupName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_scaling_group_status(client, environmentId:str, scalingGroupName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until volume is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        this_sg = get_kx_scaling_group(client=client, environmentId = environmentId, scalingGroupName=scalingGroupName)

        if this_sg is None:
            print(f"Scaling Group: {scalingGroupName} not found")
            return None

        this_status = this_sg['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Scaling Group: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Scaling Group: {scalingGroupName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Scaling Group: {scalingGroupName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Scaling Group: {scalingGroupName} status is {this_status}")
            return this_sg

    print(f"No Scaling Group after {datetime.timedelta(seconds=total_wait)}")


def get_kx_dataview(client, environmentId:str, databaseName: str, dataviewName: str):
    resp = None
    try:
        resp = client.get_kx_dataview(environmentId = environmentId, databaseName=databaseName, dataviewName=dataviewName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_dataview_status(client, environmentId:str, databaseName: str, dataviewName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until dataview is in desired status
    """
    total_wait = 0

    max_polls = max_wait_sec/sleep_sec

    while True and total_wait < max_wait_sec:
        this_resp = get_kx_dataview(client=client, environmentId = environmentId, databaseName=databaseName, dataviewName=dataviewName)

        if this_resp is None:
            print(f"Dataview: {dataviewName} not found")
            return None

        this_status = this_resp['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Dataview: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Dataview: {dataviewName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Dataview: {dataviewName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Dataview: {dataviewName} status is {this_status}")
            return this_resp

    print(f"No Dataview after {datetime.timedelta(seconds=total_wait)}")


def get_kx_cluster(client, clusterName: str, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp=None
    try:
        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=clusterName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None
        return resp

    resp = resp.get('environment', resp)

    return resp


def get_kx_connection_string(client, clusterName: str, userName: str, boto_session, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp=client.get_kx_user(environmentId=environmentId, userName=userName)

    userArn = resp.get("userArn")
    iamRole = resp.get("iamRole")

    sts_client = boto_session.client(service_name='sts')
    cred_client = client

    try:
        resp = sts_client.assume_role(RoleArn=iamRole, RoleSessionName="Connect-to-kdb-cluster")

        creds = resp.get('Credentials')

        # session for calling get connection String
        cred_session = boto3.Session(
            aws_access_key_id=creds.get('AccessKeyId'),
            aws_secret_access_key=creds.get('SecretAccessKey'),
            aws_session_token=creds.get('SessionToken')
        )
        cred_client = cred_session.client(service_name='finspace')
    except:
        cred_client = client

    resp=cred_client.get_kx_connection_string(environmentId=environmentId, userArn=userArn, clusterName=clusterName)

    return resp.get("signedConnectionString", None)


def get_clusters(client, clusterType: str=None, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    svcs=[]

    try:
        svcs = list_kx_clusters(client, environmentId=environmentId, clusterType=clusterType)
    except botocore.exceptions.ClientError as error:
        code = error.response['Error']['Code']
        print(f"ClientError: {code} to environmentId: {environmentId}")
        return None
    except:
        return None

    svcs = sorted(svcs, key=lambda d: d['clusterName']) 

    dict_l = []

    for s in svcs:
        svc_name = s['clusterName']

        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=svc_name)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        svc_details = resp

        d = {}

        k_list = ['clusterName', 'status', 
                  'clusterType', 'capacityConfiguration', 'commandLineArguments', 
                  'clusterDescription', 'lastModifiedTimestamp', 'createdTimestamp']

        for k in k_list:
            d[k] = resp.get(k, None)

        dbs = resp.get('databases', None)

        if dbs is not None:
            db = dbs[0]
            d['databaseName'] = db.get('databaseName', None)
            d['cacheConfigurations'] = db.get('cacheConfigurations', None)
        else:
            d['databaseName'] = None

        dict_l.append(d)

    return pd.DataFrame.from_dict(dict_l)


def print_clusters(client, print_empty=False, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    svcs=[]

    try:
        svcs = list_kx_clusters(client, environmentId=environmentId)
    except:
        svcs=[]

    if len(svcs) > 0 or print_empty == True:        
        print(f"Environment: {environmentId} Clusters found: {len(svcs)}")

    svcs = sorted(svcs, key=lambda d: d['clusterName']) 

    for s in svcs:
        svc_name = s['clusterName']

        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=svc_name)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        svc_details = resp

        print(f"{svc_name.ljust(20)} {svc_details['status'].ljust(13)} {svc_details.get('connectionString', '')}")    


def get_kx_database(client, environmentId:str, databaseName: str):
    resp = None
    try:
        resp = client.get_kx_database(environmentId = environmentId, databaseName=databaseName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def dump_database(client, databaseName:str, changset_details=False, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    c_set_list=[]
    note_str=""

    db = client.get_kx_database(environmentId=environmentId, databaseName=databaseName)

    try:
        c_set_list = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName)['kxChangesets']
    except:
        note_str = "<<Could not get changesets>>"

    print(100*"=")
    print(f"Database: {databaseName}, Changesets: ")
    print(f"{db.get('description', '')}")
    print(f"Bytes: {db['numBytes']:,} Changesets: {db['numChangesets']:,} Files: {db['numFiles']:,}")
    print(100*"-")

    # sort by create time
    c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp'], reverse=True) 

    if changset_details:
        for c in c_set_list:
            c_set_id = c['changesetId']
            print(f"Changeset ({c['status']}): {c_set_id}: Created: {c['createdTimestamp']}")
            c_rqs = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=c_set_id)['changeRequests']

            chs_pdf = pd.DataFrame.from_dict(c_rqs)# .style.hide_index()
            display(chs_pdf)
        print()
    else:
        display( pd.DataFrame.from_dict(c_set_list) )


def set_keepalive_linux(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Set TCP keepalive on an open socket.

    It activates after 1 second (after_idle_sec) of idleness,
    then sends a keepalive ping once every 3 seconds (interval_sec),
    and closes the connection after 5 failed ping (max_fails), or 15 seconds
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)


def set_keepalive_osx(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Set TCP keepalive on an open socket.

    sends a keepalive ping once every 3 seconds (interval_sec)
    """
    # scraped from /usr/include, not exported by python's socket module
    TCP_KEEPALIVE = 0x10
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)


def parse_connection_string(conn_str: str):
    conn_parts = conn_str.split(":")

    host=conn_parts[2].strip("/")
    port = int(conn_parts[3])
    username=conn_parts[4]
    password=conn_parts[5]

    return host, port, username, password


def get_pykx_connection(client, clusterName: str, userName: str, boto_session, environmentId:str=None, tls_enabled:bool=False, timeout:float=300.0):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    import pykx as kx

    conn_str = get_kx_connection_string(client, 
                                      environmentId=environmentId, clusterName=clusterName, userName=userName,
                                      boto_session=boto_session)

    host, port, username, password = parse_connection_string(conn_str)

    return kx.SyncQConnection(host=host, port=port, username=username, password=password, tls=tls_enabled, timeout=timeout)

    if (tls_enabled is False) and kx.licensed:
        set_keepalive_linux(handle._sock, after_idle_sec=300, interval_sec=60, max_fails=25)

    return handle
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



ManagedkdbInsights/update_cluster/managed_kx.py [13:801]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
pd.set_option('display.max_colwidth', None)

#
# Functions
# ----------------------------------------------------------------------------

def get_kx_environment_id(client):
    l = list_kx_environments(client)
    envs = []

    for e in l:
        if e['status'] == "CREATED":
            envs.append(e)

    if len(envs) != 1:
        print(f"Environment count: {len(envs)} cannot infer which to use")
        return None

    # determined list len == 1, return that environmentId
    return envs[0].get('environmentId')


def list_kx_environments(client):
    envs = []

    envs.extend(client.list_kx_environments()['environments'])

    return envs


def list_kx_databases(client, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = client.list_kx_databases(environmentId=environmentId)

    results = resp['kxDatabases']

    while "nextToken" in resp:
        resp = client.list_kx_databases(environmentId=environmentId, nextToken=resp['nextToken'])
        results.extend(resp['kxDatabases'])

    return results


def list_kx_clusters(client, clusterType=None, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = None

    if clusterType is not None:
        resp = client.list_kx_clusters(environmentId=environmentId, clusterType=clusterType)
    else:
        resp = client.list_kx_clusters(environmentId=environmentId)

    results = resp['kxClusterSummaries']

    while "nextToken" in resp:
        if clusterType is not None:
            resp = client.list_kx_clusters(environmentId=environmentId, clusterType=clusterType, nextToken=resp['nextToken'])
        else:
            resp = client.list_kx_clusters(environmentId=environmentId, nextToken=resp['nextToken'])

        results.extend(resp['kxClusterSummaries'])

    return results


def list_kx_changesets(client, databaseName, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName)

    results = resp.get('kxChangesets', [])

    while "nextToken" in resp:
        resp = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName, nextToken=resp['nextToken'])
        results.extend(resp['kxChangesets'])

    return results

def get_kx_changeset(client, environmentId:str, databaseName: str, changesetId:str):
    resp = None
    try:
        resp = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=changesetId)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def get_all_kx_changesets(client, databaseName, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    # get all changesets
    c_list=list_kx_changesets(client, environmentId=environmentId, databaseName=databaseName)

    # get details on each changeset in the database
    cd_list = []

    for c in c_list:
        c_id = c['changesetId']
        cd = get_kx_changeset(client=client, environmentId=environmentId, databaseName=databaseName, changesetId=c_id)

        cd_list.append(cd)

    # For each changeset, extract/flattend changeRequest
    part_list = []

    for cs in cd_list:
        for cr in cs.get('changeRequests', None):

            # anything?
            if cr is None:
                continue

            item = {}

            # get the core items
            item['status'] = cs['status']
            item['changesetId'] = cs['changesetId']
            item['createdTimestamp'] = cs['createdTimestamp']
            item['activeFromTimestamp'] = cs['activeFromTimestamp']
            item['lastModifiedTimestamp'] = cs['lastModifiedTimestamp']

            # get the request items
            item['changeType'] = cr['changeType']
            item['dbPath'] = cr['dbPath']

            # get the date partition from dbPath
            dt = None
            p = cr.get('dbPath', None)

            # has a specific date to parse
            if p is not None and p[0] == '/' and len(p) == 10:
                dt = datetime.datetime.strptime(p[1:11],"%Y.%m.%d")

            item['date'] = dt

            part_list.append(item)

    return part_list

def has_database(client, databaseName, environmentId: str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    has_database = True

    try:
        resp = client.get_kx_database(environmentId=environmentId, databaseName=databaseName)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        has_database = False

    return has_database


def has_cluster(client, clusterName, environmentId: str = None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    has_cluster = True

    try:
        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=clusterName)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        has_cluster = False

    return has_cluster


def get_kx_environment(client, environmentId: str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    try:
        resp = client.get_kx_environment(environmentId=environmentId)
    except client.exceptions.ResourceNotFoundException:
        return None

    if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            sys.stderr.write("Error:\n {resp}")
    else:
        resp.pop('ResponseMetadata', None)

    resp = resp.get('environment', resp)    

    return resp


def wait_for_environment_status(client, environmentId:str, status: str='CREATED', sleep_sec=10, max_wait_sec=1200, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until environment is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        resp = get_kx_environment(client, environmentId=environmentId)

        if resp is None:
            print(f"Environment: {environmentId} does not exist")
            continue

        a_status = resp.get('status')

        if a_status is None:
            print("status is None, returning")
            return resp

        if a_status.upper() == "FAILED_CREATION":
            print(f"Failed, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif a_status.upper() != status.upper():
            if show_wait: 
                print(f"Status is {a_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")

            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            return resp

    return None


def wait_for_changeset_status(client, environmentId:str, databaseName: str, changesetId: str, status: str='COMPLETED', sleep_sec=10, max_wait_sec=1200, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until changeset is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        try:
            resp = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=changesetId)
        except client.exceptions.ResourceNotFoundException:
            resp = None
            continue

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        a_status = resp.get('status')

        if a_status is not None and a_status.upper() == "FAILED":
            print(f"Failed, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif a_status is not None and a_status.upper() != status.upper():
            if show_wait: 
                print(f"Status is {a_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")

            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            return resp

    return None


def wait_for_cluster_status(client, environmentId:str, clusterName: str, status: str='RUNNING', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until cluster is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        try:
            resp = client.get_kx_cluster(environmentId = environmentId, clusterName=clusterName)

            if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                    sys.stderr.write("Error:\n {resp}")
            else:
                resp.pop('ResponseMetadata', None)

            this_cluster = resp
        except client.exceptions.ResourceNotFoundException:
            return None

        if this_cluster is None:
            print(f"cluster: {clusterName} not found")
            return None

        this_status = this_cluster['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Cluster: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Cluster: {clusterName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Cluster: {clusterName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Cluster: {clusterName} status is {this_status}")
                    
            return this_cluster

    print(f"No Cluster after {datetime.timedelta(seconds=total_wait)}")


def get_kx_volume(client, environmentId:str, volumeName: str):
    resp = None
    try:
        resp = client.get_kx_volume(environmentId = environmentId, volumeName=volumeName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_volume_status(client, environmentId:str, volumeName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until volume is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        this_volume = get_kx_volume(client, environmentId = environmentId, volumeName=volumeName)

        if this_volume is None:
            print(f"Volume: {volumeName} not found")
            return None

        this_status = this_volume['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Volume: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Volume: {volumeName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait>0:
                    print(f"Volume: {volumeName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Volume: {volumeName} status is {this_status}")
            return this_volume

    print(f"No Volume after {datetime.timedelta(seconds=total_wait)}")


def get_kx_scaling_group(client, environmentId:str, scalingGroupName: str):
    resp = None

    try:
        resp = client.get_kx_scaling_group(environmentId = environmentId, scalingGroupName=scalingGroupName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_scaling_group_status(client, environmentId:str, scalingGroupName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until volume is in desired status
    """
    total_wait = 0

    while True and total_wait < max_wait_sec:
        this_sg = get_kx_scaling_group(client=client, environmentId = environmentId, scalingGroupName=scalingGroupName)

        if this_sg is None:
            print(f"Scaling Group: {scalingGroupName} not found")
            return None

        this_status = this_sg['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Scaling Group: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Scaling Group: {scalingGroupName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Scaling Group: {scalingGroupName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Scaling Group: {scalingGroupName} status is {this_status}")
            return this_sg

    print(f"No Scaling Group after {datetime.timedelta(seconds=total_wait)}")


def get_kx_dataview(client, environmentId:str, databaseName: str, dataviewName: str):
    resp = None
    try:
        resp = client.get_kx_dataview(environmentId = environmentId, databaseName=databaseName, dataviewName=dataviewName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def wait_for_dataview_status(client, environmentId:str, databaseName: str, dataviewName: str, status: str='ACTIVE', sleep_sec=30, max_wait_sec=3600, show_wait=False):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    """
    Function polls until dataview is in desired status
    """
    total_wait = 0

    max_polls = max_wait_sec/sleep_sec

    while True and total_wait < max_wait_sec:
        this_resp = get_kx_dataview(client=client, environmentId = environmentId, databaseName=databaseName, dataviewName=dataviewName)

        if this_resp is None:
            print(f"Dataview: {dataviewName} not found")
            return None

        this_status = this_resp['status']

        if this_status.upper() == "CREATE_FAILED":
            print(f"Dataview: {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")

            error_info = resp.get('errorInfo', None)
            if error_info is not None:
                print(f"Type: {error_info.get('errorType', '')}")
                print(f"Message: {error_info.get('errorMessage', '')}")
                print(error_info)
            else:
                print(resp)

            return None
        elif this_status.upper() != status.upper():
            if show_wait:
                print(f"Dataview: {dataviewName} status is {this_status}, total wait {datetime.timedelta(seconds=total_wait)}, waiting {sleep_sec} sec ...")
            time.sleep(sleep_sec)
            total_wait = total_wait + sleep_sec
            continue
        else:
            if show_wait:
                if total_wait > 0:
                    print(f"Dataview: {dataviewName} status is now {this_status}, total wait {datetime.timedelta(seconds=total_wait)}")
                else:
                    print(f"Dataview: {dataviewName} status is {this_status}")
            return this_resp

    print(f"No Dataview after {datetime.timedelta(seconds=total_wait)}")


def get_kx_cluster(client, clusterName: str, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp=None
    try:
        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=clusterName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None
        return resp

    resp = resp.get('environment', resp)

    return resp


def get_kx_connection_string(client, clusterName: str, userName: str, boto_session, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    resp=client.get_kx_user(environmentId=environmentId, userName=userName)

    userArn = resp.get("userArn")
    iamRole = resp.get("iamRole")

    sts_client = boto_session.client(service_name='sts')
    cred_client = client

    try:
        resp = sts_client.assume_role(RoleArn=iamRole, RoleSessionName="Connect-to-kdb-cluster")

        creds = resp.get('Credentials')

        # session for calling get connection String
        cred_session = boto3.Session(
            aws_access_key_id=creds.get('AccessKeyId'),
            aws_secret_access_key=creds.get('SecretAccessKey'),
            aws_session_token=creds.get('SessionToken')
        )
        cred_client = cred_session.client(service_name='finspace')
    except:
        cred_client = client

    resp=cred_client.get_kx_connection_string(environmentId=environmentId, userArn=userArn, clusterName=clusterName)

    return resp.get("signedConnectionString", None)


def get_clusters(client, clusterType: str=None, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    svcs=[]

    try:
        svcs = list_kx_clusters(client, environmentId=environmentId, clusterType=clusterType)
    except botocore.exceptions.ClientError as error:
        code = error.response['Error']['Code']
        print(f"ClientError: {code} to environmentId: {environmentId}")
        return None
    except:
        return None

    svcs = sorted(svcs, key=lambda d: d['clusterName']) 

    dict_l = []

    for s in svcs:
        svc_name = s['clusterName']

        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=svc_name)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        svc_details = resp

        d = {}

        k_list = ['clusterName', 'status', 
                  'clusterType', 'capacityConfiguration', 'commandLineArguments', 
                  'clusterDescription', 'lastModifiedTimestamp', 'createdTimestamp']

        for k in k_list:
            d[k] = resp.get(k, None)

        dbs = resp.get('databases', None)

        if dbs is not None:
            db = dbs[0]
            d['databaseName'] = db.get('databaseName', None)
            d['cacheConfigurations'] = db.get('cacheConfigurations', None)
        else:
            d['databaseName'] = None

        dict_l.append(d)

    return pd.DataFrame.from_dict(dict_l)


def print_clusters(client, print_empty=False, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    svcs=[]

    try:
        svcs = list_kx_clusters(client, environmentId=environmentId)
    except:
        svcs=[]

    if len(svcs) > 0 or print_empty == True:        
        print(f"Environment: {environmentId} Clusters found: {len(svcs)}")

    svcs = sorted(svcs, key=lambda d: d['clusterName']) 

    for s in svcs:
        svc_name = s['clusterName']

        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=svc_name)
        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)

        svc_details = resp

        print(f"{svc_name.ljust(20)} {svc_details['status'].ljust(13)} {svc_details.get('connectionString', '')}")    


def get_kx_database(client, environmentId:str, databaseName: str):
    resp = None
    try:
        resp = client.get_kx_database(environmentId = environmentId, databaseName=databaseName)

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
                sys.stderr.write("Error:\n {resp}")
        else:
            resp.pop('ResponseMetadata', None)
    except client.exceptions.ResourceNotFoundException:
        resp=None

    return resp


def dump_database(client, databaseName:str, changset_details=False, environmentId:str=None):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    c_set_list=[]
    note_str=""

    db = client.get_kx_database(environmentId=environmentId, databaseName=databaseName)

    try:
        c_set_list = client.list_kx_changesets(environmentId=environmentId, databaseName=databaseName)['kxChangesets']
    except:
        note_str = "<<Could not get changesets>>"

    print(100*"=")
    print(f"Database: {databaseName}, Changesets: ")
    print(f"{db.get('description', '')}")
    print(f"Bytes: {db['numBytes']:,} Changesets: {db['numChangesets']:,} Files: {db['numFiles']:,}")
    print(100*"-")

    # sort by create time
    c_set_list = sorted(c_set_list, key=lambda d: d['createdTimestamp'], reverse=True) 

    if changset_details:
        for c in c_set_list:
            c_set_id = c['changesetId']
            print(f"Changeset ({c['status']}): {c_set_id}: Created: {c['createdTimestamp']}")
            c_rqs = client.get_kx_changeset(environmentId=environmentId, databaseName=databaseName, changesetId=c_set_id)['changeRequests']

            chs_pdf = pd.DataFrame.from_dict(c_rqs)# .style.hide_index()
            display(chs_pdf)
        print()
    else:
        display( pd.DataFrame.from_dict(c_set_list) )


def set_keepalive_linux(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Set TCP keepalive on an open socket.

    It activates after 1 second (after_idle_sec) of idleness,
    then sends a keepalive ping once every 3 seconds (interval_sec),
    and closes the connection after 5 failed ping (max_fails), or 15 seconds
    """
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, after_idle_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, interval_sec)
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, max_fails)


def set_keepalive_osx(sock, after_idle_sec=1, interval_sec=3, max_fails=5):
    """Set TCP keepalive on an open socket.

    sends a keepalive ping once every 3 seconds (interval_sec)
    """
    # scraped from /usr/include, not exported by python's socket module
    TCP_KEEPALIVE = 0x10
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    sock.setsockopt(socket.IPPROTO_TCP, TCP_KEEPALIVE, interval_sec)


def parse_connection_string(conn_str: str):
    conn_parts = conn_str.split(":")

    host=conn_parts[2].strip("/")
    port = int(conn_parts[3])
    username=conn_parts[4]
    password=conn_parts[5]

    return host, port, username, password


def get_pykx_connection(client, clusterName: str, userName: str, boto_session, environmentId:str=None, tls_enabled:bool=False, timeout:float=300.0):
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    import pykx as kx

    conn_str = get_kx_connection_string(client, 
                                      environmentId=environmentId, clusterName=clusterName, userName=userName,
                                      boto_session=boto_session)

    host, port, username, password = parse_connection_string(conn_str)

    return kx.SyncQConnection(host=host, port=port, username=username, password=password, tls=tls_enabled, timeout=timeout)

    if (tls_enabled is False) and kx.licensed:
        set_keepalive_linux(handle._sock, after_idle_sec=300, interval_sec=60, max_fails=25)

    return handle
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



