cdk/cdk-domainless-mode/test-scripts/add_delete_kerberos_leases.py (54 lines of code) (raw):
import grpc
import credentialsfetcher_pb2
import credentialsfetcher_pb2_grpc
import os
import json
import time
'''
Use this script to create and delete N kerberos leases in a recurring loop
(currently set to 100 times). This script is run to test that create/delete
functionality has no leaks or unexpected failures when run over a long
period of time. This script is run on a linux instance in stand-alone mode.
'''
with open('../data.json', 'r') as file:
# Load the JSON data
data = json.load(file)
def run():
with grpc.insecure_channel('unix:///var/credentials-fetcher/socket/credentials_fetcher.sock') as channel:
stub = credentialsfetcher_pb2_grpc.CredentialsFetcherServiceStub(channel)
number_of_gmsa_accounts = data["number_of_gmsa_accounts"]
directory_name = data["directory_name"]
netbios_name = data["netbios_name"]
username = data["username"]
password = data["password"]
for iter in range(100): # Repeat the process 100 times
lease_ids = []
# Create cred-specs for users ending with multiples of 5
for i in range(2, number_of_gmsa_accounts, 2):
credspec_contents = f"""{{
"CmsPlugins": ["ActiveDirectory"],
"DomainJoinConfig": {{
"Sid": "S-1-5-21-2725122404-4129967127-2630707939",
"MachineAccountName": "WebApp0{i}",
"Guid": "e96e0e09-9305-462f-9e44-8a8179722897",
"DnsTreeName": "{directory_name}",
"DnsName": "{directory_name}",
"NetBiosName": "{netbios_name}"
}},
"ActiveDirectoryConfig": {{
"GroupManagedServiceAccounts": [
{{"Name": "WebApp0{i}", "Scope": "{directory_name}"}},
{{"Name": "WebApp0{i}", "Scope": "{netbios_name}"}}
],
"HostAccountConfig": {{
"PortableCcgVersion": "1",
"PluginGUID": "{{GDMA0342-266A-4D1P-831J-20990E82944F}}",
"PluginInput": {{
"CredentialArn": "aws/directoryservice/contoso/gmsa"
}}
}}
}}
}}"""
contents = [credspec_contents]
response = stub.AddNonDomainJoinedKerberosLease(
credentialsfetcher_pb2.CreateNonDomainJoinedKerberosLeaseRequest(
credspec_contents=contents,
username=username,
password=password,
domain=directory_name
)
)
print(f"Created lease for WebApp0{i}: {response.lease_id}")
lease_path = (f"/var/credentials-fetcher/krbdir/"
f"{response.lease_id}/WebApp0{i}/krb5cc")
assert os.path.exists(lease_path)
lease_ids.append(response.lease_id)
# Small delay to allow for processing
time.sleep(1)
# Delete the created cred-specs
for lease_id in lease_ids:
delete_response = stub.DeleteKerberosLease(
credentialsfetcher_pb2.DeleteKerberosLeaseRequest(
lease_id=lease_id
)
)
print(f"Deleted lease: {delete_response.lease_id}")
lease_path = f"/var/credentials-fetcher/krbdir/{lease_id}"
print(lease_path)
assert not os.path.exists(lease_path)
print(f"Completed {iter} cycle of creation and deletion")
if __name__ == '__main__':
run()