我们从 Python 开源项目中提取了以下 30 个代码示例，用于说明如何使用 ldap.SCOPE_ONELEVEL。
def _replication_agreements(self):
    """Summarise the last-update status of every replication agreement.

    Searches one level below the ``cn=replica`` entry of the mapping tree
    for the configured base DN and reads ``nsDS5ReplicaHost`` and
    ``nsds5replicaLastUpdateStatus`` from each entry found.

    Returns:
        A ``(report, healthy)`` tuple. ``report`` holds one
        ``"<host> <status>"`` line per entry, where ``<host>`` is the text
        before the first dot of the replica host and ``<status>`` is the
        leading token of the status string. ``healthy`` is False if any
        status token differs from ``'0'``.
    """
    self._log.debug('Checking for replication agreements...')

    # '=' and ',' in the base DN are written as \3D and \2C so that the
    # whole DN can be embedded as a single RDN value.
    escaped_suffix = self._base_dn.replace('=', '\\3D').replace(',', '\\2C')
    entries = self._search(
        'cn=replica,cn=%s,cn=mapping tree,cn=config' % escaped_suffix,
        '(objectClass=*)',
        ['nsDS5ReplicaHost', 'nsds5replicaLastUpdateStatus'],
        scope=ldap.SCOPE_ONELEVEL
    )

    report_lines = []
    healthy = True
    for _dn, attrs in entries:
        fqdn = attrs['nsDS5ReplicaHost'][0].decode('utf-8')
        short_host = fqdn.partition('.')[0]
        raw_status = attrs['nsds5replicaLastUpdateStatus'][0].decode('utf-8')
        # Drop the 'Error ' prefix, keep the first token, strip parentheses.
        code = raw_status.replace('Error ', '').partition(' ')[0].strip('()')
        healthy = healthy and code == '0'
        report_lines.append('%s %s' % (short_host, code))
    return '\n'.join(report_lines), healthy
def check_userfree(self, uid):
    """Check if a username is free.

    If username is already used or is an LDAP group, an RBFatalError is
    raised. If the username is in the additional reserved LDAP tree, an
    RBWarningError is raised and checked if it is to be overridden.

    The username is escaped before being placed in the search filters, so
    characters such as ``*``, ``(``, ``)`` and ``\\`` are matched literally
    instead of being interpreted as filter syntax.
    """
    # Imported locally so this method does not depend on the module-level
    # import block also importing the ``ldap.filter`` submodule.
    from ldap.filter import escape_filter_chars

    safe_uid = escape_filter_chars(str(uid))

    # 1. An existing account with this uid.
    res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
                             ldap.SCOPE_ONELEVEL, 'uid=%s' % safe_uid)
    if res:
        raise RBFatalError(
            "Username '%s' is already taken by %s account (%s)" %
            (uid, res[0][1]['objectClass'][0].decode(),
             res[0][1]['cn'][0].decode()))

    # 2. A group whose cn equals the requested name.
    res = self.ldap.search_s(rbconfig.ldap_group_tree,
                             ldap.SCOPE_ONELEVEL, 'cn=%s' % safe_uid)
    if res:
        raise RBFatalError("Username '%s' is reserved (LDAP Group)" % uid)

    # 3. An entry in the reserved tree: reported through self.rberror()
    #    rather than raised directly.
    res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                             ldap.SCOPE_ONELEVEL, 'uid=%s' % safe_uid)
    if res:
        self.rberror(
            RBWarningError("Username '%s' is reserved (%s)" %
                           (uid, res[0][1]['description'][0].decode())))
def list_pre_sync(self):
    """Return dictionary of all users for useradm pre_sync() dump.

    Returns:
        A dict keyed by the first ``uid`` value of each posixAccount entry
        found one level below the accounts tree. Each value is a dict with
        ``'homeDirectory'`` (first value of that attribute, as returned by
        the search) and ``'usertype'`` (the first objectClass of the entry,
        decoded to ``str``, that appears in ``rbconfig.usertypes``).

    Raises:
        RBFatalError: if an entry has no objectClass listed in
            ``rbconfig.usertypes``.
    """
    res = self.ldap.search_s(
        rbconfig.ldap_accounts_tree, ldap.SCOPE_ONELEVEL,
        'objectClass=posixAccount', ('uid', 'homeDirectory', 'objectClass'))

    users = {}
    # search_s() yields (dn, attrs) tuples; only the attribute dict is used.
    for _dn, data in res:
        usertype = None
        for raw_class in data['objectClass']:
            candidate = raw_class.decode()
            if candidate in rbconfig.usertypes:
                usertype = candidate
                break
        if usertype is None:
            raise RBFatalError(
                "Unknown usertype for user '%s'" % data['uid'][0])
        users[data['uid'][0]] = {
            'homeDirectory': data['homeDirectory'][0],
            'usertype': usertype,
        }
    return users
def uidNumber_findmax(self):
    """Return highest uidNumber found in LDAP accounts tree.

    Only posixAccount entries one level below the accounts tree are
    considered. If none exist (or none exceed it) the result is -1.

    This is only used to create the uidNumber file; the
    uidNumber_readnext() function should be used for getting the next
    available uidNumber.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'objectClass=posixAccount',
        ('uidNumber', ),
    )
    uid_numbers = [int(entry[1]['uidNumber'][0]) for entry in entries]
    # -1 is the floor, exactly as when starting a running maximum at -1.
    return max([-1] + uid_numbers)
def _search(self, filterstr, attrlist=None):
    """Run a one-level ``search_s`` below the configured base DN.

    A wrapper for the `LDAPObject.search_s` functionality. The connection
    is obtained from ``self._ldap_connection()`` and used as a context
    manager for the duration of the search.

    Args:
        filterstr: String representation of the filter to apply.
        attrlist: Attributes to retrieve. If None, all the attributes of
            each entry are returned.

    Returns:
        Whatever ``search_s`` returned.
    """
    with self._ldap_connection() as conn:
        found = conn.search_s(
            self.base_dn,
            ldap.SCOPE_ONELEVEL,
            filterstr,
            attrlist,
        )
    return found
def _count_hbac_rules(self):
    """Return how many entries directly under ``cn=hbac`` have an ipaUniqueID."""
    self._log.debug('Counting HBAC rules...')
    container_dn = 'cn=hbac,%s' % self._base_dn
    rules = self._search(
        container_dn,
        '(ipaUniqueID=*)',
        scope=ldap.SCOPE_ONELEVEL
    )
    return len(rules)
def _count_sudo_rules(self):
    """Return how many entries directly under ``cn=sudorules,cn=sudo`` have an ipaUniqueID."""
    self._log.debug('Counting SUDO rules...')
    container_dn = 'cn=sudorules,cn=sudo,%s' % self._base_dn
    rules = self._search(
        container_dn,
        '(ipaUniqueID=*)',
        scope=ldap.SCOPE_ONELEVEL
    )
    return len(rules)
def _count_dns_zones(self):
    """Return how many idnszone / idnsforwardzone entries sit directly under ``cn=dns``."""
    self._log.debug('Counting DNS zones...')
    container_dn = 'cn=dns,%s' % self._base_dn
    zones = self._search(
        container_dn,
        '(|(objectClass=idnszone)(objectClass=idnsforwardzone))',
        scope=ldap.SCOPE_ONELEVEL
    )
    return len(zones)
def _count_certificates(self):
    """Count entries with a certStatus directly under the certificate repository.

    Returns:
        The number of matching entries, or the string ``'N/A'`` when the
        search raises ``ldap.NO_SUCH_OBJECT``.
    """
    self._log.debug('Counting certificates...')
    try:
        certificates = self._search(
            'ou=certificateRepository,ou=ca,o=ipaca',
            '(certStatus=*)',
            scope=ldap.SCOPE_ONELEVEL
        )
    except ldap.NO_SUCH_OBJECT:
        return 'N/A'
    else:
        return len(certificates)
def check_user_byname(self, uid):
    """Raise RBFatalError if given username does not exist in user database.

    The username is escaped before being placed in the search filter, so
    filter metacharacters in it are matched literally.
    """
    # Imported locally so this method does not depend on the module-level
    # import block also importing the ``ldap.filter`` submodule.
    from ldap.filter import escape_filter_chars

    query = 'uid=%s' % escape_filter_chars(str(uid))
    if not self.ldap.search_s(rbconfig.ldap_accounts_tree,
                              ldap.SCOPE_ONELEVEL, query):
        raise RBFatalError("User '%s' does not exist" % uid)
def check_user_byid(self, user_id):
    """Raise RBFatalError if given id does not belong to a user in user
    database."""
    matches = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'id=%s' % user_id,
    )
    if matches:
        return
    raise RBFatalError("User with id '%s' does not exist" % user_id)
def check_group_byname(self, group):
    """Raise RBFatalError if given group does not exist in group database.

    The group name is escaped before being placed in the search filter, so
    filter metacharacters in it are matched literally.
    """
    # Imported locally so this method does not depend on the module-level
    # import block also importing the ``ldap.filter`` submodule.
    from ldap.filter import escape_filter_chars

    query = 'cn=%s' % escape_filter_chars(str(group))
    if not self.ldap.search_s(rbconfig.ldap_group_tree,
                              ldap.SCOPE_ONELEVEL, query):
        raise RBFatalError("Group '%s' does not exist" % group)
def check_group_byid(self, gid):
    """Raise RBFatalError if given id does not belong to a group in group
    database."""
    matches = self.ldap.search_s(
        rbconfig.ldap_group_tree,
        ldap.SCOPE_ONELEVEL,
        'gidNumber=%s' % gid,
    )
    if matches:
        return
    raise RBFatalError("Group with id '%s' does not exist" % gid)

# ------------------------------------------------------------------- #
# INFORMATION RETRIEVAL METHODS                                       #
# ------------------------------------------------------------------- #

# fixme still needed ?
# def get_usertype_byname(self, uid):
#     """Return usertype for username in user database. Raise
#     RBFatalError if user does not exist."""
#     res = self.ldap.search_s(rbconfig.ldap_accounts_tree,
#                              ldap.SCOPE_ONELEVEL, 'uid=%s' % usr.uid,
#                              ('objectClass', ))
#     if res:
#         for i in res[0][1]['objectClass']:
#             if i in rbconfig.usertypes:
#                 return i
#         else:
#             raise RBFatalError("Unknown usertype for user '%s'" % uid)
#     else:
#         raise RBFatalError("User '%s' does not exist" % uid)
def get_user_byid(self, usr):
    """Populate RBUser object with data from user with given id in user
    database. Raise RBFatalError if user does not exist.

    The first search result is handed to ``self.set_user`` together with
    ``usr``.
    """
    matches = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'id=%s' % usr.id,
    )
    if not matches:
        raise RBFatalError("User with id '%s' does not exist" % usr.id)
    self.set_user(usr, matches[0])
def get_dummyid(self, usr):
    """Set usr.id to unique 'dummy' DCU ID number.

    Not implemented: this always raises and leaves ``usr`` untouched.

    Raises:
        RBFatalError: unconditionally.
    """
    # An unreachable draft used to follow this raise. Its intent was to
    # search the accounts tree (one level) for ids in the range
    # 10000000 <= id < 20000000, then set usr.id to the first result's id
    # plus one, or to 10000000 when nothing matched. It was removed because
    # it could never run and would not have worked as written: the filter
    # string was %-formatted without a placeholder, carried a stray trailing
    # quote, and used '<', which LDAP filters do not offer (only '<=').
    raise RBFatalError('NOT YET IMPLEMENTED')
def get_gid_byname(self, group):
    """Get gid for given group name.

    The group name is escaped before being placed in the search filter, so
    filter metacharacters in it are matched literally.

    Returns:
        The first ``gidNumber`` value of the first matching entry, as int.

    Raises:
        RBFatalError: if given name does not belong to a group in group
            database.
    """
    # Imported locally so this method does not depend on the module-level
    # import block also importing the ``ldap.filter`` submodule.
    from ldap.filter import escape_filter_chars

    query = 'cn=%s' % escape_filter_chars(str(group))
    res = self.ldap.search_s(rbconfig.ldap_group_tree,
                             ldap.SCOPE_ONELEVEL, query)
    if not res:
        raise RBFatalError("Group '%s' does not exist" % group)
    return int(res[0][1]['gidNumber'][0])
def get_group_byid(self, gid):
    """Get group name for given group ID.

    Returns the first ``cn`` value of the first matching entry. Raise
    RBFatalError if given id does not belong to a group in group database.
    """
    matches = self.ldap.search_s(
        rbconfig.ldap_group_tree,
        ldap.SCOPE_ONELEVEL,
        'gidNumber=%s' % gid,
    )
    if not matches:
        raise RBFatalError("Group with id '%s' does not exist" % gid)
    _dn, attrs = matches[0][0], matches[0][1]
    return attrs['cn'][0]
def list_paid_newbies(self):
    """Return list of all paid newbie usernames.

    Matches accounts with ``yearsPaid>=1`` and ``newbie=TRUE``; each list
    item is the first ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        '(&(yearsPaid>=1)(newbie=TRUE))',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_paid_non_newbies(self):
    """Return list of all paid renewal (non-newbie) usernames.

    Matches accounts with ``yearsPaid>=1`` and ``newbie=FALSE``; each list
    item is the first ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        '(&(yearsPaid>=1)(newbie=FALSE))',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_non_newbies(self):
    """Return list of all non newbie usernames.

    Matches accounts with ``newbie=FALSE``; each list item is the first
    ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'newbie=FALSE',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_newbies(self):
    """Return list of all newbie usernames.

    Matches accounts with ``newbie=TRUE``; each list item is the first
    ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'newbie=TRUE',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_groups(self):
    """Return list of all groups.

    Each list item is the first ``cn`` value of a posixGroup entry found
    one level below the group tree.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_group_tree,
        ldap.SCOPE_ONELEVEL,
        'objectClass=posixGroup',
        ('cn', ),
    )
    return [attrs['cn'][0] for _dn, attrs in entries]
def list_reserved_static(self):
    """Return list of all static reserved names.

    Matches ``reserved`` entries whose ``flag`` is ``static``; each list
    item is the first ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_reserved_tree,
        ldap.SCOPE_ONELEVEL,
        '(&(objectClass=reserved)(flag=static))',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_reserved_dynamic(self):
    """Return list of all dynamic reserved names.

    Matches ``reserved`` entries whose ``flag`` is not ``static``; each
    list item is the first ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_reserved_tree,
        ldap.SCOPE_ONELEVEL,
        '(&(objectClass=reserved)(!(flag=static)))',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_unpaid(self):
    """Return list of all non-renewed users.

    Matches accounts with ``yearsPaid<=0``; each list item is the first
    ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'yearsPaid<=0',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_unpaid_normal(self):
    """Return list of all normal non-renewed users.

    Matches accounts with ``yearsPaid=0``; each list item is the first
    ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'yearsPaid=0',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def list_unpaid_grace(self):
    """Return list of all grace non-renewed users.

    Matches accounts with ``yearsPaid<=-1``; each list item is the first
    ``uid`` value of an entry.
    """
    entries = self.ldap.search_s(
        rbconfig.ldap_accounts_tree,
        ldap.SCOPE_ONELEVEL,
        'yearsPaid<=-1',
        ('uid', ),
    )
    return [attrs['uid'][0] for _dn, attrs in entries]
def dict_reserved_desc(self):
    """Return dictionary of all reserved entries with their description.

    Returns:
        A dict mapping the first ``uid`` value of each ``reserved`` entry
        found one level below the reserved tree to the first value of its
        ``description`` attribute.
    """
    res = self.ldap.search_s(rbconfig.ldap_reserved_tree,
                             ldap.SCOPE_ONELEVEL, 'objectClass=reserved',
                             ('uid', 'description'))
    # search_s() yields (dn, attrs) tuples; only the attribute dict is used.
    return {data['uid'][0]: data['description'][0] for _dn, data in res}
def dict_reserved_static(self):
    """Return dictionary of all static reserved entries with their
    description.

    Returns:
        A dict mapping the first ``uid`` value of each ``reserved`` entry
        whose ``flag`` is ``static`` to the first value of its
        ``description`` attribute.
    """
    res = self.ldap.search_s(
        rbconfig.ldap_reserved_tree, ldap.SCOPE_ONELEVEL,
        '(&(objectClass=reserved)(flag=static))', ('uid', 'description'))
    # search_s() yields (dn, attrs) tuples; only the attribute dict is used.
    return {data['uid'][0]: data['description'][0] for _dn, data in res}

# -------------------------------- #
# METHODS RETURNING SEARCH RESULTS #
# -------------------------------- #
def execute(self):
    """Probe an LDAP server by binding and running a single search.

    Settings are read from ``self.args``:

    * ``url`` - used for the connection when it is a valid LDAP URL;
      otherwise the address from ``self.get_address()`` is used.
    * ``username`` / ``password`` - bind credentials, default empty.
    * ``base`` - search base, default ``dc=example,dc=org``. The special
      value ``cn=monitor`` makes the check read the ``description`` of
      that entry, store it in ``self.version`` and report it.
    * ``scope`` - ``BASE`` or ``ONELEVEL`` (case-insensitive); any other
      value means subtree.
    * ``filter`` - search filter, default ``objectClass=*``.

    Returns:
        ``(Event.UP, text)`` on success, or ``(Event.DOWN, reason)`` when
        the protocol version is unsupported or the search fails.

    The connection is unbound on every exit path.
    """
    args = self.args

    # Two ways to connect: by LDAP URL, or by hostname/ip and port number.
    if "url" in args and ldapurl.isLDAPUrl(args["url"]):
        conn = ldap.initialize(args["url"])
    else:
        ip, port = self.get_address()
        conn = ldap.initialize("ldap://%s:%s" % (ip, port))

    try:
        username = args.get("username", "")
        password = args.get("password", "")
        # NOTE(review): simple_bind() is the asynchronous variant and its
        # result is never collected here, so rejected credentials are not
        # detected at this point - confirm whether simple_bind_s() was
        # intended.
        conn.simple_bind(username, password)

        try:
            self._set_version(args, conn)
        except ValueError:
            return Event.DOWN, "unsupported protocol version"

        base = args.get("base", "dc=example,dc=org")
        if base == "cn=monitor":
            my_res = conn.search_st(base, ldap.SCOPE_BASE,
                                    timeout=self.timeout)
            versionstr = str(my_res[0][-1]['description'][0])
            self.version = versionstr
            return Event.UP, versionstr

        known_scopes = {
            "BASE": ldap.SCOPE_BASE,
            "ONELEVEL": ldap.SCOPE_ONELEVEL,
        }
        scope = known_scopes.get(args.get("scope", "SUBTREE").upper(),
                                 ldap.SCOPE_SUBTREE)

        filtr = args.get("filter", "objectClass=*")
        try:
            conn.search_ext_s(base, scope, filterstr=filtr,
                              timeout=self.timeout)
        # pylint: disable=W0703
        except Exception as err:
            return (Event.DOWN,
                    "Failed ldapSearch on %s for %s: %s" % (
                        self.get_address(), filtr, str(err)))

        return Event.UP, "Ok"
    finally:
        # Cleanup is best effort: a failure to unbind (for example on a
        # connection that is already dead) must not replace the check
        # result or the exception that is already on its way out.
        try:
            conn.unbind()
        except ldap.LDAPError:
            pass