我们从Python开源项目中,提取了以下8个代码示例,用于说明如何使用ldap.set_option()。
def _ldap_connection(self): """ Context manager for ldap connections """ if self.no_verify: ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) ldap_cxn = ldap.initialize('{0}'.format(self.uri)) ldap_cxn.protocol_version = 3 ldap_cxn.set_option(ldap.OPT_REFERRALS, 0) if self.tls and not self.uri.startswith('ldaps'): ldap_cxn.start_tls_s() yield ldap_cxn
def _connect_to_ldap(self): ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) connection = ldap.initialize(self.server_uri) if self.start_tls: try: connection.start_tls_s() except ldap.LDAPError: e = get_exception() self.module.fail_json(msg="Cannot start TLS.", details=str(e)) try: if self.bind_dn is not None: connection.simple_bind_s(self.bind_dn, self.bind_pw) else: connection.sasl_interactive_bind_s('', ldap.sasl.external()) except ldap.LDAPError: e = get_exception() self.module.fail_json( msg="Cannot bind to the server.", details=str(e)) return connection
def initiate_ldap(): """ contact the LDAP server to return a LDAP object """ ldap_schemes = ['ldap://', 'ldaps://'] ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0) ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir')) ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile')) ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile')) ldap.set_option(ldap.OPT_X_TLS_DEMAND, True) ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) # TRY, NEVER, DEMAND ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0) for scheme in ldap_schemes: ldap_url = scheme + server_url ldap_obj = ldap.initialize(ldap_url) try: ldap_obj.start_tls_s() except ldap.OPERATIONS_ERROR as e: e_msg = e[0]['info'] if e_msg == 'TLS already started': pass else: raise except ldap.SERVER_DOWN: if scheme is not ldap_schemes[-1]: continue else: raise if login_dn != 'DEFAULT': # Use anonymous bind if login_dn is set as DEFAULT ldap_obj.bind(login_dn, password, ldap.AUTH_SIMPLE) else: try: ldap_obj.whoami_s() except ldap.UNWILLING_TO_PERFORM: print 'Anonymous binding is disabled by server' raise SystemExit return ldap_obj break
def ldap_auth(self, username, password): ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.cert_path) connection = ldap.initialize(self.ldap_url) connection.set_option(ldap.OPT_REFERRALS, 0) try: if password: connection.simple_bind_s(username + self.user_suffix, password) else: return False except ldap.INVALID_CREDENTIALS: return False except ldap.SERVER_DOWN: return None return True
def __init__(self, backend, mode=PLAIN, cert=None, key=None, cacertdir='/etc/ssl/certs', ): self.backend = backend self._server = None self._schema = {} self._cert = cert self._key = key logger.debug("LDAP _session created, id: {}".format(id(self))) # Switch to LDAPS mode if ldaps is backend start with 'ldaps' if 'ldaps' == backend[:5].lower(): mode = self.LDAPS # Set CACERTDIR and REQUIRED_CERT to TLS_DEMAND (validation required) if needed if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None: ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir) ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) if cacertdir is None: warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2) ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) # Set client certificate if both cert and key are provided if cert is not None and key is not None: if not os.path.isfile(cert): raise LDAPSessionException("Certificate file {} does not exist".format(cert)) if not os.path.isfile(key): raise LDAPSessionException("Certificate key file {} does not exist".format(cert)) ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert) ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key) self._server = ldap.initialize(self.backend, bytes_mode=False) # Proceed STARTTLS if mode == self.STARTTLS: self._server.start_tls_s()
def connect(self, uri=rbconfig.LDAP_URI, dn=rbconfig.LDAP_ROOT_DN, password=None, dcu_uri=rbconfig.LDAP_DCU_URI, dcu_dn=rbconfig.LDAP_DCU_RBDN, dcu_pw=None): """Connect to databases. Custom URI, DN and password may be given for RedBrick LDAP. Password if not given will be read from shared secret file set in rbconfig. Custom URI may be given for DCU LDAP. """ if not password: try: pw_file = open(rbconfig.LDAP_ROOTPW_FILE, 'r') password = pw_file.readline().rstrip() except IOError: raise RBFatalError("Unable to open LDAP root password file") pw_file.close() if not dcu_pw: try: pw_file = open(rbconfig.LDAP_DCU_RBPW, 'r') dcu_pw = pw_file.readline().rstrip() except IOError: raise RBFatalError("Unable to open DCU AD root password file") pw_file.close() # Default protocol seems to be 2, set to 3. ldap.set_option(ldap.OPT_PROTOCOL_VERSION, 3) # Connect to RedBrick LDAP. self.ldap = ldap.initialize(uri) self.ldap.simple_bind_s(dn, password) # Connect to DCU LDAP (anonymous bind). self.ldap_dcu = ldap.initialize(dcu_uri) # self.ldap_dcu.simple_bind_s('', '') self.ldap_dcu.simple_bind_s(dcu_dn, dcu_pw)
def ldap_search(self, filter, attributes, incremental, incremental_filter): """ Query the configured LDAP server with the provided search filter and attribute list. """ for uri in self.conf_LDAP_SYNC_BIND_URI: #Read record of this uri if (self.working_uri == uri): adldap_sync = self.working_adldap_sync created = False else: adldap_sync, created = ADldap_Sync.objects.get_or_create(ldap_sync_uri=uri) if ((adldap_sync.syncs_to_full > 0) and incremental): filter_to_use = incremental_filter.replace('?', self.whenchanged.strftime(self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT)) logger.debug("Using an incremental search. Filter is:'%s'" % filter_to_use) else: filter_to_use = filter ldap.set_option(ldap.OPT_REFERRALS, 0) #ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) l = PagedLDAPObject(uri) l.protocol_version = 3 if (uri.startswith('ldaps:')): l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND) l.set_option(ldap.OPT_X_TLS_DEMAND, True) else: l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER) l.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) l.set_option(ldap.OPT_X_TLS_DEMAND, False) try: l.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN, self.conf_LDAP_SYNC_BIND_PASS) except ldap.LDAPError as e: logger.error("Error connecting to LDAP server %s : %s" % (uri, e)) continue results = l.paged_search_ext_s(self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE, filter_to_use, attrlist=attributes, serverctrls=None) l.unbind_s() if (self.working_uri is None): self.working_uri = uri self.conf_LDAP_SYNC_BIND_URI.insert(0, uri) self.working_adldap_sync = adldap_sync return (uri, results) # Return both the LDAP server URI used and the request. This is for incremental sync purposes #if not connected correctly, raise error raise
def open_ldap(): """ Returns a freshly made LDAP object, according to the settings configured in webfront.conf. """ # Get config settings server = _config.get('ldap', 'server') port = _config.getint('ldap', 'port') encryption = _config.get('ldap', 'encryption').lower() timeout = _config.getfloat('ldap', 'timeout') # Revert to no encryption if none of the valid settings are found if encryption not in ('ssl', 'tls', 'none'): _logger.warning('Unknown encryption setting %r in config file, ' 'using no encryption instead', _config.get('ldap', 'encryption')) encryption = 'none' # Debug tracing from python-ldap/openldap to stderr if _config.getboolean('ldap', 'debug'): ldap.set_option(ldap.OPT_DEBUG_LEVEL, 255) # Use STARTTLS if enabled, then fail miserably if the server # does not support it if encryption == 'tls': _logger.debug("Using STARTTLS for ldap connection") lconn = ldap.open(server, port) lconn.timeout = timeout try: lconn.start_tls_s() except ldap.PROTOCOL_ERROR: _logger.error('LDAP server %s does not support the STARTTLS ' 'extension. Aborting.', server) raise NoStartTlsError(server) except (ldap.SERVER_DOWN, ldap.CONNECT_ERROR): _logger.exception("LDAP server is down") raise NoAnswerError(server) else: scheme = encryption == 'ssl' and 'ldaps' or 'ldap' uri = '%s://%s:%s' % (scheme, server, port) lconn = ldap.initialize(uri) lconn.timeout = timeout return lconn