Python os 模块,setegid() 实例源码

我们从Python开源项目中,提取了以下40个代码示例,用于说明如何使用os.setegid()

项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def callIntoPAM(service, user, conv):
    """
    Run a PAM authentication round for C{user} (a testing hook).

    The effective gid/uid are switched to 0 around the PAM calls and put
    back afterwards, whether or not the calls raise.

    @param service: service name handed to C{pam.start}.
    @param user: value stored as the C{PAM_USER} item.
    @param conv: value stored as the C{PAM_CONV} item.

    @return: C{1} once C{authenticate} and C{acct_mgmt} have both
        completed; if either raises, the exception propagates.
    """
    handle = PAM.pam()
    handle.start(service)
    handle.set_item(PAM.PAM_USER, user)
    handle.set_item(PAM.PAM_CONV, conv)
    savedGid = os.getegid()
    savedUid = os.geteuid()
    # Group first, then user, both on the way up and on the way back.
    os.setegid(0)
    os.seteuid(0)
    try:
        # Both calls signal failure by raising.
        handle.authenticate()
        handle.acct_mgmt()
    finally:
        os.setegid(savedGid)
        os.seteuid(savedUid)
    return 1
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def callIntoPAM(service, user, conv):
    """
    Run a PAM authentication round for C{user} (a testing hook).

    The effective gid/uid are switched to 0 around the PAM calls and put
    back afterwards, whether or not the calls raise.

    @param service: service name handed to C{pam.start}.
    @param user: value stored as the C{PAM_USER} item.
    @param conv: value stored as the C{PAM_CONV} item.

    @return: C{1} once C{authenticate} and C{acct_mgmt} have both
        completed; if either raises, the exception propagates.
    """
    handle = PAM.pam()
    handle.start(service)
    handle.set_item(PAM.PAM_USER, user)
    handle.set_item(PAM.PAM_CONV, conv)
    savedGid = os.getegid()
    savedUid = os.geteuid()
    # Group first, then user, both on the way up and on the way back.
    os.setegid(0)
    os.seteuid(0)
    try:
        # Both calls signal failure by raising.
        handle.authenticate()
        handle.acct_mgmt()
    finally:
        os.setegid(savedGid)
        os.seteuid(savedUid)
    return 1
项目:darkc0de-old-stuff    作者:tuwid    | 项目源码 | 文件源码
def main():
    """
    Daemon entry point: move to the data directory, send stdout/stderr to
    the logfile, switch the effective group/user ids to 103 and then run
    the user program.
    """
    # Work from the data directory.
    os.chdir("/root/data")
    # Both output streams share a single Log wrapper around the logfile.
    logStream = Log(open(LOGFILE, 'a+'))
    sys.stdout = logStream
    sys.stderr = logStream
    # Run as a normal user: the group ("pydaemon") has to be switched
    # before the user ("pydaemon").
    os.setegid(103)
    os.seteuid(103)
    # Hand over to the user program.
    USERPROG()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def switchUID(uid, gid, euid=False):
    """
    Switch the process to the given user and group ids.

    @param uid: user id to switch to; nothing user-related happens when
        it is C{None}.
    @param gid: group id to switch to; skipped when it is C{None}.
    @param euid: when true, only the effective ids are changed
        (C{seteuid}/C{setegid}) instead of C{setuid}/C{setgid}.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid
    # The group is changed before the user.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def requestAvatarId(self, credentials):
    """
    Check a username/password pair against the passwd and shadow
    databases.

    @param credentials: object providing C{username} and C{password}.

    @return: C{defer.succeed(credentials.username)} when a crypted
        password matches, otherwise C{defer.fail(UnauthorizedLogin())}.
    """
    # presumably pwd/shadow are falsy when the module is unavailable --
    # verify against the file's imports
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            # '*' and 'x' are placeholders, not usable password hashes.
            if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            try:
                shadowPass = shadow.getspnam(credentials.username)[1]
            finally:
                # Always give the elevated ids back, whatever getspnam
                # raised, before any result object is built.
                os.setegid(gid)
                os.seteuid(uid)
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    return defer.fail(UnauthorizedLogin())
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def checkKey(self, credentials):
    """
    Look for C{credentials.blob} among the user's authorized SSH keys.

    Each candidate file is probed and read with the effective ids of the
    target user; the ids this function was entered with are restored
    before the key lines are compared and on every exit path.

    @param credentials: object providing C{username} and C{blob}.

    @return: C{1} if a key in C{authorized_keys2} or C{authorized_keys}
        decodes to C{credentials.blob}, otherwise C{0}.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'): # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        try:
            # Go through 0 first so the switch to the target ids is allowed.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir+name):
                continue
            with open(sshDir+name) as keyFile:
                lines = keyFile.readlines()
        finally:
            # Runs for the `continue` above as well, so a missing file
            # no longer leaves the process running as the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                # NOTE(review): decodestring is the Python 2 era spelling;
                # kept because the rest of this code targets that API.
                if base64.decodestring(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _runAsUser(self, f, *args, **kw):
    """
    Call C{f} with the effective ids and groups of this object's user.

    @param f: either a single callable, invoked with C{*args} and C{**kw},
        or an iterable of C{(func[, args[, kw]])} sequences that are
        invoked in order.

    @return: the result of the last call made.
    """
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    savedGroups = os.getgroups()
    userId, groupId = self.getUserGroupId()
    # Pass through 0 so that changing groups and ids is permitted.
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(groupId)
    os.seteuid(userId)
    try:
        calls = iter(f)
    except TypeError:
        # Not iterable: treat f as one callable with the given arguments.
        calls = [(f, args, kw)]
    try:
        for call in calls:
            func = call[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (call[1] or ()) if len(call) > 1 else ()
            callKw = (call[2] or {}) if len(call) > 2 else {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(savedGroups)
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
    return r
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def getPtyOwnership(self):
    """
    Make the avatar's user the owner of the pty device, keeping the
    device's current group.

    The chown runs with effective gid/uid 0; the previous effective ids
    are restored afterwards even if it fails.
    """
    ttyGid = os.stat(self.ptyTuple[2]).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid, savedEgid = os.geteuid(), os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(self.ptyTuple[2], ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
项目:skilled-hammer    作者:r00m    | 项目源码 | 文件源码
def pull(directory):
    """
    Pull the latest changes with the rights of the user and group that
    own ``directory``.

    Returns ``True`` when the pull finished without an error or rejected
    flag, ``False`` when the pull failed, could not be merged, or the
    uid/gid switch raised ``PermissionError``. Effective uid/gid are set
    back to 0 before returning in every case.
    """
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))

        # order is important: once seteuid() has dropped effective UID 0,
        # setegid() would no longer be allowed, so the group goes first
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]

        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Regain effective UID 0 first; after that changing the group
        # back is always permitted.
        os.seteuid(0)
        os.setegid(0)

    return True
项目:skilled-hammer    作者:r00m    | 项目源码 | 文件源码
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory.

    The command is started on a background thread and this function
    returns without waiting for it. ``slack_webhook_url`` may be falsy,
    in which case no notification is sent. Effective uid/gid are set
    back to 0 before returning.
    """
    try:
        st = os.stat(directory)

        # order is important: once seteuid() has dropped effective UID 0,
        # setegid() would no longer be allowed, so the group goes first
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            Run the command and report the result.

            The webhook sender (Bitbucket) retries when a response takes
            longer than about 10 seconds, so the response is returned
            immediately and the heavy lifting happens on this thread.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # Handled here because the exception is raised on this
                # thread and can never reach the caller's try block.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time

            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))

            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): the ids are restored in `finally` right after the
        # thread is started, so the command may well execute with root's
        # ids instead of the owner's -- confirm, and if so drop
        # privileges in the child process instead.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    finally:
        logger.info("Restoring root permissions")
        # Regain effective UID 0 first; after that changing the group
        # back is always permitted.
        os.seteuid(0)
        os.setegid(0)
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_setegid(self):
    """setegid(0) must fail unless the uid is 0; 1<<32 must overflow."""
    if os.getuid() != 0:
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def setUp(self):
    """
    Remove TESTFN, run the base class set-up, then switch the effective
    group and user ids to 1000.
    """
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    # Group id first, user id second.
    for switch in (os.setegid, os.seteuid):
        switch(1000)
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def tearDown(self):
    """
    Restore the effective user and group ids saved in PROCESS_UID and
    PROCESS_GID, then run the base class tear-down.
    """
    # The uid goes to seteuid and the gid to setegid. The user id is
    # restored first so the group change runs with the original
    # effective uid.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_setegid(self):
    """setegid(0) must fail unless the uid is 0; 1<<32 must overflow."""
    if os.getuid() != 0:
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_setegid(self):
    """setegid(0) must fail unless the uid is 0; 1<<32 must overflow."""
    if os.getuid() != 0:
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def switchUID(uid, gid, euid=False):
    """
    Switch the process to the given user and group ids.

    @param uid: user id to switch to; nothing user-related happens when
        it is C{None}.
    @param gid: group id to switch to; skipped when it is C{None}.
    @param euid: when true, only the effective ids are changed
        (C{seteuid}/C{setegid}) instead of C{setuid}/C{setgid}.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid
    # The group is changed before the user.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def requestAvatarId(self, credentials):
    """
    Check a username/password pair against the passwd and shadow
    databases.

    @param credentials: object providing C{username} and C{password}.

    @return: C{defer.succeed(credentials.username)} when a crypted
        password matches, otherwise C{defer.fail(UnauthorizedLogin())}.
    """
    # presumably pwd/shadow are falsy when the module is unavailable --
    # verify against the file's imports
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            # '*' and 'x' are placeholders, not usable password hashes.
            if cryptedPass not in ['*', 'x'] and \
                verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        os.setegid(0)
        os.seteuid(0)
        try:
            try:
                shadowPass = shadow.getspnam(credentials.username)[1]
            finally:
                # Always give the elevated ids back, whatever getspnam
                # raised, before any result object is built.
                os.setegid(gid)
                os.seteuid(uid)
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    return defer.fail(UnauthorizedLogin())
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def checkKey(self, credentials):
    """
    Look for C{credentials.blob} among the user's authorized SSH keys.

    Each candidate file is probed and read with the effective ids of the
    target user; the ids this function was entered with are restored
    before the key lines are compared and on every exit path.

    @param credentials: object providing C{username} and C{blob}.

    @return: C{1} if a key in C{authorized_keys2} or C{authorized_keys}
        decodes to C{credentials.blob}, otherwise C{0}.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'): # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        try:
            # Go through 0 first so the switch to the target ids is allowed.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            if not os.path.exists(sshDir+name):
                continue
            with open(sshDir+name) as keyFile:
                lines = keyFile.readlines()
        finally:
            # Runs for the `continue` above as well, so a missing file
            # no longer leaves the process running as the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                # NOTE(review): decodestring is the Python 2 era spelling;
                # kept because the rest of this code targets that API.
                if base64.decodestring(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def _runAsUser(self, f, *args, **kw):
    """
    Call C{f} with the effective ids and groups of this object's user.

    @param f: either a single callable, invoked with C{*args} and C{**kw},
        or an iterable of C{(func[, args[, kw]])} sequences that are
        invoked in order.

    @return: the result of the last call made.
    """
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    savedGroups = os.getgroups()
    userId, groupId = self.getUserGroupId()
    # Pass through 0 so that changing groups and ids is permitted.
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(groupId)
    os.seteuid(userId)
    try:
        calls = iter(f)
    except TypeError:
        # Not iterable: treat f as one callable with the given arguments.
        calls = [(f, args, kw)]
    try:
        for call in calls:
            func = call[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (call[1] or ()) if len(call) > 1 else ()
            callKw = (call[2] or {}) if len(call) > 2 else {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(savedGroups)
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
    return r
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def getPtyOwnership(self):
    """
    Make the avatar's user the owner of the pty device, keeping the
    device's current group.

    The chown runs with effective gid/uid 0; the previous effective ids
    are restored afterwards even if it fails.
    """
    ttyGid = os.stat(self.ptyTuple[2]).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid, savedEgid = os.geteuid(), os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(self.ptyTuple[2], ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_setegid(self):
    """
    setegid(0) must fail unless the uid is 0 or HAVE_WHEEL_GROUP is set;
    1<<32 must overflow.
    """
    if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def test_setegid(self):
    """setegid(0) must fail unless the uid is 0; 1<<32 must overflow."""
    if os.getuid() != 0:
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_setegid(self):
    """
    setegid(0) must fail unless the uid is 0 or HAVE_WHEEL_GROUP is set;
    1<<32 must overflow.
    """
    if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
        with self.assertRaises(OSError):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def test_setegid(self):
    """setegid(0) must fail unless the uid is 0; 1<<32 must overflow."""
    if os.getuid() != 0:
        with self.assertRaises(os.error):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def impersonate_user(self, username, password):
    """
    Switch the process effective group and user ids to those of
    ``username`` as listed in the passwd database.

    ``password`` is accepted but not used here. Raises AuthorizerError
    when the user has no passwd entry.
    """
    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        raise AuthorizerError(self.msg_no_such_user)
    # Group id first, user id second.
    os.setegid(entry.pw_gid)
    os.seteuid(entry.pw_uid)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def terminate_impersonation(self, username):
    """
    Put the process effective group and user ids back to PROCESS_GID and
    PROCESS_UID. ``username`` is accepted but not used here.
    """
    # Group id first, user id second.
    os.setegid(PROCESS_GID)
    os.seteuid(PROCESS_UID)
项目:navdoon    作者:farzadghanei    | 项目源码 | 文件源码
def _change_process_user_group(self):
        # type: () -> None
        if self.user:
            self._log("changing process user to {}".format(self.user))
            os.seteuid(self.user)
        if self.group:
            self._log("changing process group to {}".format(self.group))
            os.setegid(self.group)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
    """
    Call C{function} with the requested effective uid and gid in place.

    Only the seteuid/setegid calls that are actually needed are made,
    based on a comparison of the current and the requested ids. The ids
    in effect on entry are restored after the call, also when it raises.

    @param euid: effective UID used to call the function.
    @type euid: C{int}

    @param egid: effective GID used to call the function.
    @type egid: C{int}

    @param function: the function run with the specific permission.
    @type function: any callable

    @param *args: arguments passed to C{function}
    @param **kwargs: keyword arguments passed to C{function}

    @return: whatever C{function} returns.
    """
    currentUid, currentGid = os.geteuid(), os.getegid()
    if currentUid == euid and currentGid == egid:
        # Already running with the requested ids.
        return function(*args, **kwargs)
    # Past this point at least one of the two ids differs, so the only
    # remaining questions are whether a hop through uid 0 is needed and
    # whether the gid has to change.
    gidDiffers = currentGid != egid
    if currentUid != 0:
        os.seteuid(0)
    if gidDiffers:
        os.setegid(egid)
    if euid != 0:
        os.seteuid(euid)
    try:
        return function(*args, **kwargs)
    finally:
        if euid != 0:
            os.seteuid(0)
        if gidDiffers:
            os.setegid(currentGid)
        if currentUid != 0:
            os.seteuid(currentUid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _runAsUser(self, f, *args, **kw):
    """
    Call C{f} with the effective ids and groups of this object's user.

    @param f: either a single callable, invoked with C{*args} and C{**kw},
        or an iterable of C{(func[, args[, kw]])} sequences that are
        invoked in order.

    @return: the result of the last call made.
    """
    savedEuid = os.geteuid()
    savedEgid = os.getegid()
    savedGroups = os.getgroups()
    userId, groupId = self.getUserGroupId()
    # Pass through 0 so that changing groups and ids is permitted.
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(groupId)
    os.seteuid(userId)
    try:
        calls = iter(f)
    except TypeError:
        # Not iterable: treat f as one callable with the given arguments.
        calls = [(f, args, kw)]
    try:
        for call in calls:
            func = call[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (call[1] or ()) if len(call) > 1 else ()
            callKw = (call[2] or {}) if len(call) > 2 else {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(savedGroups)
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
    return r
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def getPtyOwnership(self):
    """
    Make the avatar's user the owner of the pty device, keeping the
    device's current group.

    The chown runs with effective gid/uid 0; the previous effective ids
    are restored afterwards even if it fails.
    """
    ttyGid = os.stat(self.ptyTuple[2]).st_gid
    ownerUid, _ = self.avatar.getUserGroupId()
    savedEuid, savedEgid = os.geteuid(), os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(self.ptyTuple[2], ownerUid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def setegid(self, egid):
    """
    Stand-in for C{os.setegid}: remember the requested C{egid} instead of
    changing anything.
    """
    recorded = self.setegidCalls
    recorded.append(egid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def tearDown(self):
    """
    Restore the effective user and group ids saved in PROCESS_UID and
    PROCESS_GID, then run the base class tear-down.
    """
    # The uid goes to seteuid and the gid to setegid. The user id is
    # restored first so the group change runs with the original
    # effective uid.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
项目:smart-realestate    作者:stevensshi    | 项目源码 | 文件源码
def tor_new_process():
    """
    Switch the process ids to those of TOR_USER, point HOME at
    /var/lib/tor and launch a new Tor process with a fixed configuration.
    """
    debian_tor_uid = getpwnam(TOR_USER).pw_uid
    debian_tor_gid = getpwnam(TOR_USER).pw_gid
    # Group before user, for both the real and the effective ids.
    os.setgid(debian_tor_gid)
    os.setuid(debian_tor_uid)
    os.setegid(debian_tor_gid)
    os.seteuid(debian_tor_uid)
    os.environ['HOME'] = "/var/lib/tor"

    tor_config = {
        'SocksPort': '6666',
        'ControlPort': '6969',
        'DNSPort': '9053',
        'DNSListenAddress': '127.0.0.1',
        'AutomapHostsOnResolve': '1',
        'AutomapHostsSuffixes': '.exit,.onion',
        'VirtualAddrNetwork': '10.192.0.0/10',
        'TransPort': '9040',
        'TransListenAddress': '127.0.0.1',
        'AvoidDiskWrites': '1',
        'WarnUnsafeSocks': '1',
    }
    tor_process = stem.process.launch_tor_with_config(config=tor_config)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def setUp(self):
    """
    Remove TESTFN, run the base class set-up, then switch the effective
    group and user ids to 1000.
    """
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    # Group id first, user id second.
    for switch in (os.setegid, os.seteuid):
        switch(1000)
项目:FancyWord    作者:EastonLee    | 项目源码 | 文件源码
def tearDown(self):
    """
    Restore the effective user and group ids saved in PROCESS_UID and
    PROCESS_GID, then run the base class tear-down.
    """
    # The uid goes to seteuid and the gid to setegid. The user id is
    # restored first so the group change runs with the original
    # effective uid.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_setegid(self):
    """
    setegid(0) must fail unless the uid is 0 or HAVE_WHEEL_GROUP is set;
    1<<32 must overflow.
    """
    if not (os.getuid() == 0 or HAVE_WHEEL_GROUP):
        with self.assertRaises(OSError):
            os.setegid(0)
    with self.assertRaises(OverflowError):
        os.setegid(1 << 32)
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def become_persona(self):
    """
    Switch the effective group and user ids to the (uid, gid) pair in
    ``self.persona``; do nothing when the persona is ``(None, None)``.
    """
    # Compare by value: an identity test against a freshly built tuple
    # is never a reliable "is this the empty persona" check.
    if self.persona != (None, None):
        uid, gid = self.persona
        # the order of these is important!
        os.setegid(gid)
        os.seteuid(uid)
项目:ZServer    作者:zopefoundation    | 项目源码 | 文件源码
def become_nobody(self):
    """
    Switch the effective user and group ids back to PROCESS_UID and
    PROCESS_GID; do nothing when the persona is ``(None, None)``.
    """
    # Compare by value: an identity test against a freshly built tuple
    # is never a reliable "is this the empty persona" check.
    if self.persona != (None, None):
        # User id first, then group id.
        os.seteuid(self.PROCESS_UID)
        os.setegid(self.PROCESS_GID)

        # cwd, cdup, open, listdir
项目:procszoo    作者:procszoo    | 项目源码 | 文件源码
def change_users_and_groups(mamaji_data):
    """
    Apply the pending group and user id changes described by
    ``mamaji_data``.

    Keys read: 'current_users', 'current_groups', 'pending_users',
    'pending_groups' and 'supplementary_groups'. Supplementary groups are
    set first, then the r/e/s group ids, then the r/e/s user ids, and
    finally the plain 'gid' and 'uid' entries. Only these combinations of
    r/e/s ids are acted on: all three, real plus effective, or effective
    alone.
    """
    # Looked up for parity with the other keys; the values are not used.
    current_users = mamaji_data['current_users']
    current_groups = mamaji_data['current_groups']
    pending_users = mamaji_data['pending_users']
    pending_groups = mamaji_data['pending_groups']
    supplementary = mamaji_data['supplementary_groups']

    if supplementary:
        os.setgroups(supplementary)

    wanted_groups = [kind for kind in ('rgid', 'egid', 'sgid')
                     if pending_groups[kind] is not None]
    if len(wanted_groups) == 3:
        setresgid(pending_groups['rgid'], pending_groups['egid'],
                  pending_groups['sgid'])
    elif wanted_groups == ['rgid', 'egid']:
        os.setregid(pending_groups['rgid'], pending_groups['egid'])
    elif wanted_groups == ['egid']:
        os.setegid(pending_groups['egid'])

    wanted_users = [kind for kind in ('ruid', 'euid', 'suid')
                    if pending_users[kind] is not None]
    if len(wanted_users) == 3:
        setresuid(pending_users['ruid'], pending_users['euid'],
                  pending_users['suid'])
    elif wanted_users == ['ruid', 'euid']:
        os.setreuid(pending_users['ruid'], pending_users['euid'])
    elif wanted_users == ['euid']:
        os.seteuid(pending_users['euid'])

    new_gid = pending_groups['gid']
    if new_gid is not None:
        os.setgid(new_gid)

    new_uid = pending_users['uid']
    if new_uid is not None:
        os.setuid(new_uid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def switchUID(uid, gid, euid=False):
    """
    Try to change the uid/euid and gid/egid of the current process.

    When C{uid} already equals L{os.getuid} (or L{os.geteuid} if C{euid}
    is set), a L{UserWarning} is issued and the user id is left alone
    instead of an exception being raised.

    @type uid: C{int} or L{None}
    @param uid: the UID (or EUID) to switch the current process to;
                ignored when it is L{None}.

    @type gid: C{int} or L{None}
    @param gid: the GID (or EGID) to switch the current process to;
                ignored when it is L{None}.

    @type euid: C{bool}
    @param euid: if True, only the effective ids are changed rather than
                 the real ones. (This has no effect unless the process
                 runs as root, in which case privileges are not shed
                 completely and can be regained later, e.g. for spawning
                 processes. Use with caution.)
    """
    if euid:
        changeUser, changeGroup, currentUser = (
            os.seteuid, os.setegid, os.geteuid)
    else:
        changeUser, changeGroup, currentUser = (
            os.setuid, os.setgid, os.getuid)
    # The group is changed before the user.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    if uid != currentUser():
        initgroups(uid, gid)
        changeUser(uid)
        return
    # Already running as the requested user: warn and carry on.
    uidText = "euid" if euid else "uid"
    actionText = "tried to drop privileges and set%s %s" % (uidText, uid)
    problemText = "%s is already %s" % (uidText, currentUser())
    warnings.warn("%s but %s; should we be root? Continuing."
                  % (actionText, problemText))