我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 os.setuid()。
def _dropPrivileges(self, user, group):
    """
    Switch the process to the given user and group, then reset logging.

    The supplementary group list is cleared first, the group id is changed
    next and the user id last.

    :param user: login name, resolved through the ``pwd`` database.
    :param group: group name, resolved through the ``grp`` database.
    """
    import grp
    import pwd

    # Resolve both names before touching any process credentials.
    target_uid = pwd.getpwnam(user).pw_uid
    target_gid = grp.getgrnam(group).gr_gid

    # Supplementary groups go first, then the group id, then the user id.
    os.setgroups([])
    os.setgid(target_gid)
    os.setuid(target_uid)

    # Logging is re-initialised once the new identity is in place.
    self.resetLogging()
def drop_privileges(self, uid_name, gid_name):
    """
    Drop root privileges to the named user and group.

    When the process is not running as root nothing is changed; an info
    message is logged instead.

    :param uid_name: login name to switch to.
    :param gid_name: group name to switch to.
    """
    if os.getuid() != 0:
        # Only root is able to change identity.
        self.logger.info("Not running as root. Cannot drop permissions.")
        return

    # Translate the names into numeric ids.
    target_uid = pwd.getpwnam(uid_name).pw_uid
    target_gid = grp.getgrnam(gid_name).gr_gid

    # Supplementary groups first, then group id, then user id.
    os.setgroups([])
    os.setgid(target_gid)
    os.setuid(target_uid)

    # Files created from now on are accessible to the owner only.
    os.umask(0o077)

    summary = "Changed permissions to: %s: %i, %s, %i" % (
        uid_name, target_uid, gid_name, target_gid)
    self.logger.info(summary)
def change_process_owner(uid, gid):
    """ Change the owning UID and GID of this process.

        :param uid: The target UID for the daemon process.
        :param gid: The target GID for the daemon process.
        :return: ``None``.

        The GID is set before the UID (changing the UID first could remove
        the permission needed to change the GID). Any failure is reported
        as a `DaemonOSEnvironmentError`.

        """
    try:
        os.setgid(gid)
        os.setuid(uid)
    except Exception as exc:
        message = "Unable to change process owner ({exc})".format(exc=exc)
        raise DaemonOSEnvironmentError(message)
def sudo(user):
    """
    Run your function as the given user (currently disabled).

    Please note that this *permanently* changes user, you won't be able to
    change back unless you have sudo privileges.

    Best used inside @background.

    :param user: login name of the account to switch to.
    :raises NotImplementedError: always; the decorator is not enabled yet.
    """
    # ``NotImplemented`` is a comparison sentinel, not an exception; raising
    # it produces a confusing TypeError on Python 3.
    raise NotImplementedError("sudo() is not implemented")

    # Intended implementation, unreachable until the guard above is removed.
    user = pwd.getpwnam(user)
    print(user)

    def decorator(func):
        def func_wrapper(*args, **kwargs):
            # Group first: once the uid has been dropped the process is no
            # longer allowed to change its gid.
            os.setgid(user.pw_gid)
            os.setuid(user.pw_uid)
            return func(*args, **kwargs)
        return func_wrapper
    return decorator
def set_owner_process(uid, gid, initgroups=False):
    """Set the user and group of worker processes.

    :param uid: numeric user id to switch to; a falsy value leaves the user
        unchanged.
    :param gid: numeric group id to switch to; a falsy value leaves the
        group unchanged.
    :param initgroups: when true and the user name for ``uid`` can be
        resolved, initialise the full group access list with
        ``os.initgroups`` instead of only setting the primary group.
    """
    if gid:
        # Stays None when there is no uid or it has no passwd entry; in that
        # case initgroups cannot be used and we fall back to a plain setgid.
        username = None
        if uid:
            try:
                username = get_username(uid)
            except KeyError:
                pass

        # versions of python < 2.6.2 don't manage unsigned int for
        # groups like on osx or fedora
        gid = abs(gid) & 0x7FFFFFFF

        if initgroups and username is not None:
            os.initgroups(username, gid)
        else:
            os.setgid(gid)

    if uid:
        os.setuid(uid)
def _setuser(user): ''' Normalizes user to a uid and sets the current uid, or does nothing if user is None. ''' if user is None: return # Normalize group to gid elif isinstance(user, str): uid = pwd.getpwnam(user).pw_uid # The group is already a gid. else: uid = user try: os.setuid(uid) except OSError: self.logger.error('Unable to change user.') sys.exit(1)
def drop_privileges(uid_name='nobody'):
    """Drop root privileges.

    Does nothing unless the process is running as root. Otherwise the
    supplementary groups are cleared, the process switches to the primary
    group and uid of ``uid_name``, the umask is tightened and the Kerberos
    credential cache variable is pointed at a non-existent file.

    :param uid_name: login name of the account to run as.
    """
    if os.getuid() != 0:
        # We're not root, nothing to do.
        return

    # Get the uid/gid from the name
    pw_entry = pwd.getpwnam(uid_name)

    # Remove group privileges
    os.setgroups([])

    # Switch the primary group as well: leaving it untouched would keep
    # root's group on the "unprivileged" process. This has to happen before
    # the uid changes.
    os.setgid(pw_entry.pw_gid)
    os.setuid(pw_entry.pw_uid)

    # Ensure a very conservative umask
    os.umask(0o77)

    # TODO: probably redundant, as it will not have access to the
    # cred cache anyway.
    os.environ['KRB5CCNAME'] = 'FILE:/no_such_krbcc'
def setugid(user):
    """Change process user and group ID.

    ``user`` is either a numeric user id or a user name. The group access
    list is initialised for that account, the primary group and the uid are
    switched, and ``HOME`` is pointed at the account's home directory.
    """
    import pwd

    try:
        entry = pwd.getpwuid(int(user))
    except ValueError:
        # Not a number, so treat it as a login name.
        entry = pwd.getpwnam(user)

    if not hasattr(os, 'initgroups'):
        # python < 2.7: call the C library directly.
        import ctypes
        status = ctypes.CDLL(None).initgroups(entry.pw_name, entry.pw_gid)
        if status < 0:
            err = ctypes.c_int.in_dll(ctypes.pythonapi, "errno").value
            raise OSError(err, os.strerror(err), 'initgroups')
    else:
        os.initgroups(entry.pw_name, entry.pw_gid)

    os.setgid(entry.pw_gid)
    os.setuid(entry.pw_uid)
    os.environ['HOME'] = entry.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nogroup'):
    """Drop root privileges to the named user and group.

    Does nothing when the process is not running as root.

    :param uid_name: login name to switch to.
    :param gid_name: group name to switch to.
    """
    if os.getuid() != 0:
        # We're not root so, like, whatever dude
        return

    # Get the uid/gid from the name
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask. ``0o077`` is accepted by Python 2.6+
    # and Python 3; the bare ``077`` form is a syntax error on Python 3.
    os.umask(0o077)
def drop_privileges(uid_name='nobody', gid_name='nobody'):
    """Drop root privileges to the named user and group.

    Returns immediately when the process is not running as root.
    """
    import os, pwd, grp

    if os.getuid() != 0:
        # Nothing to drop for an unprivileged process.
        return

    # Look up the numeric ids for the requested names.
    target_uid = pwd.getpwnam(uid_name).pw_uid
    target_gid = grp.getgrnam(gid_name).gr_gid

    # Clear supplementary groups, then change group id, then user id.
    os.setgroups([])
    os.setgid(target_gid)
    os.setuid(target_uid)

    # New files are accessible to the owner only.
    os.umask(0o77)
def _execChild(self, path, uid, gid, executable, args, environment):
    """
    Perform the exec() step inside the forked child.

    Changes to ``path`` when one is given and, if a uid and/or gid was
    requested, switches identity before replacing the process image with
    ``executable``. A missing uid or gid defaults to the current effective
    id.
    """
    if path:
        os.chdir(path)

    if not (uid is None and gid is None):
        uid = os.geteuid() if uid is None else uid
        gid = os.getegid() if gid is None else gid
        # set the UID before I actually exec the process
        os.setuid(0)
        os.setgid(0)
        switchUID(uid, gid)

    os.execvpe(executable, args, environment)
def test_mockSetUid(self):
    """
    Spawn a process with an explicit uid: the path is almost the standard
    one, except that a C{switchUID} call happens before the exec.
    """
    command = b'/mock/ouch'
    finished = defer.Deferred()
    protocol = TrivialProcessProtocol(finished)
    expectedActions = [
        ('fork', False),
        ('setuid', 0),
        ('setgid', 0),
        ('switchuid', 8080, 1234),
        'exec',
        ('exit', 1),
    ]
    try:
        reactor.spawnProcess(protocol, command, [b'ouch'], env=None,
                             usePTY=False, uid=8080)
    except SystemError:
        self.assertTrue(self.mockos.exited)
        self.assertEqual(self.mockos.actions, expectedActions)
    else:
        self.fail("Should not be here")
def drop_privileges(uid_name="nobody", gid_name="nogroup"):
    """Drop root privileges to the named user and group.

    Does nothing when the process is not running as root.

    :param uid_name: login name to switch to.
    :param gid_name: group name to switch to.
    """
    if os.getuid() != 0:
        # Already not root, take no action
        return

    # Get the uid/gid from the name
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask. ``0o077`` is accepted by Python 2.6+
    # and Python 3; the bare ``077`` form is a syntax error on Python 3.
    os.umask(0o077)
def drop_privileges_Arch(uid_name="nobody", gid_name="nobody"):
    """Drop root privileges to the named user and group.

    Same procedure as the generic variant, but the default group name is
    ``nobody``. Does nothing when the process is not running as root.

    :param uid_name: login name to switch to.
    :param gid_name: group name to switch to.
    """
    if os.getuid() != 0:
        # Already not root, take no action
        return

    # Get the uid/gid from the name
    running_uid = pwd.getpwnam(uid_name).pw_uid
    running_gid = grp.getgrnam(gid_name).gr_gid

    # Remove group privileges
    os.setgroups([])

    # Try setting the new uid/gid
    os.setgid(running_gid)
    os.setuid(running_uid)

    # Ensure a very conservative umask. ``0o077`` is accepted by Python 2.6+
    # and Python 3; the bare ``077`` form is a syntax error on Python 3.
    os.umask(0o077)
def run():
    """Start the server.

    Loads ``./server.conf``, starts listening (with TLS when an SSL key is
    configured), switches to the configured user, notifies ``sd`` when it is
    available, loads every configured language and finally runs the tornado
    IO loop.
    """
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    # ``thread_name_prefix`` only exists from Python 3.6 on. Compare the
    # (major, minor) pair: index 2 of version_info is the micro version and
    # says nothing about feature availability.
    if sys.version_info >= (3, 6):
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    # The identity switch happens after listen(), and the group is changed
    # before the user.
    if config.user:
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language,
                      config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
def drop_privileges(user):
    '''When running as root, switch the process to ``user`` and that user's
    main group. Ids that already match are left untouched.'''
    if os.getuid() != 0:
        return

    entry = pwd.getpwnam(user)
    target_uid = entry[2]
    target_gid = entry[3]

    # Group before user.
    if os.getgid() != target_gid:
        os.setgid(target_gid)
    if os.getuid() != target_uid:
        os.setuid(target_uid)
def _demo_posix():
    """Demonstrate ``Popen`` on POSIX with four small examples.

    Prints the process list, runs ``id`` under uid 100 when started as root,
    pipes ``dmesg`` into ``grep`` and finally tries to execute a path that
    does not exist to show the resulting ``OSError``.

    Every ``print`` call takes exactly one argument so the output is the
    same on Python 2 (print statement) and Python 3 (print function).
    """
    #
    # Example 1: Simple redirection: Get process list
    #
    plist = Popen(["ps"], stdout=PIPE).communicate()[0]
    print("Process list:")
    print(plist)

    #
    # Example 2: Change uid before executing child
    #
    if os.getuid() == 0:
        p = Popen(["id"], preexec_fn=lambda: os.setuid(100))
        p.wait()

    #
    # Example 3: Connecting several subprocesses
    #
    print("Looking for 'hda'...")
    p1 = Popen(["dmesg"], stdout=PIPE)
    p2 = Popen(["grep", "hda"], stdin=p1.stdout, stdout=PIPE)
    print(repr(p2.communicate()[0]))

    #
    # Example 4: Catch execution error
    #
    print("")
    print("Trying a weird file...")
    try:
        print(Popen(["/this/path/does/not/exist"]).communicate())
    except OSError as e:
        if e.errno == errno.ENOENT:
            print("The file didn't exist. I thought so...")
            # ``child_traceback`` is not guaranteed to be present on every
            # Python version, so only print it when it exists.
            child_traceback = getattr(e, "child_traceback", None)
            if child_traceback is not None:
                print("Child traceback:")
                print(child_traceback)
        else:
            print("Error %s" % e.errno)
    else:
        sys.stderr.write("Gosh. No error.\n")
def set_user(username):
    """Switch the process to ``username``.

    Does nothing when ``username`` is None or already the current user. An
    unknown user is logged and the ``KeyError`` re-raised. When not running
    as root an error is logged and the subsequent system calls are left to
    fail on their own.
    """
    if username is None:
        return

    import grp
    import pwd

    try:
        entry = pwd.getpwnam(username)
    except KeyError:
        logging.error('user not found: %s' % username)
        raise

    login, target_uid, target_gid = entry[0], entry[2], entry[3]

    current_uid = os.getuid()
    if target_uid == current_uid:
        return
    if current_uid != 0:
        logging.error('can not set user as nonroot user')
        # will raise later

    # inspired by supervisor
    if hasattr(os, 'setgroups'):
        # Primary group first, followed by every group listing the user.
        supplementary = [target_gid]
        supplementary.extend(
            record[2] for record in grp.getgrall() if login in record[3])
        os.setgroups(supplementary)
    os.setgid(target_gid)
    os.setuid(target_uid)
def _run_as_user(user, gid=None): try: user = pwd.getpwnam(user) except KeyError: log('Invalid user: %s' % user) raise Exception uid = user.pw_uid gid = gid or user.pw_gid os.environ['HOME'] = user.pw_dir def _inner(): os.setgid(gid) os.setuid(uid) return _inner
def become_user(name):
    '''
    Change the current process' effective UID to that of the given user
    name. Can only be called by super user 0. This function is only intended
    for use from the ``init`` process during system boot.

    :arg name: An OS user name. Must be found in the ``password`` database,
        or a replacement authentication system.
    :returns: The user's home directory, or ``None`` when the process is
        already running as that user.
    :raises Exception: if the caller is not root, or the switch is refused
        with ``EPERM``.
    :raises OSError: if the switch fails for any other reason.
    '''
    uid = ave.pwd.getpwnam_uid(name)
    gid = ave.pwd.getpwnam_gid(name)

    if os.geteuid() == uid:
        return
    if os.geteuid() != 0:
        raise Exception('only root can execute with modified privileges')

    try:
        os.setgid(gid)  # must be done before changing euid
        os.setuid(uid)
    except OSError as e:
        if e.errno == errno.EPERM:
            raise Exception(
                'could not execute with modified privileges: %s' % str(e)
            )
        # Any other error also means the identity was not changed. Do not
        # carry on as if the switch had succeeded.
        raise

    return ave.pwd.getpwnam_dir(name)
def _check_pid(self): # Lame fork detection to remind developers to invoke Random.atfork() # after every call to os.fork(). Note that this check is not reliable, # since process IDs can be reused on most operating systems. # # You need to do Random.atfork() in the child process after every call # to os.fork() to avoid reusing PRNG state. If you want to avoid # leaking PRNG state to child processes (for example, if you are using # os.setuid()) then you should also invoke Random.atfork() in the # *parent* process. if os.getpid() != self._pid: raise AssertionError("PID check failed. RNG must be re-initialized after fork(). Hint: Try Random.atfork()")
def initgroups(uid, primaryGid):
    """Initializes the group access list.

    This is done by reading the group database and using all groups of
    which C{uid} is a member. The additional group C{primaryGid} is also
    added to the list (when it is not None).

    If the given user is a member of more than C{NGROUPS}, arbitrary groups
    will be silently discarded to bring the number below that limit.

    @raise OSError: if the group list cannot be set, unless the failure is
        C{EPERM} and every group the process currently has is already in
        the computed list.
    """
    try:
        # Try to get the maximum number of groups
        max_groups = os.sysconf("SC_NGROUPS_MAX")
    except Exception:
        # No predefined limit
        max_groups = 0

    username = pwd.getpwuid(uid)[0]
    groups = []
    if primaryGid is not None:
        groups.append(primaryGid)
    for groupname, password, gid, userlist in grp.getgrall():
        if username in userlist:
            groups.append(gid)
            if len(groups) == max_groups:
                break  # No more groups, ignore any more

    try:
        _setgroups_until_success(groups)
    except OSError as e:
        # We might be able to remove this code now that we
        # don't try to setgid/setuid even when not asked to.
        if e.errno == errno.EPERM:
            # Tolerate the refusal only when the process holds no group
            # outside the list we wanted to set.
            for g in getgroups():
                if g not in groups:
                    raise
        else:
            raise
def switchUID(uid, gid, euid=False):
    """Switch the process to ``uid`` and ``gid``.

    With ``euid`` true only the effective ids are changed
    (``seteuid``/``setegid``); otherwise ``setuid``/``setgid`` are used.
    ``None`` for either id leaves that id alone. When a uid is given, the
    group access list is initialised before the uid is changed.
    """
    # Conditional expressions keep the lookup lazy: only the pair that is
    # actually needed is read from ``os``.
    change_gid = os.setegid if euid else os.setgid
    change_uid = os.seteuid if euid else os.setuid

    if gid is not None:
        change_gid(gid)
    if uid is not None:
        initgroups(uid, gid)
        change_uid(uid)
def contain(command, image_name, image_dir, container_id, container_dir,
            cpu_shares, memory, memory_swap, user):
    """Set up the container environment and exec ``command`` inside it.

    Steps, in order: configure the CPU and memory cgroups for
    ``container_id``, set the hostname, remount ``/`` with
    ``MS_PRIVATE | MS_REC``, create the container root filesystem and its
    mounts, pivot into the new root, detach and remove the old root, and
    finally replace the current process with ``command``.

    ``command`` is an argv-style sequence; ``command[0]`` is the program.
    ``user`` is accepted but not used yet (see the TODO below).
    This function does not return on success because ``os.execvp`` replaces
    the process image.
    """
    _setup_cpu_cgroup(container_id, cpu_shares)
    _setup_memory_cgroup(container_id, memory, memory_swap)

    linux.sethostname(container_id)  # change hostname to container_id

    # NOTE(review): presumably this keeps the mount changes below from
    # propagating outside the container's mount namespace; confirm.
    linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None)

    new_root = create_container_root(
        image_name, image_dir, container_id, container_dir)
    print('Created a new root fs for our container: {}'.format(new_root))

    _create_mounts(new_root)

    # pivot_root needs a directory under the new root to park the old root.
    old_root = os.path.join(new_root, 'old_root')
    os.makedirs(old_root)
    linux.pivot_root(new_root, old_root)

    os.chdir('/')

    linux.umount2('/old_root', linux.MNT_DETACH)  # umount old root
    os.rmdir('/old_root')  # rmdir the old_root dir

    # TODO: if user is set, drop privileges using os.setuid()
    #       (and optionally os.setgid()).

    os.execvp(command[0], command)
def daemonize(keepfd=None, chdir='/'):
    """Turn the current process into a daemon.

    Resets the umask, changes directory (to ``/`` when ``chdir`` is empty),
    double-forks with a new session in between, ignores SIGTERM and SIGINT,
    closes every file descriptor except ``keepfd`` and points stdin, stdout
    and stderr at the null device.
    """
    os.umask(0)
    os.chdir(chdir or '/')
    os.setgid(os.getgid())  # relinquish elevations
    os.setuid(os.getuid())  # relinquish elevations

    # Double fork to daemonize
    if os.fork() > 0:
        os._exit(0)  # Parent exits
    os.setsid()  # Obtain new process group
    if os.fork() > 0:
        os._exit(0)  # Parent exits

    # Signal handling
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, signal.SIG_IGN)

    # Close open files
    fd_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
    if fd_limit == resource.RLIM_INFINITY:
        fd_limit = 256
    for fd in range(fd_limit - 1, -1, -1):
        if fd == keepfd:
            continue
        try:
            os.close(fd)
        except OSError:
            _, exc, _ = sys.exc_info()
            # Descriptors that were never open are expected; anything else
            # is a real error.
            if exc.errno != errno.EBADF:
                raise

    # Redirect I/O to /dev/null
    for stream in (sys.stdin, sys.stdout, sys.stderr):
        os.dup2(os.open(os.devnull, os.O_RDWR), stream.fileno())
def preexec_fn(user_uid, user_gid, user_home):
    """Return a callable that switches to the given gid and uid and sets
    ``HOME`` to ``user_home``. Nothing is changed until the returned
    callable runs."""
    def _become_user():
        # Group before user.
        os.setgid(user_gid)
        os.setuid(user_uid)
        os.environ["HOME"] = user_home

    return _become_user
def __init__(self, request, client_address, server):
    """Drop to the configured group and user, then run the base handler.

    The identity is only changed when the process currently runs with
    gid/uid 0 and the corresponding id is configured.
    """
    config = pysshrp.common.config

    # Change user and group (only when run as root). The group goes first,
    # and each call receives its matching id: setgid gets the group id,
    # setuid gets the user id.
    if (os.getgid() == 0) and config.groupId:
        os.setgid(config.groupId)
    if (os.getuid() == 0) and config.userId:
        os.setuid(config.userId)

    SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def set_owner_process(uid, gid):
    """Set the user and group of worker processes.

    A falsy ``gid`` or ``uid`` leaves the corresponding id unchanged. The
    group is changed before the user.
    """
    if gid:
        # versions of python < 2.6.2 don't manage unsigned int for
        # groups like on osx or fedora
        os.setgid(abs(gid) & 0x7FFFFFFF)

    if not uid:
        return
    os.setuid(uid)
def demote(self):
    """Demote the root user to the configured user and/or group.

    When running as root the supplementary groups are dropped, then the
    group (``self.the_grp``) and the user (``self.user``) are applied if
    configured. When not running as root and a user or group was requested,
    the request cannot be honoured. Every failure is logged as critical and
    ends the process with exit status 1.
    """
    try:
        if os.getuid() == 0:
            # drop supplementary groups
            os.setgroups([])

            if self.group:
                try:
                    os.setgid(self.the_grp.gr_gid)
                except Exception as ex:
                    logging.critical("failed to set group to \"%s\" [%s]" % (self.group, str(ex)))
                    sys.exit(1)

            if self.user:
                try:
                    # Use the entry looked up here; the previous code read
                    # an instance attribute instead and ignored this lookup.
                    the_pwd = pwd.getpwnam(self.user)
                    os.setuid(the_pwd.pw_uid)
                except Exception as ex:
                    logging.critical("failed to set user to \"%s\" [%s]" % (self.user, str(ex)))
                    sys.exit(1)
        elif self.user or self.group:
            logging.critical('not privileged ~~ cannot change to user [%s] / group [%s]' % (self.user, self.group))
            sys.exit(1)
    except Exception as ex:
        logging.critical("daemon.demote() caught exception [%s]" % str(ex))
        sys.exit(1)
def config_datastore():
    """Create the DataStore directory if needed and run the DataStore
    configuration script as the target user.

    Reads the module-level ``username``, ``passwd`` and ``fqdn`` values.
    Commands are run through ``sudo -u`` with an explicit argument list, so
    a user name or password containing spaces or shell metacharacters is
    passed through unchanged instead of being interpreted by a shell. Exit
    codes of the commands are not checked.
    """
    import subprocess

    separator = u'---------------------------------------------------------------------------------------------'
    datastore_dir = "/home/" + username + "/arcgis/datastore/usr/arcgisdatastore"

    print(separator)
    print(u' ????DataStore')
    print(separator)

    if not os.path.isdir(datastore_dir):
        # The directory is created as the target user through sudo;
        # presumably so that this process keeps its own uid.
        subprocess.call(['sudo', '-u', username, 'mkdir', '-p', datastore_dir])

    # NOTE(review): the configure step is run whether or not the directory
    # already existed; confirm this matches the intended flow.
    subprocess.call([
        'sudo', '-u', username,
        'bash', '/home/' + username + '/arcgis/datastore/tools/configuredatastore.sh',
        'https://' + fqdn + ':6443/arcgis/admin',
        username,
        passwd,
        datastore_dir,
        '--stores', 'relational,tileCache',
    ])

    print(separator)
    print(u' DataStore????')
    print(separator)
def signal_handler(signal, frame):
    """Signal handler that stops the script.

    Switches to uid 0, sends signal 9 to every pid recorded in the
    module-level ``data`` mapping, writes ``0`` to the ``.proc`` file,
    starts ``screen -wipe`` and finally kills the current process.

    :param signal: signal number passed by the ``signal`` module (unused).
    :param frame: current stack frame passed by the ``signal`` module
        (unused).
    """
    print('stopping script')
    os.setuid(0)

    for name in data:
        try:
            os.kill(int(data[name]['pid']), 9)
            print('killing ' + name)
        except (OSError, KeyError, TypeError, ValueError):
            # Best effort: the entry has no usable pid or the process is
            # already gone. Keep going with the remaining entries.
            pass

    print("Byebye...")

    # The context manager closes the file even if the write fails.
    with open('.proc', 'w+') as proc_file:
        proc_file.write('0')

    subprocess.Popen(["screen", "-wipe"], stdout=subprocess.PIPE)
    os.kill(os.getpid(), 9)