我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用ldap.SCOPE_BASE。
def exact(self): try: results = self.connection.search_s( self.dn, ldap.SCOPE_BASE, attrlist=[self.name]) except ldap.LDAPError: e = get_exception() self.module.fail_json( msg="Cannot search for attribute %s" % self.name, details=str(e)) current = results[0][1].get(self.name, []) modlist = [] if frozenset(self.values) != frozenset(current): if len(current) == 0: modlist = [(ldap.MOD_ADD, self.name, self.values)] elif len(self.values) == 0: modlist = [(ldap.MOD_DELETE, self.name, None)] else: modlist = [(ldap.MOD_REPLACE, self.name, self.values)] return modlist
def read_subschemasubentry_s(self,subschemasubentry_dn,attrs=None): """ Returns the sub schema sub entry's data """ attrs = attrs or SCHEMA_ATTRS try: r = self.search_s( subschemasubentry_dn,ldap.SCOPE_BASE, '(objectClass=subschema)', attrs ) except ldap.NO_SUCH_OBJECT: return None else: if r: return r[0][1] else: return None
def read_s(self,dn,filterstr=None,attrlist=None,serverctrls=None,clientctrls=None,timeout=-1): """ Reads and returns a single entry specified by `dn'. Other attributes just like those passed to `search_ext_s()' """ r = self.search_ext_s( dn, ldap.SCOPE_BASE, filterstr or '(objectClass=*)', attrlist=attrlist, serverctrls=serverctrls, clientctrls=clientctrls, timeout=timeout, ) if r: return r[0][1] else: return None
def get_real_name(self): """ Attempt to retrieve the LDAP Common Name of the given login name. """ encoding = _config.get('ldap', 'encoding') user_dn = self.get_user_dn().encode(encoding) name_attr = _config.get('ldap', 'name_attr') try: res = self.ldap.search_s(user_dn, ldap.SCOPE_BASE, '(objectClass=*)', [name_attr]) except ldap.LDAPError: _logger.exception("Caught exception while retrieving user name " "from LDAP, returning None as name") return None # Just look at the first result record, since we are searching for # a specific user record = res[0][1] name = record[name_attr][0] return name
def _anon_bind(self): self._log.debug('Checking for anonymous bind...') results = self._search( 'cn=config', '(objectClass=*)', ['nsslapd-allow-anonymous-access'], scope=ldap.SCOPE_BASE ) dn, attrs = results[0] state = attrs['nsslapd-allow-anonymous-access'][0].decode('utf-8') if state == 'on': return 'YES' elif state == 'off': return 'NO' elif state == 'rootdse': return 'ROOTDSE' else: return 'ERROR'
def getDefaultNamingContext(self): try: newCon = ldap.initialize('ldap://{}'.format(self.dc_ip)) newCon.simple_bind_s('','') res = newCon.search_s("", ldap.SCOPE_BASE, '(objectClass=*)') rootDSE = res[0][1] except ldap.LDAPError, e: print "[!] Error retrieving the root DSE" print "[!] {}".format(e) sys.exit(1) if not rootDSE.has_key('defaultNamingContext'): print "[!] No defaultNamingContext found!" sys.exit(1) defaultNamingContext = rootDSE['defaultNamingContext'][0] self.domainBase = defaultNamingContext newCon.unbind() return defaultNamingContext
def search_subschemasubentry_s(self,dn=''): """ Returns the distinguished name of the sub schema sub entry for a part of a DIT specified by dn. None as result indicates that the DN of the sub schema sub entry could not be determined. """ try: r = self.search_s( dn,ldap.SCOPE_BASE,'(objectClass=*)',['subschemaSubentry'] ) except (ldap.NO_SUCH_OBJECT,ldap.NO_SUCH_ATTRIBUTE,ldap.INSUFFICIENT_ACCESS): r = [] except ldap.UNDEFINED_TYPE: return None try: if r: e = ldap.cidict.cidict(r[0][1]) search_subschemasubentry_dn = e.get('subschemaSubentry',[None])[0] if search_subschemasubentry_dn is None: if dn: # Try to find sub schema sub entry in root DSE return self.search_subschemasubentry_s(dn='') else: # If dn was already root DSE we can return here return None else: return search_subschemasubentry_dn except IndexError: return None
def is_group_member(self, group_dn): """ Verify that uid is a member in the group object identified by group_dn, using the pre-initialized ldap object l. The full user DN will be attempted matched against the member attribute of the group object. If no match is found, the user uid will be attempted matched against the memberUid attribute. The former should work well for groupOfNames and groupOfUniqueNames objects, the latter should work for posixGroup objects. """ encoding = _config.get('ldap', 'encoding') group_search = _config.get('ldap', 'group_search').encode(encoding) user_dn = self.get_user_dn().encode(encoding) # Match groupOfNames/groupOfUniqueNames objects try: filterstr = group_search % escape_filter_chars(user_dn) result = self.ldap.search_s(group_dn, ldap.SCOPE_BASE, filterstr) _logger.debug("groupOfNames results: %s", result) if len(result) < 1: # If no match, match posixGroup objects filterstr = ( '(memberUid=%s)' % escape_filter_chars(self.username.encode(encoding))) result = self.ldap.search_s(group_dn, ldap.SCOPE_BASE, filterstr) _logger.debug("posixGroup results: %s", result) return len(result) > 0 except ldap.TIMEOUT as error: _logger.error("Timed out while veryfing group memberships") raise TimeoutError(error) # # Exception classes #
def parse_schema(self): """ Create ``self.schema['attributes']`` dictionary where values are a tuple holding the syntax oid and a boolean (true if the attribute is single valued). """ def get_attribute_syntax(attr_name): """ Get some information about an attributeType, directly or by a potential inheritance. :param attr_name: Name of the attribute :return: a tuple with (SYNTAX_OID, Boolean) where boolean is True if the attribute is single valued. """ attribute = schema.get_obj(ldap.schema.AttributeType, attr_name) if attribute.syntax is None: return get_attribute_syntax(attribute.sup[0]) return attribute.syntax, attribute.single_value self._schema['attributes'] = {} self._schema['objectClass'] = {} # TODO: base must be discovered from server (using subSchemaEntry) request = self.server.search_s(base='cn=schema', scope=ldap.SCOPE_BASE, attrlist=['+']) schema = ldap.schema.SubSchema(request[0][1]) for attr in schema.tree(ldap.schema.AttributeType): definition = schema.get_obj(ldap.schema.AttributeType, attr) if definition is not None: syntax = get_attribute_syntax(definition.names[0]) for attribute_name in definition.names: self._schema['attributes'][attribute_name] = (syntax[0], definition.single_value) self._schema['attributes']['memberOf'] = ('1.3.6.1.4.1.1466.115.121.1.12', False)
def _count_users(self, user_base): self._log.debug('Counting %s users...' % user_base) results = self._search( getattr(self, '_%s_user_base' % user_base), '(objectClass=*)', ['numSubordinates'], scope=ldap.SCOPE_BASE ) dn, attrs = results[0] return attrs['numSubordinates'][0].decode('utf-8')
def _count_hosts(self): self._log.debug('Counting hosts...') results = self._search( 'cn=computers,cn=accounts,%s' % self._base_dn, '(objectClass=*)', ['numSubordinates'], scope=ldap.SCOPE_BASE ) dn, attrs = results[0] return attrs['numSubordinates'][0].decode('utf-8')
def _count_hostgroups(self): self._log.debug('Counting host groups...') results = self._search( 'cn=hostgroups,cn=accounts,%s' % self._base_dn, '(objectClass=*)', ['numSubordinates'], scope=ldap.SCOPE_BASE ) dn, attrs = results[0] return attrs['numSubordinates'][0].decode('utf-8')
def search_ext(self,base,scope,filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1,sizelimit=0): """ search(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]]) -> int search_s(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]]) search_st(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,timeout=-1]]]]) search_ext(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]]) search_ext_s(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]]) Perform an LDAP search operation, with base as the DN of the entry at which to start the search, scope being one of SCOPE_BASE (to search the object itself), SCOPE_ONELEVEL (to search the object's immediate children), or SCOPE_SUBTREE (to search the object and all its descendants). filter is a string representation of the filter to apply in the search (see RFC 2254). Each result tuple is of the form (dn,entry), where dn is a string containing the DN (distinguished name) of the entry, and entry is a dictionary containing the attributes. Attributes types are used as string dictionary keys and attribute values are stored in a list as dictionary value. The DN in dn is extracted using the underlying ldap_get_dn(), which may raise an exception of the DN is malformed. If attrsonly is non-zero, the values of attrs will be meaningless (they are not transmitted in the result). The retrieved attributes can be limited with the attrlist parameter. If attrlist is None, all the attributes of each entry are returned. serverctrls=None clientctrls=None The synchronous form with timeout, search_st() or search_ext_s(), will block for at most timeout seconds (or indefinitely if timeout is negative). A TIMEOUT exception is raised if no result is received within the time. The amount of search results retrieved can be limited with the sizelimit parameter if non-zero. """ return self._ldap_call( self._l.search_ext, base,scope,filterstr, attrlist,attrsonly, EncodeControlTuples(serverctrls), EncodeControlTuples(clientctrls), timeout,sizelimit, )
def search_ext(self,base,scope,filterstr='(objectClass=*)',attrlist=None,attrsonly=0,serverctrls=None,clientctrls=None,timeout=-1,sizelimit=0): """ search(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]]) -> int search_s(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0]]]) search_st(base, scope [,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,timeout=-1]]]]) search_ext(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]]) search_ext_s(base,scope,[,filterstr='(objectClass=*)' [,attrlist=None [,attrsonly=0 [,serverctrls=None [,clientctrls=None [,timeout=-1 [,sizelimit=0]]]]]]]) Perform an LDAP search operation, with base as the DN of the entry at which to start the search, scope being one of SCOPE_BASE (to search the object itself), SCOPE_ONELEVEL (to search the object's immediate children), or SCOPE_SUBTREE (to search the object and all its descendants). filter is a string representation of the filter to apply in the search (see RFC 4515). Each result tuple is of the form (dn,entry), where dn is a string containing the DN (distinguished name) of the entry, and entry is a dictionary containing the attributes. Attributes types are used as string dictionary keys and attribute values are stored in a list as dictionary value. The DN in dn is extracted using the underlying ldap_get_dn(), which may raise an exception of the DN is malformed. If attrsonly is non-zero, the values of attrs will be meaningless (they are not transmitted in the result). The retrieved attributes can be limited with the attrlist parameter. If attrlist is None, all the attributes of each entry are returned. serverctrls=None clientctrls=None The synchronous form with timeout, search_st() or search_ext_s(), will block for at most timeout seconds (or indefinitely if timeout is negative). A TIMEOUT exception is raised if no result is received within the time. The amount of search results retrieved can be limited with the sizelimit parameter if non-zero. """ base, filterstr = self._unbytesify_values(base, filterstr) if attrlist is not None: attrlist = tuple(self._unbytesify_values(*attrlist)) return self._ldap_call( self._l.search_ext, base,scope,filterstr, attrlist,attrsonly, RequestControlTuples(serverctrls), RequestControlTuples(clientctrls), timeout,sizelimit, )
def execute(self): args = self.args # we can connect in 2 ways. By hostname/ip (and portnumber) # or by ldap-uri if "url" in args and ldapurl.isLDAPUrl(args["url"]): conn = ldap.initialize(args["url"]) else: ip, port = self.get_address() conn = ldap.initialize("ldap://%s:%s" % (ip, port)) username = args.get("username", "") password = args.get("password", "") conn.simple_bind(username, password) try: self._set_version(args, conn) except ValueError: return Event.DOWN, "unsupported protocol version" base = args.get("base", "dc=example,dc=org") if base == "cn=monitor": my_res = conn.search_st(base, ldap.SCOPE_BASE, timeout=self.timeout) versionstr = str(my_res[0][-1]['description'][0]) self.version = versionstr return Event.UP, versionstr scope = args.get("scope", "SUBTREE").upper() if scope == "BASE": scope = ldap.SCOPE_BASE elif scope == "ONELEVEL": scope = ldap.SCOPE_ONELEVEL else: scope = ldap.SCOPE_SUBTREE filtr = args.get("filter", "objectClass=*") try: conn.search_ext_s(base, scope, filterstr=filtr, timeout=self.timeout) # pylint: disable=W0703 except Exception as err: return (Event.DOWN, "Failed ldapSearch on %s for %s: %s" % ( self.get_address(), filtr, str(err))) conn.unbind() return Event.UP, "Ok"