我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.seteuid()。
def callIntoPAM(service, user, conv):
    """
    A testing hook.

    Start a PAM session for C{service}, register C{user} and the
    conversation callback C{conv}, then authenticate and run account
    management with the effective gid/uid temporarily set to 0.

    @return: C{1} when both PAM calls complete without raising.
    """
    session = PAM.pam()
    session.start(service)
    session.set_item(PAM.PAM_USER, user)
    session.set_item(PAM.PAM_CONV, conv)

    # Remember who we are so the finally clause can put it back.
    saved_gid, saved_uid = os.getegid(), os.geteuid()
    os.setegid(0)
    os.seteuid(0)
    try:
        # these will raise
        session.authenticate()
        session.acct_mgmt()
        return 1
    finally:
        # Group first: changing the egid is done while euid is still 0.
        os.setegid(saved_gid)
        os.seteuid(saved_uid)
def create_user(no_utf8=False):
    """
    Create the PostgreSQL role ``DB_USER`` and the database ``DB_NAME``.

    The effective uid is switched to the ``postgres`` system account while
    ``psql`` is invoked, and is always switched back afterwards, including
    when one of the commands fails.

    :param no_utf8: when true, the database is created without the
        ``ENCODING 'UTF8'`` clause.
    :return: the ``pwd`` entry of the ``postgres`` account.
    :raises SystemExit: if switching user or running ``psql`` fails; the
        exit message is the text of the underlying exception.
    """
    curr_user = os.geteuid()
    target = pwd.getpwnam('postgres')
    try:
        os.seteuid(target.pw_uid)
        try:
            # PostgreSQL md5 password format: md5(password + username).
            m = hashlib.md5()
            m.update(DB_PASSWORD)
            m.update(DB_USER)
            command = """CREATE USER %s PASSWORD 'md5%s'""" % (DB_USER, m.hexdigest())
            subprocess.check_call(['psql', '-c', command])
            if no_utf8:
                command = """CREATE DATABASE %s OWNER %s""" % (DB_NAME, DB_USER)
            else:
                command = """CREATE DATABASE %s OWNER %s ENCODING 'UTF8'""" % (DB_NAME, DB_USER)
            subprocess.check_call(['psql', '-c', command])
        finally:
            # Restore the caller's identity even when psql fails; previously
            # a failure left the process running as postgres.
            os.seteuid(curr_user)
    except Exception as e:
        raise SystemExit(str(e))
    return target
def test_interrupted_systemcall(self):
    """
    Make sure interrupted system calls don't break the world, since we can't
    control what all signals our connection thread will get
    """
    if 'linux' not in platform:
        raise SkipTest('Unable to reproduce error case on non-linux platforms')

    node = 'interrupt_test'
    payload = b"1"
    self.client.create(node, payload)

    # set the euid to the current process' euid.
    # glibc sends SIGRT to all children, which will interrupt the
    # system call
    current_euid = os.geteuid()
    os.seteuid(current_euid)

    # basic sanity test that it worked alright
    stored = self.client.get(node)[0]
    assert stored == payload
def test_getPrivateKeysAsRoot(self):
    """
    L{OpenSSHFactory.getPrivateKeys} should switch to root if the keys
    aren't readable by the current user.
    """
    unreadableKey = self.keysDir.child("ssh_host_two_key")
    # Fake permission error by changing the mode
    unreadableKey.chmod(0o000)
    self.addCleanup(unreadableKey.chmod, 0o777)

    # And restore the right mode when seteuid is called
    realSeteuid = os.seteuid

    def chmodThenSeteuid(euid):
        unreadableKey.chmod(0o777)
        return realSeteuid(euid)

    self.patch(os, "seteuid", chmodThenSeteuid)

    privateKeys = self.factory.getPrivateKeys()
    self.assertEqual(len(privateKeys), 2)
    self.assertEqual(set(privateKeys.keys()),
                     set([b'ssh-rsa', b'ssh-dss']))
    # Root is taken first, then the original ids are put back.
    self.assertEqual(self.mockos.seteuidCalls, [0, os.geteuid()])
    self.assertEqual(self.mockos.setegidCalls, [0, os.getegid()])
def main():
    """
    Daemon entry point.

    Moves to the data directory, sends stdout and stderr to the log file,
    switches the effective group and user ids to 103 and then runs
    USERPROG.
    """
    # change to data directory if needed
    os.chdir("/root/data")

    # redirect outputs to a logfile
    log_stream = Log(open(LOGFILE, 'a+'))
    sys.stdout = sys.stderr = log_stream

    # ensure that the daemon runs as a normal user
    daemon_id = 103
    os.setegid(daemon_id)  # set group first "pydaemon"
    os.seteuid(daemon_id)  # set user "pydaemon"

    # start the user program here:
    USERPROG()
def switchUID(uid, gid, euid=False):
    """
    Switch the group id and then the user id of the current process.

    @param uid: the user id to switch to; nothing user-related is done
        when this is L{None}.
    @param gid: the group id to switch to; skipped when L{None}.
    @param euid: if true, change the effective ids (C{os.seteuid} /
        C{os.setegid}) instead of the real ones.
    """
    if euid:
        changeUser, changeGroup = os.seteuid, os.setegid
    else:
        changeUser, changeGroup = os.setuid, os.setgid

    # The group is changed before the user.
    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return
    initgroups(uid, gid)
    changeUser(uid)
def requestAvatarId(self, credentials):
    """
    Check a username/password pair against the system password databases.

    The C{pwd} database is consulted first. If it holds no usable hash for
    the user and the C{shadow} module is available, the shadow database is
    read with the effective gid/uid temporarily set to 0.

    @param credentials: an object with C{username} and C{password}
        attributes.
    @return: a Deferred that fires with C{credentials.username} on a
        password match and fails with L{UnauthorizedLogin} otherwise.
    """
    if pwd:
        try:
            cryptedPass = pwd.getpwnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        else:
            # '*' and 'x' are placeholders, not real hashes.
            if cryptedPass not in ['*', 'x'] and \
               verifyCryptedPassword(cryptedPass, credentials.password):
                return defer.succeed(credentials.username)
    if shadow:
        gid = os.getegid()
        uid = os.geteuid()
        try:
            os.setegid(0)
            os.seteuid(0)
            shadowPass = shadow.getspnam(credentials.username)[1]
        except KeyError:
            return defer.fail(UnauthorizedLogin())
        finally:
            # Always give up root again, whatever getspnam raised;
            # previously only the KeyError path restored the ids.
            os.setegid(gid)
            os.seteuid(uid)
        if verifyCryptedPassword(shadowPass, credentials.password):
            return defer.succeed(credentials.username)
        return defer.fail(UnauthorizedLogin())

    return defer.fail(UnauthorizedLogin())
def checkKey(self, credentials):
    """
    Check whether C{credentials.blob} is listed in the user's authorized
    keys.

    C{authorized_keys2} and C{authorized_keys} in the user's C{~/.ssh/}
    directory are each read with the effective ids of that user; the
    caller's effective ids are restored after every file, whether or not
    the file exists or can be read.

    @param credentials: an object with C{username} and C{blob} attributes.
    @return: C{1} if a key in either file decodes to C{credentials.blob},
        C{0} otherwise.
    """
    sshDir = os.path.expanduser('~%s/.ssh/' % credentials.username)
    if sshDir.startswith('~'):  # didn't expand
        return 0
    uid, gid = os.geteuid(), os.getegid()
    ouid, ogid = pwd.getpwnam(credentials.username)[2:4]
    for name in ['authorized_keys2', 'authorized_keys']:
        keyPath = sshDir + name
        lines = []
        try:
            # Go through root to become the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(ogid)
            os.seteuid(ouid)
            if os.path.exists(keyPath):
                with open(keyPath) as keyFile:
                    lines = keyFile.readlines()
        finally:
            # Previously the ids were only restored after a successful
            # open(), so a missing or unreadable file left the process
            # running as the target user.
            os.setegid(0)
            os.seteuid(0)
            os.setegid(gid)
            os.seteuid(uid)
        for l in lines:
            l2 = l.split()
            if len(l2) < 2:
                continue
            try:
                # NOTE(review): base64.decodestring is kept to preserve the
                # binascii.Error contract; it was removed in Python 3.9.
                if base64.decodestring(l2[1]) == credentials.blob:
                    return 1
            except binascii.Error:
                continue
    return 0
def _runAsUser(self, f, *args, **kw):
    """
    Run one or more callables with this object's user and group ids as the
    effective ids of the process.

    @param f: either a single callable, invoked with C{*args} and C{**kw},
        or an iterable of C{(callable[, args[, kw]])} tuples that are
        invoked in order.
    @return: the result of the last call, or L{None} if C{f} was an empty
        iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    # Go through root so the groups and ids can be changed freely.
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(gid)
    os.seteuid(uid)
    # Defined up front so an empty iterable returns None instead of raising
    # UnboundLocalError.
    r = None
    try:
        # Inside the protected region so the ids are restored even if
        # iterating f fails.
        try:
            calls = iter(f)
        except TypeError:
            calls = [(f, args, kw)]
        for call in calls:
            func = call[0]
            # Missing or falsy entries fall back to empty arguments.
            callArgs = (call[1] if len(call) > 1 else None) or ()
            callKw = (call[2] if len(call) > 2 else None) or {}
            r = func(*callArgs, **callKw)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def getPtyOwnership(self):
    """
    Give the avatar's user ownership of the pty named by
    C{self.ptyTuple[2]}, keeping the group the pty already has.

    The chown is performed with the effective gid/uid set to 0; the
    previous effective ids are restored afterwards.
    """
    pty = self.ptyTuple[2]
    ttyGid = os.stat(pty).st_gid
    # Only the uid is needed; the group stays as it is.
    uid, _ = self.avatar.getUserGroupId()
    savedEuid, savedEgid = os.geteuid(), os.getegid()
    os.setegid(0)
    os.seteuid(0)
    try:
        os.chown(pty, uid, ttyGid)
    finally:
        os.setegid(savedEgid)
        os.seteuid(savedEuid)
def pull(directory):
    """
    Pulls latest changes with the user rights that owns the folder

    The effective gid/uid are switched to the owner of ``directory`` for the
    pull from the ``origin`` remote and set back to 0 afterwards.

    :param directory: path of the git working copy to update.
    :return: ``True`` if the pull succeeded (or the head was already up to
        date); ``False`` if the pull reported an error or rejection, or the
        ids could not be changed.
    """
    try:
        st = os.stat(directory)
        logger.info("Pulling as {0}:{1}...".format(st.st_uid, st.st_gid))

        # order is important: after seteuid() call the effective UID isn't 0
        # anymore, so setegid() will not be allowed
        # (the uid and gid arguments were previously swapped)
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        repo = git.Repo(directory)
        info = repo.remotes.origin.pull()[0]

        if info.flags & info.ERROR:
            logger.error("Pull failed: {0}".format(info.note))
            return False
        elif info.flags & info.REJECTED:
            logger.error("Could not merge after pull: {0}".format(info.note))
            return False
        elif info.flags & info.HEAD_UPTODATE:
            logger.info("Head is already up to date")
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
        return False
    finally:
        logger.info("Restoring root permissions")
        # Regain uid 0 first; with euid 0 any egid may then be set.
        os.seteuid(0)
        os.setegid(0)

    return True
def run(project, command, directory, slack_webhook_url):
    """
    Run the specified command as the user that owns the directory

    The command is started in a background thread so this function returns
    immediately; the effective ids are set back to 0 before returning.

    :param project: project name, used in log and Slack messages.
    :param command: shell command line to execute.
    :param directory: working directory for the command; its owner
        determines the ids the process switches to.
    :param slack_webhook_url: if truthy, a notification with the command
        output is sent there after the command finishes.
    """
    try:
        st = os.stat(directory)

        # order is important: after seteuid() call the effective UID isn't 0
        # anymore, so setegid() will not be allowed
        # (the uid and gid arguments were previously swapped)
        os.setegid(st.st_gid)
        os.seteuid(st.st_uid)

        logger.info("Changing working directory to '{0}'".format(directory))
        logger.info("Spawning background command '{0}' as {1}:{2} for '{3}'...".format(command, st.st_uid, st.st_gid, project))

        def background():
            """
            I don't care how long it takes to run the command, but Bitbucket gets angry when it takes longer than 10 seconds.
            My npm build takes around 15 secs, so I'd get 3 Webhooks from Bitbucket, because it thinks each Webhook timedout.
            Easy way out is to return response immediately and start a background thread that does all of the heavy lifting.
            """
            start_time = time.time()
            try:
                output = subprocess.check_output(command, shell=True, cwd=directory, stderr=subprocess.STDOUT)
            except subprocess.CalledProcessError as e:
                # The command runs in this thread, so a failure can only be
                # reported from here, not from the caller's except clause.
                logger.error("Error: {0}".format(e.output))
                return
            completed_in = time.time() - start_time
            logger.info("'{0}' background command finished in {1:.2f} seconds".format(project, completed_in))
            if slack_webhook_url:
                slack_notification(slack_webhook_url, "Deployed `{0}` in {1:.2f} seconds! :rocket:".format(project, completed_in), output)

        # NOTE(review): the finally clause below may restore the root ids
        # before the thread has spawned the command, in which case the
        # command would not run as the directory owner -- confirm.
        Thread(target=background).start()
    except PermissionError:
        logger.error("Insufficient permissions to set uid/gid")
    except subprocess.CalledProcessError as e:
        logger.error("Error: {0}".format(e.output))
    finally:
        logger.info("Restoring root permissions")
        # Regain uid 0 first; with euid 0 any egid may then be set.
        os.seteuid(0)
        os.setegid(0)
def test_seteuid(self):
    """
    os.seteuid(0) must fail for a non-root process, and an id that does not
    fit the platform's uid type must raise OverflowError.
    """
    running_as_root = os.getuid() == 0
    if not running_as_root:
        self.assertRaises(os.error, os.seteuid, 0)
    too_large = 1 << 32
    self.assertRaises(OverflowError, os.seteuid, too_large)
def setUp(self):
    """
    Remove the test file, run the base TestProcess set-up, then switch the
    effective group and user ids to 1000.
    """
    safe_rmpath(TESTFN)
    TestProcess.setUp(self)
    unprivileged_id = 1000
    # Group before user.
    os.setegid(unprivileged_id)
    os.seteuid(unprivileged_id)
def tearDown(self):
    """
    Restore the original effective user and group ids, then run the base
    TestProcess tear-down.
    """
    # The uid and gid were previously passed to the wrong calls. The user id
    # is restored first so that the group change is made with the original
    # user's privileges.
    os.seteuid(self.PROCESS_UID)
    os.setegid(self.PROCESS_GID)
    TestProcess.tearDown(self)
def test_seteuid(self):
    """
    Becoming root via os.seteuid must be refused to non-root processes, and
    an out-of-range id must raise OverflowError.
    """
    if os.getuid() != 0:
        with self.assertRaises(OSError):
            os.seteuid(0)
    with self.assertRaises(OverflowError):
        os.seteuid(1 << 32)
def _try_run_as_postgres(self):
    """
    Try to switch the effective uid to the ``postgres`` account.

    The switch is only attempted on Linux when the process currently has
    effective uid 0.

    :return: ``True`` if the effective uid was changed, ``False`` otherwise
        (failures are logged, not raised).
    """
    # The right to seteuid() to another user depends on the effective *user*
    # id; the check previously looked at the effective group id.
    if platform.LINUX and os.geteuid() == 0:
        try:
            uid = pwd.getpwnam('postgres').pw_uid
            os.seteuid(uid)
            return True
        except Exception as e:
            logging.error('Failed run as postgres: {0}'.format(e))
    return False
def _try_run_as_postgres(self):
    """
    Try to switch the effective uid to the ``postgres`` account.

    The switch is only attempted on UNIX platforms when the process
    currently has effective uid 0.

    :return: ``True`` if the effective uid was changed, ``False`` otherwise
        (failures are written to stderr, not raised).
    """
    # The right to seteuid() to another user depends on the effective *user*
    # id; the check previously looked at the effective group id.
    if platform.UNIX and os.geteuid() == 0:
        try:
            # Imported here because pwd only exists on UNIX.
            import pwd
            uid = pwd.getpwnam('postgres').pw_uid
            os.seteuid(uid)
            return True
        except Exception as e:
            sys.stderr.write("Failed run as postgres: {0}\n".format(e))
    return False
def impersonate_user(self, username, password):
    """Switch the process's effective group and user ids to those of
    ``username``.

    Raises AuthorizerError if the account is not in the password database.
    ``password`` is not used here.
    """
    try:
        account = pwd.getpwnam(username)
    except KeyError:
        raise AuthorizerError(self.msg_no_such_user)
    # Group first, then user.
    os.setegid(account.pw_gid)
    os.seteuid(account.pw_uid)
def terminate_impersonation(self, username):
    """Switch the effective group and user ids back to PROCESS_GID and
    PROCESS_UID.

    ``username`` is accepted for interface compatibility and not used.
    """
    original_gid = PROCESS_GID
    os.setegid(original_gid)
    original_uid = PROCESS_UID
    os.seteuid(original_uid)
def _change_process_user_group(self): # type: () -> None if self.user: self._log("changing process user to {}".format(self.user)) os.seteuid(self.user) if self.group: self._log("changing process group to {}".format(self.group)) os.setegid(self.group)
def runAsEffectiveUser(euid, egid, function, *args, **kwargs):
    """
    Call C{function} with the effective uid/gid set to C{euid}/C{egid},
    restoring the previous effective ids afterwards.

    Only the seteuid/setegid calls that are actually needed are made: when
    the process already has the wanted ids, C{function} is called directly.

    @param euid: effective UID used to call the function.
    @type euid: C{int}

    @param egid: effective GID used to call the function.
    @type egid: C{int}

    @param function: the function run with the specific permission.
    @type function: any callable

    @param *args: arguments passed to C{function}
    @param **kwargs: keyword arguments passed to C{function}

    @return: whatever C{function} returns.
    """
    currentUid, currentGid = os.geteuid(), os.getegid()
    if currentUid == euid and currentGid == egid:
        return function(*args, **kwargs)

    # From here on at least one of the two ids differs.
    groupDiffers = currentGid != egid

    # Become root (if not already) so the ids can be changed.
    if currentUid != 0:
        os.seteuid(0)
    if groupDiffers:
        os.setegid(egid)
    if euid != 0:
        os.seteuid(euid)
    try:
        return function(*args, **kwargs)
    finally:
        # Undo the switch in the same root-first order.
        if euid != 0:
            os.seteuid(0)
        if groupDiffers:
            os.setegid(currentGid)
        if currentUid != 0:
            os.seteuid(currentUid)
def setUp(self):
    """
    Create an L{OpenSSHFactory} whose key and moduli directories are fresh
    temporary directories filled with fixture files, and replace
    C{os.seteuid} and C{os.setegid} with the corresponding L{MockOS}
    methods.
    """
    self.factory = OpenSSHFactory()
    # Directory the factory reads host keys from.
    self.keysDir = FilePath(self.mktemp())
    self.keysDir.makedirs()
    self.factory.dataRoot = self.keysDir.path
    # Directory the factory reads the moduli file from.
    self.moduliDir = FilePath(self.mktemp())
    self.moduliDir.makedirs()
    self.factory.moduliRoot = self.moduliDir.path

    # Key directory fixtures: files with placeholder content, two private
    # keys, one file that is not a key, and one public key.
    self.keysDir.child("ssh_host_foo").setContent(b"foo")
    self.keysDir.child("bar_key").setContent(b"foo")
    self.keysDir.child("ssh_host_one_key").setContent(
        keydata.privateRSA_openssh)
    self.keysDir.child("ssh_host_two_key").setContent(
        keydata.privateDSA_openssh)
    self.keysDir.child("ssh_host_three_key").setContent(
        b"not a key content")

    self.keysDir.child("ssh_host_one_key.pub").setContent(
        keydata.publicRSA_openssh)

    # NOTE(review): the moduli literal below sits on a single line in this
    # copy of the file; any line breaks it originally had appear to have
    # been lost -- confirm against the upstream source before relying on it.
    self.moduliDir.child("moduli").setContent(b""" # $OpenBSD: moduli,v 1.xx 2016/07/26 12:34:56 jhacker Exp $ # Time Type Tests Tries Size Generator Modulus 20030501000000 2 6 100 2047 2 FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE45B3DC2007CB8A163BF0598DA48361C55D39A69163FA8FD24CF5F83655D23DCA3AD961C62F356208552BB9ED529077096966D670C354E4ABC9804F1746C08CA18217C32905E462E36CE3BE39E772C180E86039B2783A2EC07A28FB5C55DF06F4C52C9DE2BCBF6955817183995497CEA956AE515D2261898FA051015728E5A8AACAA68FFFFFFFFFFFFFFFF 19981111000000 2 6 100 1023 2 FFFFFFFFFFFFFFFFC90FDAA22168C234C4C6628B80DC1CD129024E088A67CC74020BBEA63B139B22514A08798E3404DDEF9519B3CD3A431B302B0A6DF25F14374FE1356D6D51C245E485B576625E7EC6F44C42E9A637ED6B0BFF5CB6F406B7EDEE386BFB5A899FA5AE9F24117C4B1FE649286651ECE65381FFFFFFFFFFFFFFFF """)

    # Route the privilege-switching calls through the mock.
    self.mockos = MockOS()
    self.patch(os, "seteuid", self.mockos.seteuid)
    self.patch(os, "setegid", self.mockos.setegid)
def _runAsUser(self, f, *args, **kw):
    """
    Invoke callables while the process has this object's user and group ids
    as its effective ids, restoring the previous ids and supplementary
    groups afterwards.

    @param f: a single callable, which is invoked with C{*args} and
        C{**kw}, or an iterable of C{(callable[, args[, kw]])} tuples, each
        invoked in turn.
    @return: the result of the last invocation; L{None} when C{f} is an
        empty iterable.
    """
    euid = os.geteuid()
    egid = os.getegid()
    groups = os.getgroups()
    uid, gid = self.getUserGroupId()
    # Become root first, then take on the target user's groups and ids.
    os.setegid(0)
    os.seteuid(0)
    os.setgroups(self.getOtherGroups())
    os.setegid(gid)
    os.seteuid(uid)
    # Bound before the loop so that an empty iterable yields None rather
    # than an UnboundLocalError at the return statement.
    r = None
    try:
        # Kept inside the try so any failure while preparing the call list
        # still restores the original ids.
        try:
            pending = iter(f)
        except TypeError:
            pending = [(f, args, kw)]
        for entry in pending:
            func = entry[0]
            # Absent or falsy positions mean "no arguments".
            positional = (entry[1] if len(entry) > 1 else None) or ()
            keywords = (entry[2] if len(entry) > 2 else None) or {}
            r = func(*positional, **keywords)
    finally:
        os.setegid(0)
        os.seteuid(0)
        os.setgroups(groups)
        os.setegid(egid)
        os.seteuid(euid)
    return r
def seteuid(self, egid):
    """
    Stand-in for C{os.seteuid}: record the requested id in
    C{self.seteuidCalls} without changing any process state.
    """
    recorded = self.seteuidCalls
    recorded.append(egid)
def tor_new_process():
    """
    Drops privileges to TOR_USER user and start a new Tor process
    """
    account = getpwnam(TOR_USER)
    tor_uid, tor_gid = account.pw_uid, account.pw_gid

    # Group ids are changed before user ids.
    os.setgid(tor_gid)
    os.setuid(tor_uid)
    os.setegid(tor_gid)
    os.seteuid(tor_uid)
    os.environ['HOME'] = "/var/lib/tor"

    tor_config = {
        'SocksPort': '6666',
        'ControlPort': '6969',
        'DNSPort': '9053',
        'DNSListenAddress': '127.0.0.1',
        'AutomapHostsOnResolve': '1',
        'AutomapHostsSuffixes': '.exit,.onion',
        'VirtualAddrNetwork': '10.192.0.0/10',
        'TransPort': '9040',
        'TransListenAddress': '127.0.0.1',
        'AvoidDiskWrites': '1',
        'WarnUnsafeSocks': '1',
    }
    stem.process.launch_tor_with_config(config=tor_config)
def become_persona(self):
    """
    Switch the effective group and user ids to ``self.persona``, a
    ``(uid, gid)`` pair. Nothing is done when the persona is
    ``(None, None)``.
    """
    # Compare by value: an identity test against a tuple literal does not
    # reliably detect the (None, None) "no persona" marker.
    if self.persona != (None, None):
        uid, gid = self.persona
        # the order of these is important!
        os.setegid(gid)
        os.seteuid(uid)
def become_nobody(self):
    """
    Switch the effective user and group ids back to ``self.PROCESS_UID``
    and ``self.PROCESS_GID``. Nothing is done when the persona is
    ``(None, None)``.
    """
    # Compare by value: an identity test against a tuple literal does not
    # reliably detect the (None, None) "no persona" marker.
    if self.persona != (None, None):
        # User id first, then the group id.
        os.seteuid(self.PROCESS_UID)
        os.setegid(self.PROCESS_GID)

# cwd, cdup, open, listdir
def change_users_and_groups(mamaji_data):
    """
    Apply the pending group and user id changes described by
    ``mamaji_data``.

    Supplementary groups are set first (when the list is non-empty), then
    the real/effective/saved group ids, then the real/effective/saved user
    ids, and finally the plain ``gid`` and ``uid``. Which call is used for
    the r/e/s ids depends on which of them are not ``None``: all three,
    real plus effective, or effective only. Other combinations are left
    alone.

    :param mamaji_data: dict with the keys ``current_users``,
        ``current_groups``, ``pending_users``, ``pending_groups`` and
        ``supplementary_groups``.
    """
    # Looked up for parity with the other keys; not used further here.
    current_users = mamaji_data['current_users']
    current_groups = mamaji_data['current_groups']
    pending_users = mamaji_data['pending_users']
    pending_groups = mamaji_data['pending_groups']
    supplementary = mamaji_data['supplementary_groups']

    if supplementary:
        os.setgroups(supplementary)

    # Names of the group ids that were requested, in r/e/s order.
    wanted_groups = [kind for kind in ('rgid', 'egid', 'sgid')
                     if pending_groups[kind] is not None]
    if len(wanted_groups) == 3:
        setresgid(pending_groups['rgid'],
                  pending_groups['egid'],
                  pending_groups['sgid'])
    elif wanted_groups == ['rgid', 'egid']:
        os.setregid(pending_groups['rgid'], pending_groups['egid'])
    elif wanted_groups == ['egid']:
        os.setegid(pending_groups['egid'])

    # Same selection for the user ids.
    wanted_users = [kind for kind in ('ruid', 'euid', 'suid')
                    if pending_users[kind] is not None]
    if len(wanted_users) == 3:
        setresuid(pending_users['ruid'],
                  pending_users['euid'],
                  pending_users['suid'])
    elif wanted_users == ['ruid', 'euid']:
        os.setreuid(pending_users['ruid'], pending_users['euid'])
    elif wanted_users == ['euid']:
        os.seteuid(pending_users['euid'])

    new_gid = pending_groups['gid']
    if new_gid is not None:
        os.setgid(new_gid)

    new_uid = pending_users['uid']
    if new_uid is not None:
        os.setuid(new_uid)
def switchUID(uid, gid, euid=False):
    """
    Attempts to switch the uid/euid and gid/egid for the current process.

    If C{uid} is the same value as L{os.getuid} (or L{os.geteuid}),
    this function will issue a L{UserWarning} and not raise an exception.

    @type uid: C{int} or L{None}
    @param uid: the UID (or EUID) to switch the current process to. This
                parameter will be ignored if the value is L{None}.

    @type gid: C{int} or L{None}
    @param gid: the GID (or EGID) to switch the current process to. This
                parameter will be ignored if the value is L{None}.

    @type euid: C{bool}
    @param euid: if True, set only effective user-id rather than real user-id.
                 (This option has no effect unless the process is running
                 as root, in which case it means not to shed all
                 privileges, retaining the option to regain privileges
                 in cases such as spawning processes. Use with caution.)
    """
    if euid:
        changeUser, changeGroup, currentUser = (
            os.seteuid, os.setegid, os.geteuid)
    else:
        changeUser, changeGroup, currentUser = (
            os.setuid, os.setgid, os.getuid)

    if gid is not None:
        changeGroup(gid)
    if uid is None:
        return

    activeUid = currentUser()
    if uid != activeUid:
        initgroups(uid, gid)
        changeUser(uid)
        return

    # Already running as the requested user: warn instead of failing.
    uidText = "euid" if euid else "uid"
    actionText = "tried to drop privileges and set%s %s" % (uidText, uid)
    problemText = "%s is already %s" % (uidText, activeUid)
    warnings.warn("%s but %s; should we be root? Continuing."
                  % (actionText, problemText))