testing/vcttesting/ldap.py (163 lines of code) (raw):
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
from __future__ import absolute_import, unicode_literals
import os
import ldap
import paramiko
class LDAP(object):
    """Interface to an LDAP server.

    Wraps a python-ldap connection (stored as ``self.c``) with helpers for
    creating and deleting users, attaching SSH keys and managing group
    membership under the ``dc=mozilla`` tree.

    Attribute names, DNs and filters are passed to python-ldap as text;
    attribute values are passed as UTF-8 encoded bytes.
    """

    def __init__(self, uri, username=None, password=None):
        """Connect to ``uri`` and bind if both credentials are given.

        No bind is attempted unless ``username`` and ``password`` are both
        truthy.
        """
        self.c = ldap.initialize(uri)
        if username and password:
            self.c.simple_bind_s(username, password)

    @staticmethod
    def _user_dn(email):
        """Return the DN of the user entry for ``email``."""
        return "mail=%s,o=com,dc=mozilla" % email

    @staticmethod
    def _to_bytes(value):
        """Return ``value`` as bytes, UTF-8 encoding it if it is text."""
        if isinstance(value, bytes):
            return value
        return value.encode("utf-8")

    @staticmethod
    def _scm_groups(scm_level):
        """Return the group names implied by ``scm_level``.

        Level N implies membership in every level from 1 up to N; level 4
        maps to ``scm_allow_direct_push``. A falsy level yields no groups.

        Raises ``ValueError`` if the level is outside 1..4.
        """
        if not scm_level:
            return []

        if scm_level < 1 or scm_level > 4:
            raise ValueError(
                "scm level must be between 1 and 3 (or 4 for "
                "`scm_allow_direct_push`): %s" % scm_level
            )

        return [
            "scm_allow_direct_push" if level == 4 else "scm_level_%d" % level
            for level in range(1, scm_level + 1)
        ]

    def create_user(
        self,
        email,
        username,
        uid,
        fullname,
        key_filename=None,
        scm_level=None,
        hg_access=True,
        hg_enabled=True,
        bugzilla_email=None,
        groups=None,
    ):
        """Create a new user in LDAP.

        The user has an ``email`` address, a ``fullname``, a ``username``
        (for system accounts) and a numeric ``uid``.

        ``key_filename`` is the path of an SSH private key. If that file
        exists, the public key is read from ``<key_filename>.pub``;
        otherwise a new 2048-bit RSA key pair is generated and written to
        both paths. The public key is then attached to the user.

        ``scm_level`` (1 to 4) adds the user to every SCM group up to and
        including that level. ``hg_access`` adds the ``hgAccount`` object
        class and its attributes, with ``hgAccountEnabled`` set from
        ``hg_enabled``. ``bugzilla_email`` defaults to ``email``.
        ``groups`` is an iterable of additional group names to join.

        Returns a dict with ``dn`` and ``ldap_groups`` (a set of group
        names) and, when ``key_filename`` was given, ``ssh_pubkey``,
        ``ssh_key_filename`` and ``ssh_pubkey_filename``.

        Raises ``ValueError`` if ``scm_level`` is out of range. The check
        runs before anything is written, so an invalid level leaves no
        entry in LDAP and no key files on disk.
        """
        # Validate up front so that bad input cannot leave a partially
        # provisioned account behind.
        scm_groups = self._scm_groups(scm_level)

        if not bugzilla_email:
            bugzilla_email = email

        dn = self._user_dn(email)

        username = username.encode("utf-8")
        fullname = fullname.encode("utf-8")
        bugzilla_email = bugzilla_email.encode("utf-8")

        object_classes = [
            b"inetOrgPerson",
            b"organizationalPerson",
            b"person",
            b"posixAccount",
            b"bugzillaAccount",
            b"top",
        ]

        r = [
            ("objectClass", object_classes),
            ("cn", [fullname]),
            ("gidNumber", [b"100"]),
            ("homeDirectory", [b"/home/%s" % username]),
            # The surname is taken to be the last whitespace-separated
            # token of the full name.
            ("sn", [fullname.split()[-1]]),
            ("uid", [username]),
            ("uidNumber", [str(uid).encode("utf-8")]),
            ("bugzillaEmail", [bugzilla_email]),
        ]

        if hg_access:
            object_classes.append(b"hgAccount")
            value = b"TRUE" if hg_enabled else b"FALSE"
            r.extend(
                [
                    ("fakeHome", [b"/tmp"]),
                    ("hgAccountEnabled", [value]),
                    ("hgHome", [b"/tmp"]),
                    ("hgShell", [b"/bin/sh"]),
                ]
            )

        self.c.add_s(dn, r)

        res = {
            "dn": dn,
            "ldap_groups": set(),
        }

        if key_filename:
            pubkey_filename = "%s.pub" % key_filename
            if os.path.exists(key_filename):
                # Reuse an existing key pair; the public half is expected
                # to sit next to the private key.
                with open(pubkey_filename, "rb") as fh:
                    pubkey = fh.read()
            else:
                k = paramiko.rsakey.RSAKey.generate(2048)
                k.write_private_key_file(key_filename)
                pubkey = "%s %s %s" % (k.get_name(), k.get_base64(), email)
                pubkey = pubkey.encode("utf-8")
                with open(pubkey_filename, "wb") as fh:
                    fh.write(pubkey)

            self.add_ssh_key(email, pubkey)
            res["ssh_pubkey"] = pubkey
            res["ssh_key_filename"] = key_filename
            res["ssh_pubkey_filename"] = pubkey_filename

        for group in scm_groups:
            self.add_user_to_group(email, group)
            res["ldap_groups"].add(group)

        for group in groups or ():
            self.add_user_to_group(email, group)
            res["ldap_groups"].add(group)

        return res

    def delete_user(self, email):
        """Delete the specified user from LDAP.

        The user is first removed from every group under
        ``ou=groups,dc=mozilla`` that lists them, either by ``memberUid``
        (the email address) or by ``member`` (the user DN), and then the
        user entry itself is deleted.
        """
        # Imported here because only this method needs it.
        from ldap.filter import escape_filter_chars

        dn = self._user_dn(email)
        groups_base = "ou=groups,dc=mozilla"

        # (attribute, value) pairs: posix groups reference the user by
        # email, LDAP groups by DN.
        memberships = (
            ("memberUid", email),
            ("member", dn),
        )

        for attr, value in memberships:
            # Escape the value so filter metacharacters in it are matched
            # literally.
            filterstr = "(%s=%s)" % (attr, escape_filter_chars(value))
            results = self.c.search_s(
                groups_base, ldap.SCOPE_SUBTREE, filterstr, ["cn"]
            )
            for result in results:
                # Attribute names are text and values are bytes, matching
                # how add_user_to_group() wrote them.
                self.c.modify_s(
                    result[0], [(ldap.MOD_DELETE, attr, value.encode("utf-8"))]
                )

        # Delete the user entry.
        self.c.delete_s(dn)

    def create_vcs_sync_login(self, pubkey):
        """Create the ``uid=vcs-sync,ou=logins,dc=mozilla`` login entry.

        ``pubkey`` is the SSH public key to store on the entry, as text or
        as bytes.
        """
        dn = "uid=vcs-sync,ou=logins,dc=mozilla"

        r = [
            (
                "objectClass",
                [
                    b"account",
                    b"top",
                    b"uidObject",
                    b"hgAccount",
                    b"mailObject",
                    b"posixAccount",
                    b"ldapPublicKey",
                ],
            ),
            ("cn", [b"VCS Sync"]),
            ("fakeHome", [b"/tmp"]),
            ("gidNumber", [b"100"]),
            ("hgAccountEnabled", [b"TRUE"]),
            ("hgHome", [b"/tmp"]),
            ("hgShell", [b"/bin/sh"]),
            ("homeDirectory", [b"/home/vcs-sync"]),
            ("mail", [b"vcs-sync@mozilla.com"]),
            ("uidNumber", [b"1500"]),
            ("sshPublicKey", [self._to_bytes(pubkey)]),
        ]

        self.c.add_s(dn, r)

    def add_ssh_key(self, email, key):
        """Add an SSH key to a user in LDAP.

        ``key`` is the public key, as text or as bytes. The
        ``ldapPublicKey`` object class is added to the entry first if it
        does not already have it.
        """
        dn = self._user_dn(email)
        key = self._to_bytes(key)

        modlist = []

        try:
            entries = self.c.search_s(
                dn, ldap.SCOPE_BASE, attrlist=["objectClass"]
            )
            existing = entries[0][1]
            if b"ldapPublicKey" not in existing["objectClass"]:
                modlist.append((ldap.MOD_ADD, "objectClass", b"ldapPublicKey"))
        except ldap.NO_SUCH_OBJECT:
            # Deliberately ignored: the modify_s() below is still
            # attempted and is left to report the outcome.
            pass

        modlist.append((ldap.MOD_ADD, "sshPublicKey", key))

        self.c.modify_s(dn, modlist)

    def add_user_to_group(self, email, group):
        """Add a user to the specified LDAP group.

        The ``group`` is defined in terms of its ``cn`` under
        ``ou=groups,dc=mozilla``. e.g. ``scm_level_3``.

        The email address is added as a ``memberUid`` of the group. For
        groups whose name starts with ``scm_``, the user DN is also added
        as a ``member`` of the matching ``active_<group>`` group.
        """
        dn = self._user_dn(email)

        group_dn = "cn=%s,ou=groups,dc=mozilla" % group
        modlist = [(ldap.MOD_ADD, "memberUid", email.encode("utf-8"))]
        self.c.modify_s(group_dn, modlist)

        # MoCo LDAP has an active_* for each scm_level_* group, which we need
        # to emulate here.
        if group.startswith("scm_"):
            group_dn = "cn=active_%s,ou=groups,dc=mozilla" % group
            modlist = [(ldap.MOD_ADD, "member", dn.encode("utf-8"))]
            self.c.modify_s(group_dn, modlist)