in ManagedkdbInsights/hdb_backup/managed_kx.py [0:0]
def get_clusters(client, clusterType: str = None, environmentId: str = None):
    """Return a DataFrame summarising the clusters of an environment.

    The clusters are listed with ``list_kx_clusters``, sorted by
    ``clusterName``, and each one is then fetched individually with
    ``client.get_kx_cluster`` to collect its details.

    Args:
        client: Service client exposing ``get_kx_cluster`` (presumably a
            boto3 FinSpace client -- confirm against the caller).
        clusterType: Optional cluster type, passed through unchanged to
            ``list_kx_clusters``.
        environmentId: Environment to query. When ``None`` it is resolved
            with ``get_kx_environment_id(client)``.

    Returns:
        A ``pandas.DataFrame`` with one row per successfully described
        cluster, holding the columns ``clusterName``, ``status``,
        ``clusterType``, ``capacityConfiguration``,
        ``commandLineArguments``, ``clusterDescription``,
        ``lastModifiedTimestamp``, ``createdTimestamp``, ``databaseName``
        and ``cacheConfigurations``. Fields missing from a response are
        ``None``. Returns ``None`` when the cluster listing itself fails.
    """
    if environmentId is None:
        environmentId = get_kx_environment_id(client)

    try:
        svcs = list_kx_clusters(client, environmentId=environmentId, clusterType=clusterType)
    except botocore.exceptions.ClientError as error:
        code = error.response['Error']['Code']
        print(f"ClientError: {code} to environmentId: {environmentId}")
        return None
    except Exception as error:
        # Keep the best-effort contract (callers get None), but say why
        # instead of failing silently; KeyboardInterrupt/SystemExit now
        # propagate instead of being swallowed.
        print(f"Error: {error!r} to environmentId: {environmentId}")
        return None

    summary_keys = (
        'clusterName', 'status',
        'clusterType', 'capacityConfiguration', 'commandLineArguments',
        'clusterDescription', 'lastModifiedTimestamp', 'createdTimestamp',
    )

    rows = []
    for svc in sorted(svcs, key=lambda d: d['clusterName']):
        resp = client.get_kx_cluster(environmentId=environmentId, clusterName=svc['clusterName'])

        if resp['ResponseMetadata']['HTTPStatusCode'] != 200:
            # Report the failed lookup and skip it: an error response
            # carries no cluster details worth turning into a row.
            sys.stderr.write(f"Error:\n {resp}")
            continue

        row = {key: resp.get(key) for key in summary_keys}

        # Only the first attached database is summarised. 'databases' may
        # be absent or an empty list; both yield None for the two fields so
        # that every row has the same set of columns.
        dbs = resp.get('databases')
        first_db = dbs[0] if dbs else {}
        row['databaseName'] = first_db.get('databaseName')
        row['cacheConfigurations'] = first_db.get('cacheConfigurations')

        rows.append(row)

    return pd.DataFrame.from_dict(rows)