in cdk/cdk-domainless-mode/test-scripts/add_delete_kerberos_leases.py [0:0]
def run():
with grpc.insecure_channel('unix:///var/credentials-fetcher/socket/credentials_fetcher.sock') as channel:
stub = credentialsfetcher_pb2_grpc.CredentialsFetcherServiceStub(channel)
number_of_gmsa_accounts = data["number_of_gmsa_accounts"]
directory_name = data["directory_name"]
netbios_name = data["netbios_name"]
username = data["username"]
password = data["password"]
for iter in range(100): # Repeat the process 100 times
lease_ids = []
# Create cred-specs for users ending with multiples of 5
for i in range(2, number_of_gmsa_accounts, 2):
credspec_contents = f"""{{
"CmsPlugins": ["ActiveDirectory"],
"DomainJoinConfig": {{
"Sid": "S-1-5-21-2725122404-4129967127-2630707939",
"MachineAccountName": "WebApp0{i}",
"Guid": "e96e0e09-9305-462f-9e44-8a8179722897",
"DnsTreeName": "{directory_name}",
"DnsName": "{directory_name}",
"NetBiosName": "{netbios_name}"
}},
"ActiveDirectoryConfig": {{
"GroupManagedServiceAccounts": [
{{"Name": "WebApp0{i}", "Scope": "{directory_name}"}},
{{"Name": "WebApp0{i}", "Scope": "{netbios_name}"}}
],
"HostAccountConfig": {{
"PortableCcgVersion": "1",
"PluginGUID": "{{GDMA0342-266A-4D1P-831J-20990E82944F}}",
"PluginInput": {{
"CredentialArn": "aws/directoryservice/contoso/gmsa"
}}
}}
}}
}}"""
contents = [credspec_contents]
response = stub.AddNonDomainJoinedKerberosLease(
credentialsfetcher_pb2.CreateNonDomainJoinedKerberosLeaseRequest(
credspec_contents=contents,
username=username,
password=password,
domain=directory_name
)
)
print(f"Created lease for WebApp0{i}: {response.lease_id}")
lease_path = (f"/var/credentials-fetcher/krbdir/"
f"{response.lease_id}/WebApp0{i}/krb5cc")
assert os.path.exists(lease_path)
lease_ids.append(response.lease_id)
# Small delay to allow for processing
time.sleep(1)
# Delete the created cred-specs
for lease_id in lease_ids:
delete_response = stub.DeleteKerberosLease(
credentialsfetcher_pb2.DeleteKerberosLeaseRequest(
lease_id=lease_id
)
)
print(f"Deleted lease: {delete_response.lease_id}")
lease_path = f"/var/credentials-fetcher/krbdir/{lease_id}"
print(lease_path)
assert not os.path.exists(lease_path)
print(f"Completed {iter} cycle of creation and deletion")