我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用os.setgid()。
def _dropPrivileges(self, user, group): import pwd, grp # Get the uid/gid from the name runningUid = pwd.getpwnam(user).pw_uid runningGid = grp.getgrnam(group).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(runningGid) os.setuid(runningUid) # Reset logging self.resetLogging()
def drop_privileges(self, uid_name, gid_name): if os.getuid() != 0: # We're not root so, like, whatever dude self.logger.info("Not running as root. Cannot drop permissions.") return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(0o077) self.logger.info("Changed permissions to: %s: %i, %s, %i"%(uid_name, running_uid, gid_name, running_gid))
def change_process_owner(uid, gid): """ Change the owning UID and GID of this process. :param uid: The target UID for the daemon process. :param gid: The target GID for the daemon process. :return: ``None``. Set the GID then the UID of the process (in that order, to avoid permission errors) to the specified `gid` and `uid` values. Requires appropriate OS privileges for this process. """ try: os.setgid(gid) os.setuid(uid) except Exception as exc: error = DaemonOSEnvironmentError( "Unable to change process owner ({exc})".format(exc=exc)) raise error
def sudo(user): raise NotImplemented """ Run your function as the given user Please note that this *permanently* changes user, you won't be able to change back unless you have sudo privileges. Best used inside @background. """ user = pwd.getpwnam(user) print(user) def decorator(func): def func_wrapper(*args,**kwargs): os.setuid(user.pw_uid) os.setgid(user.pw_gid) p = func(*args,**kwargs) return p return func_wrapper return decorator
def set_owner_process(uid, gid, initgroups=False): """ set user and group of workers processes """ if gid: if uid: try: username = get_username(uid) except KeyError: initgroups = False # versions of python < 2.6.2 don't manage unsigned int for # groups like on osx or fedora gid = abs(gid) & 0x7FFFFFFF if initgroups: os.initgroups(username, gid) else: os.setgid(gid) if uid: os.setuid(uid)
def _setgroup(group): ''' Normalizes group to a gid and sets the current gid, or does nothing if group is None. ''' if group is None: return # Normalize group to gid elif isinstance(group, str): gid = grp.getgrnam(group).gr_gid # The group is already a gid. else: gid = group try: os.setgid(gid) except OSError: self.logger.error('Unable to change group.') sys.exit(1)
def setugid(user): """Change process user and group ID Argument is a numeric user id or a user name""" try: from pwd import getpwuid passwd = getpwuid(int(user)) except ValueError: from pwd import getpwnam passwd = getpwnam(user) if hasattr(os, 'initgroups'): # python >= 2.7 os.initgroups(passwd.pw_name, passwd.pw_gid) else: import ctypes if ctypes.CDLL(None).initgroups(passwd.pw_name, passwd.pw_gid) < 0: err = ctypes.c_int.in_dll(ctypes.pythonapi,"errno").value raise OSError(err, os.strerror(err), 'initgroups') os.setgid(passwd.pw_gid) os.setuid(passwd.pw_uid) os.environ['HOME'] = passwd.pw_dir
def drop_privileges(uid_name='nobody', gid_name='nogroup'): if os.getuid() != 0: # We're not root so, like, whatever dude return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(077)
def drop_privileges(uid_name='nobody', gid_name='nobody'): import os, pwd, grp if os.getuid() != 0: # We're not root so, like, whatever dude return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(0o77)
def _execChild(self, path, uid, gid, executable, args, environment): """ The exec() which is done in the forked child. """ if path: os.chdir(path) if uid is not None or gid is not None: if uid is None: uid = os.geteuid() if gid is None: gid = os.getegid() # set the UID before I actually exec the process os.setuid(0) os.setgid(0) switchUID(uid, gid) os.execvpe(executable, args, environment)
def test_mockSetUid(self): """ Try creating a process with setting its uid: it's almost the same path as the standard path, but with a C{switchUID} call before the exec. """ cmd = b'/mock/ouch' d = defer.Deferred() p = TrivialProcessProtocol(d) try: reactor.spawnProcess(p, cmd, [b'ouch'], env=None, usePTY=False, uid=8080) except SystemError: self.assertTrue(self.mockos.exited) self.assertEqual( self.mockos.actions, [('fork', False), ('setuid', 0), ('setgid', 0), ('switchuid', 8080, 1234), 'exec', ('exit', 1)]) else: self.fail("Should not be here")
def drop_privileges(uid_name="nobody", gid_name="nogroup"): if os.getuid() != 0: # Already not root, take no action return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(077)
def drop_privileges_Arch(uid_name="nobody", gid_name="nobody"): if os.getuid() != 0: # Already not root, take no action return # Get the uid/gid from the name running_uid = pwd.getpwnam(uid_name).pw_uid running_gid = grp.getgrnam(gid_name).gr_gid # Remove group privileges os.setgroups([]) # Try setting the new uid/gid os.setgid(running_gid) os.setuid(running_uid) # Ensure a very conservative umask old_umask = os.umask(077)
def run(): np.random.seed(42) config = ServerConfig.load(('./server.conf',)) if sys.version_info[2] >= 6: thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-') else: thread_pool = ThreadPoolExecutor(max_workers=32) app = Application(config, thread_pool) if config.ssl_key: ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key) app.listen(config.port, ssl_options=ssl_ctx) else: app.listen(config.port) if config.user: os.setgid(grp.getgrnam(config.user)[2]) os.setuid(pwd.getpwnam(config.user)[2]) if sd: sd.notify('READY=1') tokenizer_service = TokenizerService() tokenizer_service.run() for language in config.languages: load_language(app, tokenizer_service, language, config.get_model_directory(language)) sys.stdout.flush() tornado.ioloop.IOLoop.current().start()
def drop_privileges(user): '''If running as root, drop process privileges to the given user and user's main group.''' if os.getuid() == 0: pwnam = pwd.getpwnam(user) running_uid, running_gid = (pwnam[2], pwnam[3]) if running_gid != os.getgid(): os.setgid(running_gid) if running_uid != os.getuid(): os.setuid(running_uid)
def set_user(username): if username is None: return import pwd import grp try: pwrec = pwd.getpwnam(username) except KeyError: logging.error('user not found: %s' % username) raise user = pwrec[0] uid = pwrec[2] gid = pwrec[3] cur_uid = os.getuid() if uid == cur_uid: return if cur_uid != 0: logging.error('can not set user as nonroot user') # will raise later # inspired by supervisor if hasattr(os, 'setgroups'): groups = [grprec[2] for grprec in grp.getgrall() if user in grprec[3]] groups.insert(0, gid) os.setgroups(groups) os.setgid(gid) os.setuid(uid)
def _run_as_user(user, gid=None): try: user = pwd.getpwnam(user) except KeyError: log('Invalid user: %s' % user) raise Exception uid = user.pw_uid gid = gid or user.pw_gid os.environ['HOME'] = user.pw_dir def _inner(): os.setgid(gid) os.setuid(uid) return _inner
def become_user(name): ''' Change the current process' effective UID to that of the given user name. Can only be called by super user 0. This function is only intended for use from the ``init`` process during system boot. :arg name: An OS user name. Must be found in the ``password`` database, or a replacement authentication system. :returns: The user's home directory. ''' uid = ave.pwd.getpwnam_uid(name) gid = ave.pwd.getpwnam_gid(name) if os.geteuid() == uid: return if os.geteuid() != 0: raise Exception('only root can execute with modified privileges') try: os.setgid(gid) # must be done before changing euid os.setuid(uid) except OSError, e: if e.errno == errno.EPERM: raise Exception( 'could not execute with modified privileges: %s' % str(e) ) return ave.pwd.getpwnam_dir(name)
def initgroups(uid, primaryGid): """Initializes the group access list. This is done by reading the group database /etc/group and using all groups of which C{uid} is a member. The additional group C{primaryGid} is also added to the list. If the given user is a member of more than C{NGROUPS}, arbitrary groups will be silently discarded to bring the number below that limit. """ try: # Try to get the maximum number of groups max_groups = os.sysconf("SC_NGROUPS_MAX") except: # No predefined limit max_groups = 0 username = pwd.getpwuid(uid)[0] l = [] if primaryGid is not None: l.append(primaryGid) for groupname, password, gid, userlist in grp.getgrall(): if username in userlist: l.append(gid) if len(l) == max_groups: break # No more groups, ignore any more try: _setgroups_until_success(l) except OSError, e: # We might be able to remove this code now that we # don't try to setgid/setuid even when not asked to. if e.errno == errno.EPERM: for g in getgroups(): if g not in l: raise else: raise
def switchUID(uid, gid, euid=False): if euid: setuid = os.seteuid setgid = os.setegid else: setuid = os.setuid setgid = os.setgid if gid is not None: setgid(gid) if uid is not None: initgroups(uid, gid) setuid(uid)
def contain(command, image_name, image_dir, container_id, container_dir, cpu_shares, memory, memory_swap, user): _setup_cpu_cgroup(container_id, cpu_shares) _setup_memory_cgroup(container_id, memory, memory_swap) linux.sethostname(container_id) # change hostname to container_id linux.mount(None, '/', None, linux.MS_PRIVATE | linux.MS_REC, None) new_root = create_container_root( image_name, image_dir, container_id, container_dir) print('Created a new root fs for our container: {}'.format(new_root)) _create_mounts(new_root) old_root = os.path.join(new_root, 'old_root') os.makedirs(old_root) linux.pivot_root(new_root, old_root) os.chdir('/') linux.umount2('/old_root', linux.MNT_DETACH) # umount old root os.rmdir('/old_root') # rmdir the old_root dir # TODO: if user is set, drop privileges using os.setuid() # (and optionally os.setgid()). os.execvp(command[0], command)
def daemonize(keepfd=None, chdir='/'): os.umask(0) if chdir: os.chdir(chdir) else: os.chdir('/') os.setgid(os.getgid()) # relinquish elevations os.setuid(os.getuid()) # relinquish elevations # Double fork to daemonize if os.fork() > 0: os._exit(0) # Parent exits os.setsid() # Obtain new process group if os.fork() > 0: os._exit(0) # Parent exits # Signal handling signal.signal(signal.SIGTERM, signal.SIG_IGN) signal.signal(signal.SIGINT, signal.SIG_IGN) # Close open files maxfd = resource.getrlimit(resource.RLIMIT_NOFILE)[1] if maxfd == resource.RLIM_INFINITY: maxfd = 256 for fd in reversed(range(maxfd)): try: if fd != keepfd: os.close(fd) except OSError: _, exc, _ = sys.exc_info() if exc.errno != errno.EBADF: raise # Redirect I/O to /dev/null os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdin.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stdout.fileno()) os.dup2(os.open(os.devnull, os.O_RDWR), sys.stderr.fileno())
def preexec_fn(user_uid, user_gid, user_home): def result(): os.setgid(user_gid) os.setuid(user_uid) os.environ["HOME"] = user_home return result
def __init__(self, request, client_address, server): # Change user and group (only when runned as root) if (os.getgid() == 0) and pysshrp.common.config.userId: os.setgid(pysshrp.common.config.userId) if (os.getuid() == 0) and pysshrp.common.config.groupId: os.setuid(pysshrp.common.config.groupId) SocketServer.BaseRequestHandler.__init__(self, request, client_address, server)
def set_owner_process(uid, gid): """ set user and group of workers processes """ if gid: # versions of python < 2.6.2 don't manage unsigned int for # groups like on osx or fedora gid = abs(gid) & 0x7FFFFFFF os.setgid(gid) if uid: os.setuid(uid)
def demote(self): # demote root user to any specified user or group try: if os.getuid() == 0: # drop supplementary groups os.setgroups([]) if self.group: try: os.setgid(self.the_grp.gr_gid) except Exception, ex: logging.critical("failed to set group to \"%s\" [%s]" % (self.group, str(ex))) sys.exit(1) if self.user: try: the_pwd = pwd.getpwnam(self.user) os.setuid(self.the_pwd.pw_uid) except Exception, ex: logging.critical("failed to set user to \"%s\" [%s]" % (self.user, str(ex))) sys.exit(1) else: if self.user or self.group: logging.critical('not privileged ~~ cannot change to user [%s] / group [%s]' % (self.user, self.group)) sys.exit(1) except Exception, ex: logging.critical("daemon.demote() caught exception [%s]" % str(ex)) sys.exit(1)
def test_setgid(self): if os.getuid() != 0: self.assertRaises(os.error, os.setgid, 0) self.assertRaises(OverflowError, os.setgid, 1<<32)
def run_package_reporter(self): """ Run the L{PackageReporter} if there were successfully completed tasks. """ if self.handled_tasks_count == 0: # Nothing was done return if os.getuid() == 0: os.setgid(grp.getgrnam("landscape").gr_gid) os.setuid(pwd.getpwnam("landscape").pw_uid) command = find_reporter_command(self._config) if self._config.config is not None: command += " -c %s" % self._config.config os.system(command)
def _set_permission(self): pw = getpwnam(self.username) uid = pw.pw_uid gid = pw.pw_gid os.setgroups([gid]) os.setgid(gid) os.setuid(uid)