def initiate_ldap():
    """Contact the LDAP server and return a connected LDAP object.

    Process-wide TLS options are read from the ``[ldap]`` section of the
    module-level ``config`` (``cacertdir``, ``certfile``, ``keyfile``) and
    server certificate validation is demanded.  The module-level
    ``server_url`` is tried with ``ldap://`` first and ``ldaps://`` second;
    StartTLS is requested on every connection attempt.

    If the module-level ``login_dn`` is ``'DEFAULT'`` no bind is sent and a
    "Who am I?" request is issued on the unbound connection; otherwise a
    simple bind with ``login_dn`` / ``password`` is performed.

    Returns:
        The connected ``ldap`` connection object.

    Raises:
        ldap.SERVER_DOWN: the server is unreachable with every scheme.
        ldap.OPERATIONS_ERROR: StartTLS failed for a reason other than TLS
            being active already.
        ldap.LDAPError: the simple bind was rejected by the server.
        SystemExit: the server refuses the anonymous "Who am I?" request.
    """
    ldap_schemes = ['ldap://', 'ldaps://']

    ldap.set_option(ldap.OPT_DEBUG_LEVEL, 0)
    ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, config.get('ldap', 'cacertdir'))
    ldap.set_option(ldap.OPT_X_TLS_CERTFILE, config.get('ldap', 'certfile'))
    ldap.set_option(ldap.OPT_X_TLS_KEYFILE, config.get('ldap', 'keyfile'))
    # OPT_X_TLS_DEMAND is a *value* for OPT_X_TLS_REQUIRE_CERT, not an option
    # name.  The former ``set_option(ldap.OPT_X_TLS_DEMAND, True)`` call
    # addressed an unrelated option number and has been dropped.
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
                    ldap.OPT_X_TLS_DEMAND)  # TRY, NEVER, DEMAND
    # Must come last: builds a new TLS context from the options set above.
    ldap.set_option(ldap.OPT_X_TLS_NEWCTX, 0)

    for scheme in ldap_schemes:
        ldap_url = scheme + server_url
        ldap_obj = ldap.initialize(ldap_url)

        try:
            ldap_obj.start_tls_s()
        except ldap.OPERATIONS_ERROR as e:
            # python-ldap stores a dict with 'desc'/'info' in args[0];
            # ``e.args[0]`` works on both Python 2 and Python 3.
            details = e.args[0] if e.args else {}
            info = details.get('info') if isinstance(details, dict) else None
            if info != 'TLS already started':
                raise
        except ldap.SERVER_DOWN:
            # Fall through to the next scheme; give up after the last one.
            if scheme != ldap_schemes[-1]:
                continue
            raise

        if login_dn != 'DEFAULT':
            # Synchronous bind so that rejected credentials are reported
            # here instead of being left as an unread asynchronous result.
            ldap_obj.bind_s(login_dn, password, ldap.AUTH_SIMPLE)
        else:
            # Use anonymous bind if login_dn is set as DEFAULT
            try:
                ldap_obj.whoami_s()
            except ldap.UNWILLING_TO_PERFORM:
                print('Anonymous binding is disabled by server')
                raise SystemExit

        return ldap_obj
def __init__(self, backend, mode=PLAIN, cert=None, key=None,
             cacertdir='/etc/ssl/certs', ):
    """Create the LDAP session and open the connection object.

    Args:
        backend: LDAP URI handed to ``ldap.initialize``.  A URI starting
            with ``ldaps`` (case-insensitive) forces ``mode`` to LDAPS.
        mode: one of the class constants PLAIN, STARTTLS or LDAPS.
        cert: path to a client certificate file; only used when ``key`` is
            given as well.
        key: path to the client certificate's key file; only used when
            ``cert`` is given as well.
        cacertdir: directory of trusted CA certificates.  ``None`` turns
            server certificate validation off and emits a warning.

    Raises:
        LDAPSessionException: ``cert`` and ``key`` are both given but one
            of the two files does not exist.

    Note:
        The TLS settings are applied with ``ldap.set_option`` at module
        level, i.e. they are not limited to this session's connection.
    """
    self.backend = backend
    self._server = None
    self._schema = {}
    self._cert = cert
    self._key = key

    logger.debug("LDAP _session created, id: %s", id(self))

    # Switch to LDAPS mode if the backend starts with 'ldaps'
    if backend.lower().startswith('ldaps'):
        mode = self.LDAPS

    # Set CACERTDIR and REQUIRE_CERT to TLS_DEMAND (validation required)
    # when an encrypted mode is used and a CA directory is available.
    if mode in (self.STARTTLS, self.LDAPS) and cacertdir is not None:
        ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, cacertdir)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_DEMAND)

    # Without a CA directory certificate validation is switched off.
    if cacertdir is None:
        warnings.warn("You are in INSECURE mode", ImportWarning, stacklevel=2)
        ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    # Set client certificate if both cert and key are provided
    if cert is not None and key is not None:
        if not os.path.isfile(cert):
            raise LDAPSessionException(
                "Certificate file {} does not exist".format(cert))
        if not os.path.isfile(key):
            # Report the key path here (the message used to show ``cert``).
            raise LDAPSessionException(
                "Certificate key file {} does not exist".format(key))
        ldap.set_option(ldap.OPT_X_TLS_CERTFILE, cert)
        ldap.set_option(ldap.OPT_X_TLS_KEYFILE, key)

    self._server = ldap.initialize(self.backend, bytes_mode=False)

    # Proceed STARTTLS
    if mode == self.STARTTLS:
        self._server.start_tls_s()
def ldap_search(self, filter, attributes, incremental, incremental_filter):
    """Query the configured LDAP servers with the given filter and attributes.

    The URIs in ``self.conf_LDAP_SYNC_BIND_URI`` are tried in order; the
    first one that accepts the bind is searched and its result returned.

    Args:
        filter: LDAP search filter used for a full search.
        attributes: attribute list passed to the search as ``attrlist``.
        incremental: when true, and the sync record of the URI has
            ``syncs_to_full > 0``, ``incremental_filter`` is used instead
            of ``filter``.
        incremental_filter: filter template in which every ``?`` is
            replaced by ``self.whenchanged`` formatted with
            ``self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT``.

    Returns:
        A ``(uri, results)`` tuple: the LDAP server URI that was used and
        the search results.  The URI is returned for incremental sync
        purposes.

    Raises:
        ldap.LDAPError: no URI could be bound to (the error of the last
            attempt is raised), no URI is configured, or the search failed.
    """
    last_error = None

    for uri in self.conf_LDAP_SYNC_BIND_URI:
        # Read the sync record of this uri
        if self.working_uri == uri:
            adldap_sync = self.working_adldap_sync
        else:
            adldap_sync, _ = ADldap_Sync.objects.get_or_create(
                ldap_sync_uri=uri)

        if adldap_sync.syncs_to_full > 0 and incremental:
            timestamp = self.whenchanged.strftime(
                self.conf_LDAP_SYNC_INCREMENTAL_TIMESTAMPFORMAT)
            filter_to_use = incremental_filter.replace('?', timestamp)
            logger.debug("Using an incremental search. Filter is:'%s'",
                         filter_to_use)
        else:
            filter_to_use = filter

        ldap.set_option(ldap.OPT_REFERRALS, 0)
        conn = PagedLDAPObject(uri)
        conn.protocol_version = 3

        # OPT_X_TLS_DEMAND / OPT_X_TLS_NEVER are values, not option names;
        # the former ``set_option(ldap.OPT_X_TLS_DEMAND, ...)`` calls
        # addressed an unrelated option number and have been dropped.
        # NOTE(review): python-ldap documents that per-connection TLS
        # options need a final OPT_X_TLS_NEWCTX call to take effect; that
        # call is absent here -- confirm whether it should be added.
        if uri.startswith('ldaps:'):
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
                            ldap.OPT_X_TLS_DEMAND)
        else:
            conn.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_NEVER)
            conn.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
                            ldap.OPT_X_TLS_NEVER)

        try:
            conn.simple_bind_s(self.conf_LDAP_SYNC_BIND_DN,
                               self.conf_LDAP_SYNC_BIND_PASS)
        except ldap.LDAPError as e:
            logger.error("Error connecting to LDAP server %s : %s", uri, e)
            # Remember the failure so it can be raised if every URI fails.
            last_error = e
            continue

        try:
            results = conn.paged_search_ext_s(
                self.conf_LDAP_SYNC_BIND_SEARCH, ldap.SCOPE_SUBTREE,
                filter_to_use, attrlist=attributes, serverctrls=None)
        finally:
            # Release the connection even when the search raises.
            conn.unbind_s()

        if self.working_uri is None:
            # Remember the first URI that worked and move it to the front
            # so later calls try it first.
            self.working_uri = uri
            self.conf_LDAP_SYNC_BIND_URI.insert(0, uri)
            self.working_adldap_sync = adldap_sync

        # Return both the LDAP server URI used and the request results.
        return (uri, results)

    # If not connected correctly, raise an error.  A bare ``raise`` has no
    # active exception at this point, so raise the stored one explicitly.
    if last_error is not None:
        raise last_error
    raise ldap.LDAPError("No LDAP server URI is configured")