in source/soca/cluster_web_ui/api/v1/ldap/openldap/user.py [0:0]
def post(self):
"""
Create a new LDAP user
---
tags:
- User Management
parameters:
- in: body
name: body
schema:
required:
- user
- password
- sudoers
- email
optional:
- uid
- gid
properties:
user:
type: string
description: user you want to create
password:
type: string
description: Password for the new user
sudoers:
type: boolean
description: True (give user SUDO permissions) or False
email:
type: string
description: Email address associated to the user
uid:
type: integer
description: Linux UID to be associated to the user
gid:
type: integer
description: Linux GID to be associated to user's group
responses:
200:
description: User created
203:
description: User already exist
400:
description: Malformed client input
"""
parser = reqparse.RequestParser()
parser.add_argument('user', type=str, location='form')
parser.add_argument('password', type=str, location='form')
parser.add_argument('sudoers', type=int, location='form')
parser.add_argument('email', type=str, location='form')
parser.add_argument('shell', type=str, location='form')
parser.add_argument('uid', type=int, location='form') # 0 = no value specified, use default one
parser.add_argument('gid', type=int, location='form') # 0 = no value specified, use default one
args = parser.parse_args()
user = ''.join(x for x in args["user"] if x.isalpha() or x.isdigit()).lower() # Sanitize input
password = args["password"]
sudoers = args["sudoers"]
email = args["email"]
uid = args["uid"]
gid = args["gid"]
shell = args["shell"]
group = f"{args['user']}{config.Config.GROUP_NAME_SUFFIX}"
if shell is None:
shell = "/bin/bash"
get_id = get(config.Config.FLASK_ENDPOINT + '/api/ldap/ids',
headers={"X-SOCA-TOKEN": config.Config.API_ROOT_KEY},
verify=False) # nosec
if get_id.status_code == 200:
current_ldap_ids = (json.loads(get_id.text))
else:
logger.error("/api/ldap/ids returned error : " + str(get_id.__dict__))
return {"success": False, "message": "/api/ldap/ids returned error: " +str(get_id.__dict__)}, 500
if user is None or password is None or sudoers is None or email is None:
return errors.all_errors("CLIENT_MISSING_PARAMETER", "user (str), password (str), sudoers (bool) and email (str) parameters are required")
# Note: parseaddr adheres to rfc5322 , which means user@domain is a correct address.
# You do not necessarily need to add a tld at the end
if "@" not in parseaddr(email)[1]:
return errors.all_errors("INVALID_EMAIL_ADDRESS")
if uid == 0:
uid = current_ldap_ids["message"]['proposed_uid']
else:
if uid in current_ldap_ids["message"]['uid_in_use']:
return errors.all_errors("UID_ALREADY_IN_USE")
if gid == 0:
gid = current_ldap_ids["message"]['proposed_gid']
else:
if gid in current_ldap_ids["message"]['gid_in_use']:
return errors.all_errors("GID_ALREADY_IN_USE")
try:
conn = ldap.initialize('ldap://' + config.Config.LDAP_HOST)
dn_user = "uid=" + user + ",ou=people," + config.Config.LDAP_BASE_DN
enc_passwd = bytes(password, 'utf-8')
salt = os.urandom(16)
sha = hashlib.sha1(enc_passwd) # nosec
sha.update(salt)
digest = sha.digest()
b64_envelop = encode(digest + salt)
passwd = '{{SSHA}}{}'.format(b64_envelop.decode('utf-8'))
attrs = [
('objectClass', ['top'.encode('utf-8'),
'person'.encode('utf-8'),
'posixAccount'.encode('utf-8'),
'shadowAccount'.encode('utf-8'),
'inetOrgPerson'.encode('utf-8'),
'organizationalPerson'.encode('utf-8')]),
('uid', [str(user).encode('utf-8')]),
('uidNumber', [str(uid).encode('utf-8')]),
('gidNumber', [str(gid).encode('utf-8')]),
('mail', [email.encode('utf-8')]),
('cn', [str(user).encode('utf-8')]),
('sn', [str(user).encode('utf-8')]),
('loginShell', [str(shell).encode('utf-8')]),
('homeDirectory', (config.Config.USER_HOME + '/' + str(user)).encode('utf-8')),
('userPassword', [passwd.encode('utf-8')])
]
conn.simple_bind_s(config.Config.ROOT_DN, config.Config.ROOT_PW)
# Create group first to prevent GID issue
create_user_group = post(config.Config.FLASK_ENDPOINT + "/api/ldap/group",
headers={"X-SOCA-TOKEN": config.Config.API_ROOT_KEY},
data={"group": f"{group}", "gid": gid},
verify=False) # nosec
if create_user_group.status_code != 200:
return errors.all_errors("COULD_NOT_CREATE_GROUP", str(create_user_group.text))
# Assign user
conn.add_s(dn_user, attrs)
# Add user to group
update_group = put(config.Config.FLASK_ENDPOINT + "/api/ldap/group",
headers={"X-SOCA-TOKEN": config.Config.API_ROOT_KEY},
data={"group": f"{group}",
"user": user,
"action": "add"},
verify=False) # nosec
if update_group.status_code != 200:
return errors.all_errors("UNABLE_TO_ADD_USER_TO_GROUP", "User/Group created but could not add user to his group")
# Create home directory
if create_home(user, group) is False:
return errors.all_errors("UNABLE_CREATE_HOME", "User added but unable to create home director")
# Create API Key
try:
get(config.Config.FLASK_ENDPOINT + "/api/user/api_key",
headers={"X-SOCA-TOKEN": config.Config.API_ROOT_KEY},
params={"user": user},
verify=False).json() # nosec
except Exception as err:
logger.error("User created but unable to create API key. SOCA will try to generate it when user log in for the first time " + str(err))
# Add Sudo permission
if sudoers == 1:
grant_sudo = post(config.Config.FLASK_ENDPOINT + "/api/ldap/sudo",
headers={"X-SOCA-TOKEN": config.Config.API_ROOT_KEY},
data={"user": user},
verify=False # nosec
)
if grant_sudo.status_code != 200:
return errors.all_errors("UNABLE_TO_GRANT_SUDO", "User added but unable to give admin permissions")
return {"success": True, "message": "Added user"}, 200
except Exception as err:
return errors.all_errors(type(err).__name__, err)