ad-joining/register-computer/kerberos/password.py (52 lines of code) (raw):
#
# Copyright 2019 Google LLC
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import os
import subprocess
import logging
import tempfile
class KerberosException(Exception):
def __init__(self, message, error_code):
self.__message = message
self.__error_code = error_code
super().__init__(self.__message)
def get_error_code(self):
return self.__error_code
class KerberosPasswordClient(object):
KSETPWD_BINARY = "ksetpwd"
def __init__(self, realm, kdc, admin_server, client_upn, client_password):
self.__realm = realm
self.__kdc = kdc
self.__admin_server = admin_server
self.__client_upn = client_upn
self.__client_password = client_password
def __generate_config_file(self):
config = """[libdefaults]
dns_lookup_realm = false
dns_lookup_kdc = false
[realms]
%s = {
kdc = %s
admin_server = %s
}
""" % (self.__realm.upper(), self.__kdc, self.__admin_server)
handle, name = tempfile.mkstemp(prefix = "krb5.")
with open(handle, "w", encoding = "utf-8") as f:
f.write(config)
return name
def set_password(self, upn, password):
bin_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "bin")
config_file = self.__generate_config_file()
env = os.environ.copy()
env["LD_LIBRARY_PATH"] = bin_path # for libcom_err.so
env["KRB5_CONFIG"] = config_file
env["KSETPWD_AGENT_PASSWORD"] = self.__client_password
env["KSETPWD_TARGET_PASSWORD"] = password
ksetpwd = os.path.join(bin_path, KerberosPasswordClient.KSETPWD_BINARY)
logging.info("Launching %s with environment config at %s and admin server %s" % (ksetpwd, config_file, self.__admin_server))
# NB. Realm names must be upper case to work.
process = subprocess.run(
[ksetpwd, self.__client_upn.upper(), upn.upper()],
capture_output=True,
env=env)
# Delete KRB5 config
os.unlink(config_file)
if process.returncode == 0:
if process.stderr:
logging.info(process.stderr)
if process.stdout:
logging.info(process.stdout)
else:
if process.stderr:
logging.warning(process.stderr)
if process.stdout:
logging.warning(process.stdout)
raise KerberosException("Password reset failed: %d" % process.returncode, process.returncode)