Python ldap 模块,MOD_REPLACE 实例源码

我们从 Python 开源项目中提取了以下 15 个代码示例，用于说明如何使用 ldap.MOD_REPLACE。

项目:isam-ansible-roles    作者:IBM-Security    | 项目源码 | 文件源码
def exact(self):
        """Build the modlist that makes the attribute hold exactly ``self.values``.

        Reads the current values of ``self.name`` on ``self.dn`` with a
        base-scope search and compares them, ignoring order and duplicates,
        with ``self.values``.

        Returns:
            An empty list when nothing has to change, otherwise a
            one-element list holding a ``MOD_ADD``, ``MOD_DELETE`` or
            ``MOD_REPLACE`` tuple.
        """
        try:
            found = self.connection.search_s(
                self.dn, ldap.SCOPE_BASE, attrlist=[self.name])
        except ldap.LDAPError:
            err = get_exception()
            # NOTE(review): fail_json presumably terminates the module run;
            # if it ever returned, `found` below would be unbound.
            self.module.fail_json(
                msg="Cannot search for attribute %s" % self.name,
                details=str(err))

        existing = found[0][1].get(self.name, [])

        # Set comparison: ordering and repeated values are irrelevant.
        if frozenset(self.values) == frozenset(existing):
            return []
        if not existing:
            # Attribute currently absent: add the wanted values.
            return [(ldap.MOD_ADD, self.name, self.values)]
        if not self.values:
            # Nothing wanted: drop the attribute entirely.
            return [(ldap.MOD_DELETE, self.name, None)]
        return [(ldap.MOD_REPLACE, self.name, self.values)]
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def usr2ldap_renew(cls, usr):
        """Return the LDAP modify list used when renewing the given user.

        Each entry is a ``(ldap.MOD_REPLACE, attribute, value)`` tuple.
        ``newbie``, ``cn``, ``altmail``, ``updatedby`` and ``updated`` are
        always included; ``id``, ``course``, ``year``, ``yearsPaid`` and
        ``birthday`` only when set on ``usr``.
        """
        op = ldap.MOD_REPLACE

        mods = [(op, 'newbie', 'TRUE' if usr.newbie else 'FALSE')]
        for attr in ('cn', 'altmail', 'updatedby', 'updated'):
            mods.append((op, attr, getattr(usr, attr)))

        # Optional attributes; id and yearsPaid are stringified, the
        # others are passed through unchanged.
        if usr.id is not None:
            mods.append((op, 'id', str(usr.id)))
        if usr.course:
            mods.append((op, 'course', usr.course))
        if usr.year is not None:
            mods.append((op, 'year', usr.year))
        if usr.yearsPaid is not None:
            mods.append((op, 'yearsPaid', str(usr.yearsPaid)))
        if usr.birthday:
            mods.append((op, 'birthday', usr.birthday))
        return mods
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def usr2ldap_update(cls, usr):
        """Return the LDAP modify list used when updating the given user.

        The result is a list of ``(ldap.MOD_REPLACE, attribute, value)``
        tuples: five attributes that are always replaced, followed by the
        optional ones that are set on ``usr``.
        """
        replace = ldap.MOD_REPLACE
        newbie_flag = 'TRUE' if usr.newbie else 'FALSE'

        changes = [
            (replace, 'newbie', newbie_flag),
            (replace, 'cn', usr.cn),
            (replace, 'altmail', usr.altmail),
            (replace, 'updatedby', usr.updatedby),
            (replace, 'updated', usr.updated),
        ]

        # id / year / yearsPaid are tested against None so that 0 is kept;
        # course / birthday are skipped when empty.
        if usr.id is not None:
            changes += [(replace, 'id', str(usr.id))]
        if usr.course:
            changes += [(replace, 'course', usr.course)]
        if usr.year is not None:
            changes += [(replace, 'year', usr.year)]
        if usr.yearsPaid is not None:
            changes += [(replace, 'yearsPaid', str(usr.yearsPaid))]
        if usr.birthday:
            changes += [(replace, 'birthday', usr.birthday)]
        return changes
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def set_password(self, username, hashes):
        """
        Administratively set the user's password using the given hashes.

        Binds with ``self.bind_dn`` / ``self.bind_pw`` and replaces both
        ``sambaNTPassword`` and ``userPassword`` on
        ``uid=<username>,<base_dn>`` in a single modify request.

        :param username: uid of the account to change.
        :param hashes: mapping providing the ``'sambaNTPassword'`` and
            ``'userPassword'`` values to store.
        :raises ldap.LDAPError: any LDAP failure is logged and re-raised.
        :raises KeyError: if ``hashes`` lacks one of the two keys.
        """
        # NOTE(review): username is interpolated into the DN unescaped;
        # confirm callers validate it.
        dn = 'uid={0},{1}'.format(username, self.base_dn)
        try:
            with self._ldap_connection() as ldap_cxn:
                ldap_cxn.simple_bind_s(self.bind_dn, self.bind_pw)

                # One modify request carries both replacements.
                mod_list = [
                    (ldap.MOD_REPLACE, 'sambaNTPassword', hashes['sambaNTPassword']),
                    (ldap.MOD_REPLACE, 'userPassword', hashes['userPassword']),
                ]
                ldap_cxn.modify_s(dn, mod_list)

        # 40 is the numeric value of logging.ERROR; presumably bus.log
        # treats it as a logging level.
        except ldap.INVALID_CREDENTIALS:
            self.bus.log('Invalid credentials for admin user: {0}'.format(self.bind_dn), 40)
            raise
        except ldap.INSUFFICIENT_ACCESS:
            self.bus.log('Insufficient access for admin user: {0}'.format(self.bind_dn), 40)
            raise
        except ldap.INVALID_DN_SYNTAX:
            self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
            raise
        except ldap.LDAPError as e:
            # Exceptions have no `.message` attribute on Python 3; the
            # error-info dict is the first positional argument on both
            # Python 2 and 3.
            info = e.args[0] if e.args else None
            if isinstance(info, dict) and 'desc' in info:
                desc = info['desc']
            else:
                desc = str(e)
            self.bus.log('LDAP Error: {0}'.format(desc),
                         level=40,
                         traceback=True)
            raise
项目:auth-tool    作者:luciddg    | 项目源码 | 文件源码
def change_password(self, username, oldpassword, hashes):
        """
        Change the user's password using their own credentials.

        Binds as ``uid=<username>,<base_dn>`` with ``oldpassword`` and then
        replaces ``sambaNTPassword`` and ``userPassword`` in one modify
        request.

        :param username: uid of the account whose password is changed.
        :param oldpassword: current password, used for the bind.
        :param hashes: mapping providing the ``'sambaNTPassword'`` and
            ``'userPassword'`` values to store.
        :raises ldap.INVALID_CREDENTIALS: re-raised without logging when
            the bind is rejected.
        :raises ldap.LDAPError: other LDAP failures are logged and
            re-raised.
        """
        # NOTE(review): username is interpolated into the DN unescaped;
        # confirm callers validate it.
        dn = 'uid={0},{1}'.format(username, self.base_dn)

        try:
            with self._ldap_connection() as ldap_cxn:
                ldap_cxn.simple_bind_s(dn, oldpassword)

                # don't use LDAPObject.passwd_s() here to make use of
                # ldap's atomic operations.  IOW, don't change one password
                # but not the other.
                mod_list = [
                    (ldap.MOD_REPLACE, 'sambaNTPassword', hashes['sambaNTPassword']),
                    (ldap.MOD_REPLACE, 'userPassword', hashes['userPassword']),
                ]
                ldap_cxn.modify_s(dn, mod_list)

        except ldap.INVALID_CREDENTIALS:
            # Wrong old password: propagated to the caller, not logged here.
            raise
        except ldap.INVALID_DN_SYNTAX:
            self.bus.log('Invalid DN syntax in configuration: {0}'.format(self.base_dn), 40)
            raise
        except ldap.LDAPError as e:
            # Exceptions have no `.message` attribute on Python 3; the
            # error-info dict is the first positional argument on both
            # Python 2 and 3.
            info = e.args[0] if e.args else None
            if isinstance(info, dict) and 'desc' in info:
                desc = info['desc']
            else:
                desc = str(e)
            self.bus.log('LDAP Error: {0}'.format(desc),
                         level=40,
                         traceback=True)
            raise
项目:umanager    作者:lcm-unimi    | 项目源码 | 文件源码
def changeshadowexpire(self, username, shexp):
        """Replace the ``shadowExpire`` attribute of an existing user.

        Args:
            username: uid of the entry under ``ou=People,<self.dc>``.
            shexp: new ``shadowExpire`` value, passed to LDAP unchanged.

        Returns:
            None. Problems (unknown user, LDAP failure) are reported on
            stdout rather than raised.
        """
        if not self.userexistsbyuid(username):
            # The name used to be passed as a second print() argument and
            # was never substituted into the message.
            print("User %s does not exist!" % username)
            return

        dn = "uid=" + username + ",ou=People," + self.dc
        ldif = [(ldap.MOD_REPLACE, 'shadowExpire', shexp)]
        try:
            self.conn.modify_s(dn, ldif)
        except ldap.LDAPError as e:
            # Exceptions have no `.message` on Python 3; the error-info
            # dict is the first positional argument on both Python 2 and 3.
            info = e.args[0] if e.args else None
            if isinstance(info, dict) and 'desc' in info:
                desc = info['desc']
            else:
                desc = str(e)
            print("Error: Can't change %s shadowExpire: %s" % (username, desc))
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def verify_token(self, uid, query_args):
            """Store the reddit name and token for ``uid`` from a callback.

            Reads ``code`` and ``state`` from ``query_args``. When both are
            present and ``state`` equals ``self.config["statekey"]``, the
            reddit client is asked for access information and the current
            account, and the user's LDAP entry is updated.

            Returns:
                True when the LDAP entry was updated; False when
                code/state are missing, the state does not match, or the
                entry has the ``redditAccount`` object class but lacks
                ``redditName`` / ``redditToken``.
            """
            code = query_args.get('code')
            state = query_args.get('state')
            # Looked up before any validation, as the original does.
            user = self.ldaptools.getuser(uid)

            if not (code and state):
                return False
            if self.config["statekey"] != state:
                return False

            callback_url = self.config["redirect_base"] + url_for('reddit_loop')
            client = self.get_reddit_client(callback_url)
            access_info = client.get_access_information(code)
            reddit_user = client.get_me()

            if 'redditAccount' not in user.objectClass:
                # First link: add the object class together with the values.
                from ldap import MOD_ADD
                self.ldaptools.updateattrs(uid, MOD_ADD, {
                    'objectClass': 'redditAccount',
                    'redditName': reddit_user.name,
                    'redditToken': access_info['access_token']
                    })
                return True

            if not (hasattr(user, 'redditName') and hasattr(user, 'redditToken')):
                # Object class present but attributes missing: give up.
                return False

            from ldap import MOD_REPLACE
            self.ldaptools.updateattrs(uid, MOD_REPLACE, {
                'redditName': reddit_user.name,
                'redditToken': access_info['access_token']
                })
            return True
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def recovery_update_account():
    """Set a new password and email for the current user from the form.

    Reads ``email`` and ``password`` from ``request.form`` and replaces the
    ``userPassword`` and ``email`` attributes, in that order. Any failure
    flashes "Update failed"; success flashes "Account updated". The
    function always redirects to ``/account``.
    """
    try:
        email = request.form["email"]
        secret = ldaptools.makeSecret(request.form["password"])
        for attr, value in (("userPassword", secret), ("email", email)):
            # Explicit check rather than `assert`: asserts are stripped
            # under `python -O`, which would report a failed update as a
            # success.
            if not ldaptools.modattr(current_user.get_id(), MOD_REPLACE, attr, value):
                raise RuntimeError("LDAP update of {0} failed".format(attr))
        flash("Account updated", "success")
    except Exception:
        # Best effort: the user only gets a generic failure message.
        flash("Update failed", "danger")
    # NOTE(review): this is logged even when the update failed.
    app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
    return redirect("/account")
项目:pizza-auth    作者:xxpizzaxx    | 项目源码 | 文件源码
def update_account():
    """Update the current user's password, email and API key from the form.

    The old password must validate via ``ldaptools.check_credentials``
    before anything is changed. The password is replaced only when
    ``password``, ``password_confirm`` and ``oldpassword`` are all present
    and the two new values match. ``email`` is always replaced, and
    ``keyID`` / ``vCode`` when ``api_id`` is in the form. When the
    submitted API id or key differs from the stored one,
    ``update_characters`` is called afterwards. Always redirects to
    ``/account``.
    """
    email = request.form["email"]
    oldpassword = request.form["oldpassword"]
    api_id = request.form["api_id"]
    api_key = request.form["api_key"]
    update_needed = (api_id != current_user.keyID[0]
                     or api_key != current_user.vCode[0])

    if not ldaptools.check_credentials(current_user.get_id(), oldpassword):
        flash("You must confirm your old password to update your account.", "danger")
        return redirect("/account")

    def _replace(attr, value):
        # Explicit check rather than `assert`: asserts are stripped under
        # `python -O`, which would report a failed update as a success.
        if not ldaptools.modattr(current_user.get_id(), MOD_REPLACE, attr, value):
            raise RuntimeError("LDAP update of {0} failed".format(attr))

    try:
        if all(x in request.form for x in ["password", "password_confirm", "oldpassword"]):
            if request.form["password"] != request.form["password_confirm"]:
                flash("Password confirmation mismatch.", "danger")
                return redirect("/account")
            _replace("userPassword", ldaptools.makeSecret(request.form["password"]))
        _replace("email", email)
        if "api_id" in request.form:
            _replace("keyID", api_id)
            _replace("vCode", api_key)
        flash("Account updated", "success")
    except Exception:
        # Best effort: the user only gets a generic failure message.
        flash("Update failed", "danger")
    # NOTE(review): the character refresh and the log line below also run
    # when the update failed.
    if update_needed:
        update_characters([current_user.get_id()])
    app.logger.info('User account {0} infos changed'.format(current_user.get_id()))
    return redirect("/account")
项目:adminset    作者:guohongze    | 项目源码 | 文件源码
def ldap_update_pass(self,uid=None,oldpass=None,newpass=None):
        """Change a user's password after binding with the old password.

        Resolves the DN through ``self.ldap_search_dn(uid)``, binds
        ``self.ldapconn`` as that DN with ``oldpass`` and calls
        ``passwd_s`` to set ``newpass``.

        Returns:
            True when both the bind and the password change succeed,
            False when either raises ``ldap.LDAPError``.
        """
        obj = self.ldapconn
        target_cn = self.ldap_search_dn(uid)
        try:
            # Binding with the old password proves the caller knows it.
            obj.simple_bind_s(target_cn,oldpass)
            obj.passwd_s(target_cn,oldpass,newpass)
            return True
        except ldap.LDAPError:
            # `except X, e` is a SyntaxError on Python 3, and the bound
            # exception was never used.
            return False
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def update_soa(self):
        """Replace the ``sOARecord`` attribute of this entry.

        The new value comes from ``self._soa()`` and is written to
        ``self.dn`` through ``self.lobj``.
        """
        soa_record = self._soa()
        replacement = (ldap.MOD_REPLACE, 'sOARecord', soa_record)
        self.lobj.modify_s(self.dn, [replacement])
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def modify_address(self, name, address):
        """Point ``name`` at a new address.

        When this entry carries a single ``associatedDomain`` value, its
        ``aRecord`` is replaced in place. When it carries several, ``name``
        is removed from this entry and re-added through the parent with the
        new address.

        Raises:
            exception.NotFound: if the entry has no ``associatedDomain``
                values.
        """
        domains = self.ldap_tuple[1]['associatedDomain']
        if not domains:
            raise exception.NotFound()

        if len(domains) != 1:
            # The entry holds several names: move just this one.
            self.remove_name(name)
            self.parent.add_entry(name, address)
            return

        replacement = (ldap.MOD_REPLACE, 'aRecord', [utils.utf8(address)])
        self.lobj.modify_s(self.dn, [replacement])
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def set_passwd(self, usr):
        """Store a new password for the given user.

        Converts the plaintext ``usr.passwd`` with ``self.userPassword``,
        saves the result on ``usr.userPassword`` and replaces the
        ``userPassword`` attribute of the user's entry through
        ``self.wrapper``.
        """
        usr.userPassword = self.userPassword(usr.passwd)

        target_dn = self.uid2dn(usr.uid)
        change = (ldap.MOD_REPLACE, 'userPassword', usr.userPassword)
        # `self.ldap` is the connection; the bare `ldap` is the module.
        self.wrapper(self.ldap.modify_s, target_dn, (change, ))
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def set_shell(self, usr):
        """Replace the ``loginShell`` attribute with ``usr.loginShell``."""

        target_dn = self.uid2dn(usr.uid)
        change = (ldap.MOD_REPLACE, 'loginShell', usr.loginShell)
        # `self.ldap` is the connection; the bare `ldap` is the module.
        self.wrapper(self.ldap.modify_s, target_dn, (change, ))
项目:useradm    作者:redbrick    | 项目源码 | 文件源码
def usr2ldap_rename(cls, usr):
        """Return the LDAP modify tuple used when renaming the given user.

        The result is a tuple of ``(ldap.MOD_REPLACE, attribute, value)``
        entries for ``homeDirectory``, ``updatedby`` and ``updated``, in
        that order.
        """
        op = ldap.MOD_REPLACE
        renamed_attrs = ('homeDirectory', 'updatedby', 'updated')
        return tuple((op, attr, getattr(usr, attr)) for attr in renamed_attrs)