4 Star 13 Fork 12

single_yang/ddtalk_ldap

加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
文件
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
ldap.py 4.80 KB
一键复制 编辑 原始数据 按行查看 历史
single_yang 提交于 2021-03-08 11:44 +08:00 . update ldap.py.
import json
import pinyin
from ldap3 import Server, Connection, SUBTREE, ALL_ATTRIBUTES
class LDAP(object):
def __init__(self, host, port, base_dn, user, password):
dn = 'cn={},{}'.format(user, base_dn)
self.server = Server(host=host, port=port)
self.base_dn = base_dn
self.__conn = Connection(self.server, dn, password, auto_bind=True)
def add_user(self, userid, username, positions, department):
uidNumber = '%s' % userid[-4:]
objectclass = ['top', 'inetOrgPerson', 'posixAccount']
# 只将研发人员加入ldap
# 分组名称与ldap分组名称保持一致
try:
position = positions.replace('组', '')
except:
position = positions
if position not in ['产品', '后端', '前端', '测试', '设计', '中台']:
return "Other"
# 获取项目组gidNumber
if position not in ['产品', '后端', '前端']:
search_dn = 'cn={},cn=产品研发,ou=People,{}'.format(position, self.base_dn)
dn = 'cn={},cn={},cn=产品研发,ou=People,{}'.format(username, position, self.base_dn)
else:
search_dn = 'cn={},cn={},cn=产品研发,ou=People,{}'.format(position, department, self.base_dn)
dn = 'cn={},cn={},cn={},cn=产品研发,ou=People,{}'.format(username, position, department, self.base_dn)
filter = '(cn={})'.format(position)
status = self.__conn.search(search_dn, filter, attributes=['gidNumber'])
if status:
data = self.__conn.entries
gidNumber = data[0].entry_attributes_as_dict["gidNumber"]
else:
return " Gid Error"
# 添加新入职员工
s = self.__conn.add(dn, objectclass, {
'sn': pinyin.get(username[0], format='strip').encode('utf-8'),
'cn': username.encode('utf-8'),
'uid': pinyin.get(username, format='strip').encode('utf-8'),
'gidNumber': gidNumber,
'homeDirectory': '/home/users/%s' % pinyin.get(username, format='strip').encode('utf-8'),
'loginShell': '/usr/sbin/nologin'.encode('utf-8'),
'uidNumber': uidNumber.encode('utf-8'),
'userPassword': '123456'.encode('utf-8'),
})
return s
# 删除离职员工
def delete_user(self, username=None, positions=None, department=None, dn=None):
if dn:
return self.__conn.delete(dn)
# 分组名称与ldap名称保持一致
try:
position = positions.replace('组', '')
except:
position = positions
if position not in ['产品', '后端', '前端', '测试', '设计', '中台']:
return "Other"
if position not in ['产品', '后端', '前端']:
dn = 'cn={},cn={},cn=产品研发,ou=People,{}'.format(username, position, self.base_dn)
else:
dn = 'cn={},cn={},cn={},cn=产品研发,ou=People,{}'.format(username, position, department, self.base_dn)
return self.__conn.delete(dn)
# 用于丁丁组织架构调整时ldap的调整,丁丁只有两种事件:离职或入职!!!
def get_user_info(self, base_dn=None, **kwargs):
"""
` 参考: https://ldap3.readthedocs.io/tutorial_searches.html
获取用户dn, 通过 args
可以支持多个参数: get_user_info(mail="xxx@domain.com", cn="姓名")
会根据 kwargs 生成 search的内容,进行查询: 多个条件是 & and查询
返回第一个查询到的结果,
建议使用唯一标识符进行查询
这个函数基本可以获取所有类型的数据
:param base_dn:
:param kwargs:
:return: {"dn": dn, "cn": 项目组名称}
"""
search = ""
for k, v in kwargs.items():
search += "(%s=%s)" % (k, v)
if not base_dn:
base_dn = self.base_dn
if search:
search_filter = '(&{})'.format(search)
else:
search_filter = ''
status = self.__conn.search(base_dn,
search_filter=search_filter,
search_scope=SUBTREE,
attributes=ALL_ATTRIBUTES
)
if status:
return self.__conn.entries
else:
return False
def get_user_department(self, username):
entries = self.get_user_info(cn=username)
if isinstance(entries, list):
for entry in entries:
dn = json.loads(entry.entry_to_json())["dn"]
dn_list = dn.split(",")
if len(dn_list) == 6:
yield {"dn": dn, "cn": dn_list[1][3:]}
else:
yield {"dn": dn, "cn": dn_list[2][3:]}
else:
return {}
Loading...
马建仓 AI 助手
尝试更多
代码解读
代码找茬
代码优化
Python
1
https://gitee.com/single_yang/ddtalk_ldap.git
git@gitee.com:single_yang/ddtalk_ldap.git
single_yang
ddtalk_ldap
ddtalk_ldap
master

搜索帮助