我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pwd.getpwnam()。
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Make sure ``path`` exists, then apply ownership and mode to it.

    :param path: Directory to create; it is made absolute before use.
    :param owner: User name resolved to a uid through ``pwd``.
    :param group: Group name resolved to a gid through ``grp``.
    :param perms: Mode passed to ``os.makedirs`` and ``os.chmod``.
    :param force: When True and ``path`` exists but is not a directory,
        the existing entry is unlinked and replaced by a directory.
    :raises KeyError: if ``owner`` or ``group`` cannot be resolved.
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are applied to whatever now lives at the target,
    # including a pre-existing entry that was left untouched above.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def chown(self, tarinfo, targetpath):
    """Give ``targetpath`` the owner and group recorded in ``tarinfo``.

    Nothing happens unless the ``pwd`` module is available and the
    effective uid is 0. Names are resolved first; if a name is unknown,
    the numeric id stored in ``tarinfo`` is used instead.

    Raises ExtractError if the ownership change fails.
    """
    running_as_root = pwd and hasattr(os, "geteuid") and os.geteuid() == 0
    if not running_as_root:
        # Changing ownership requires root.
        return

    try:
        gid = grp.getgrnam(tarinfo.gname)[2]
    except KeyError:
        gid = tarinfo.gid
    try:
        uid = pwd.getpwnam(tarinfo.uname)[2]
    except KeyError:
        uid = tarinfo.uid

    try:
        if tarinfo.issym() and hasattr(os, "lchown"):
            # Change the link itself rather than what it points to.
            os.lchown(targetpath, uid, gid)
        elif sys.platform != "os2emx":
            os.chown(targetpath, uid, gid)
    except EnvironmentError:
        raise ExtractError("could not change owner")
def create_instance_user(problem_name, instance_number):
    """Ensure the system user for a problem instance exists.

    The username is derived from the sanitized problem name and the
    instance number. The user is created only when no passwd entry for
    that name exists yet.

    Args:
        problem_name: The name of the problem
        instance_number: The unique number for this instance

    Returns:
        A tuple ``(username, new)`` where ``new`` is True if the user was
        created by this call and False if it already existed.
    """
    username = get_username(sanitize_name(problem_name), instance_number)

    try:
        # A successful lookup means the account is already present.
        getpwnam(username)
    except KeyError:
        create_user(username)
        return username, True

    return username, False
def default():
    """Reset the .../stack/default directory, keeping ``available_roles``.

    The ``available_roles`` entry is read from the existing cluster.yml,
    the whole default tree is removed and recreated, and a new cluster.yml
    holding only that entry is written. The recreated paths are then
    chowned to the ``salt`` user and group.
    """
    stack_default = "/srv/pillar/ceph/stack/default"
    cluster_dir = "{}/{}".format(stack_default, 'ceph')
    pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph')

    # Keep yaml human readable/editable.
    # NOTE: this sets ignore_aliases on yaml.SafeDumper itself, not a copy.
    friendly_dumper = yaml.SafeDumper
    friendly_dumper.ignore_aliases = lambda self, data: True

    with open(pathname, "r") as sls_file:
        content = yaml.safe_load(sls_file)
    preserve = {'available_roles': content['available_roles']}

    shutil.rmtree(stack_default)
    os.makedirs(cluster_dir)
    with open(pathname, "w") as sls_file:
        sls_file.write(yaml.dump(preserve,
                                 Dumper=friendly_dumper,
                                 default_flow_style=False))

    uid = pwd.getpwnam("salt").pw_uid
    gid = grp.getgrnam("salt").gr_gid
    for path in (stack_default, cluster_dir, pathname):
        os.chown(path, uid, gid)
def getpwnam_gid(name):
    """Return the gid for ``name`` by running this file as a python2 program.

    The lookup is delegated to a child process started with the
    'pwd.getpwnam.gid' option; its stripped output is parsed as an int.
    The tuple returned by ``ave.cmd.run`` is presumably (exit status,
    output, error output) -- TODO confirm against ave.cmd.

    Raises Exception carrying the child's output on a non-zero status.
    """
    argv = [
        '/usr/bin/python2', os.path.realpath(__file__),
        'pwd.getpwnam.gid', name
    ]
    status, output, _ = ave.cmd.run(argv)
    output = output.strip()
    if status != 0:
        raise Exception(output)
    return int(output)

# A bug in winbindd or its client makes it close "inherited" file descriptors
# *before* it forks. It will in fact close one or more file descriptors which
# it does not own. Unfortunately, the Python pwd module may use this backend
# on systems that authenticate against ActiveDirectory. So, calling pwd
# functions must be done in a context where no file descriptors can be
# clobbered (i.e. none were opened before the call), e.g. in a new process
# (__main__, below).
# It is not entirely safe to communicate with the new process over e.g. a
# pipe, as winbindd could clobber *that* file descriptor. Printing on stdout
# appears to be safe, so just execute __file__ as a separate program with
# some options to steer the use of pwd and print the result.
def drop_privileges(self, uid_name, gid_name):
    """Switch the process to the given user and group when run as root.

    If the real uid is not 0, an info message is logged and nothing else
    happens. Otherwise the names are resolved, supplementary groups are
    cleared, the gid and then the uid are set, and the umask becomes 0o077.

    Raises KeyError if ``uid_name`` or ``gid_name`` cannot be resolved.
    """
    if os.getuid() != 0:
        # Only root is able to change identity.
        self.logger.info("Not running as root. Cannot drop permissions.")
        return

    # Resolve both names before changing anything.
    target_uid = pwd.getpwnam(uid_name).pw_uid
    target_gid = grp.getgrnam(gid_name).gr_gid

    # Clear supplementary groups, then set the gid before the uid.
    os.setgroups([])
    os.setgid(target_gid)
    os.setuid(target_uid)

    # Ensure a very conservative umask.
    os.umask(0o077)

    summary = "Changed permissions to: %s: %i, %s, %i" % (
        uid_name, target_uid, gid_name, target_gid)
    self.logger.info(summary)
def secure_user(user):
    """Lock down the shell files and home directory of ``user``.

    Runs a fixed sequence of touch/cp/chown/chmod/chattr commands on
    ``.bash_history``, ``.bashrc``, ``.profile`` and the home directory
    itself. Each command is run with ``subprocess.check_output``, so the
    first failing command raises and stops the sequence.

    Raises KeyError if ``user`` has no passwd entry.
    """
    home = pwd.getpwnam(user).pw_dir
    owner = 'root:' + user
    history = os.path.join(home, '.bash_history')
    bashrc = os.path.join(home, '.bashrc')
    profile = os.path.join(home, '.profile')

    steps = [
        # Append only bash history
        ['touch', history],
        ['chown', owner, history],
        ['chmod', '660', history],
        ['chattr', '+a', history],
        # Secure bashrc
        ['cp', '/opt/hacksports/config/securebashrc', bashrc],
        ['chown', owner, bashrc],
        ['chmod', '755', bashrc],
        ['chattr', '+a', bashrc],
        # Secure profile
        ['chown', owner, profile],
        ['chmod', '755', profile],
        ['chattr', '+a', profile],
        # User should not own their home directory
        ["chown", owner, home],
        ["chmod", "1770", home],
    ]
    for argv in steps:
        subprocess.check_output(argv)
def __init__(self, username):
    """Set up the user's passwd data, group list and lookup tables.

    ``otherGroups`` starts with field 3 of the passwd entry (the gid),
    followed by the gid of every group that lists ``username`` as a member.

    Raises KeyError if ``username`` has no passwd entry.
    """
    ConchUser.__init__(self)
    self.username = username
    self.pwdData = pwd.getpwnam(self.username)

    member_gids = [
        gid
        for _groupname, _password, gid, members in grp.getgrall()
        if username in members
    ]
    self.otherGroups = [self.pwdData[3]] + member_gids

    # dict mapping (interface, port) -> listener
    self.listeners = {}

    self.channelLookup.update({
        "session": session.SSHSession,
        "direct-tcpip": forwarding.openConnectForwardingClient,
    })
    self.subsystemLookup.update({
        "sftp": filetransfer.FileTransferServer,
    })
def compat_expanduser(path):
    """Expand a leading ``~`` or ``~user`` in ``path``.

    A bare ``~`` uses $HOME when it is set, otherwise the passwd entry of
    the current uid. ``~user`` uses that user's passwd entry; if the user
    is unknown, ``path`` is returned unchanged. Paths that do not start
    with ``~`` are returned as they are.
    """
    if not path.startswith('~'):
        return path

    # End of the "~" or "~user" prefix: first slash, or the whole string.
    end = path.find('/', 1)
    if end < 0:
        end = len(path)

    if end != 1:
        # "~user" form
        import pwd
        try:
            entry = pwd.getpwnam(path[1:end])
        except KeyError:
            return path
        userhome = entry.pw_dir
    elif 'HOME' in os.environ:
        userhome = compat_getenv('HOME')
    else:
        import pwd
        userhome = pwd.getpwuid(os.getuid()).pw_dir

    expanded = userhome.rstrip('/') + path[end:]
    return expanded or '/'
def get_current_os_user():
    """
    Find the real user who runs the current process.

    The name is taken from $SUDO_USER, or from $USER when that is unset or
    empty. If neither is set, or the name has no passwd entry, the passwd
    entry of the process's real uid is used instead.

    Return a tuple of uid, username, homedir, gid.

    :rtype: (int, str, str, int)
    :raises KeyError: if the real uid has no passwd entry either.
    """
    user_name = os.getenv('SUDO_USER') or os.getenv('USER')

    pw = None
    if user_name:
        try:
            pw = getpwnam(user_name)
        except KeyError:
            # The environment names a user the passwd database does not
            # know; fall through to the ruid lookup.
            pw = None

    if pw is None:
        # If cannot find the user, use ruid instead.
        pw = getpwuid(os.getresuid()[0])
        user_name = pw.pw_name

    return pw.pw_uid, user_name, pw.pw_dir, pw.pw_gid
def __init__(self):
    """Build the Ansible module and look up the target user's home.

    Declares one required string argument, ``user``. On success
    ``self.userInfo`` holds the user's home directory; if the user has no
    passwd entry, the module fails through ``fail_json``.
    """
    argument_spec = dict(
        user=dict(required=True, type='str')
    )
    self.module = AnsibleModule(
        argument_spec=argument_spec,
        supports_check_mode=False
    )

    self.args = {'user': self.module.params.get('user')}

    try:
        home = pwd.getpwnam(self.args['user']).pw_dir
    except KeyError:
        self.module.fail_json(
            msg='user (%s) not found' % (self.args['user'])
        )
    else:
        self.userInfo = {'home': home}
def initialize_ssh_keys(user='root'):
    """Make sure ``user`` has an ``~/.ssh`` directory with an RSA key pair.

    Creates ``.ssh`` if missing, generates ``id_rsa`` with ``ssh-keygen``
    if missing, and derives ``id_rsa.pub`` from the private key if
    missing. Finally the whole ``.ssh`` tree is chowned to ``user``.

    Raises KeyError if ``user`` has no passwd entry.
    """
    ssh_dir = os.path.join(pwd.getpwnam(user).pw_dir, '.ssh')
    if not os.path.isdir(ssh_dir):
        os.mkdir(ssh_dir)

    priv_key = os.path.join(ssh_dir, 'id_rsa')
    if not os.path.isfile(priv_key):
        log('Generating new ssh key for user %s.' % user)
        # Quiet, empty passphrase, 2048-bit RSA.
        check_output(['ssh-keygen', '-q', '-N', '', '-t', 'rsa',
                      '-b', '2048', '-f', priv_key])

    pub_key = '%s.pub' % priv_key
    if not os.path.isfile(pub_key):
        log('Generating missing ssh public key @ %s.' % pub_key)
        # "-y" prints the public key that matches the private key file.
        raw = check_output(['ssh-keygen', '-y', '-f', priv_key])
        public_text = raw.decode('UTF-8').strip()
        with open(pub_key, 'wt') as out:
            out.write(public_text)

    check_output(['chown', '-R', user, ssh_dir])
def execute_cmd(cmd, user=None):
    """Start ``cmd`` in a shell, wired to this process's stdio.

    Commands that begin with an OpenStack client prefix get the
    openstackclient pre-exec hook. Any other command gets a pre-exec hook
    built from ``user``'s uid, gid and home directory when ``user`` is
    given. With neither, no hook is installed.

    Returns the ``subprocess.Popen`` object for the started command.
    """
    LOG.debug("Executing cmd:\n%s", cmd_str(cmd))
    popen_kwargs = {
        "shell": True,
        "stdin": sys.stdin,
        "stdout": sys.stdout,
        "stderr": sys.stderr}

    client_prefixes = ["openstack ", "neutron ", "murano "]
    if any(cmd.startswith(prefix) for prefix in client_prefixes):
        # If openstackclient command is being executed, appropriate
        # environment variables will be set
        popen_kwargs['preexec_fn'] = openstackclient_preexec_fn()
    elif user:
        # Execute as user if `user` param is provided, execute as current
        # user otherwise
        LOG.debug('Executing as user %s', user)
        pw_record = pwd.getpwnam(user)
        popen_kwargs['preexec_fn'] = preexec_fn(
            pw_record.pw_uid, pw_record.pw_gid, pw_record.pw_dir)

    return subprocess.Popen(cmd_str(cmd), **popen_kwargs)
def user_exists(username):
    """Return True if ``username`` has a passwd entry, else False."""
    try:
        pwd.getpwnam(username)
    except KeyError:
        return False
    return True
def write_file(path, content, owner='root', group='root', perms=0o444):
    """Write the byte string ``content`` to ``path``, replacing any old file.

    Ownership and mode are set on the open file descriptor before the
    content is written.

    Raises KeyError if ``owner`` or ``group`` cannot be resolved.
    """
    log("Writing file {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid

    with open(path, 'wb') as target:
        descriptor = target.fileno()
        os.fchown(descriptor, uid, gid)
        os.fchmod(descriptor, perms)
        target.write(content)
def chownr(path, owner, group, follow_links=True, chowntopdir=False):
    """Change ownership of everything below ``path``.

    ``path`` itself is changed only when ``chowntopdir`` is True. Broken
    symlinks (the link exists but its target does not) are skipped.

    :param str path: The string path to start changing ownership.
    :param str owner: The owner string to use when looking up the uid.
    :param str group: The group string to use when looking up the gid.
    :param bool follow_links: Use ``os.chown`` if True, ``os.lchown`` if not.
    :param bool chowntopdir: Also chown path itself if True
    :raises KeyError: if ``owner`` or ``group`` cannot be resolved.
    """
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    chown = os.chown if follow_links else os.lchown

    def _chown_unless_broken(target):
        # Skip symlinks whose target does not exist.
        is_broken = os.path.lexists(target) and not os.path.exists(target)
        if not is_broken:
            chown(target, uid, gid)

    if chowntopdir:
        _chown_unless_broken(path)

    for root, dirs, files in os.walk(path):
        for name in dirs + files:
            _chown_unless_broken(os.path.join(root, name))
def run():
    """Start the server: configure, listen, drop privileges, load models.

    Reads ``./server.conf``, creates the query thread pool and the
    application, and listens on the configured port (with TLS when an SSL
    key is configured). If a user is configured, the process switches to
    that user and the group of the same name. It then signals readiness
    to ``sd`` when that is available, starts the tokenizer service, loads
    every configured language and enters the Tornado IO loop.
    """
    np.random.seed(42)
    config = ServerConfig.load(('./server.conf',))

    # ThreadPoolExecutor accepts thread_name_prefix from Python 3.6 on.
    # Compare (major, minor); index 2 of version_info is the micro version.
    if sys.version_info >= (3, 6):
        thread_pool = ThreadPoolExecutor(thread_name_prefix='query-thread-')
    else:
        thread_pool = ThreadPoolExecutor(max_workers=32)
    app = Application(config, thread_pool)

    if config.ssl_key:
        ssl_ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
        ssl_ctx.load_cert_chain(config.ssl_chain, config.ssl_key)
        app.listen(config.port, ssl_options=ssl_ctx)
    else:
        app.listen(config.port)

    if config.user:
        # Set the gid first; after setuid the process may no longer be
        # allowed to change its group.
        os.setgid(grp.getgrnam(config.user)[2])
        os.setuid(pwd.getpwnam(config.user)[2])

    if sd:
        sd.notify('READY=1')

    tokenizer_service = TokenizerService()
    tokenizer_service.run()

    for language in config.languages:
        load_language(app, tokenizer_service, language,
                      config.get_model_directory(language))

    sys.stdout.flush()
    tornado.ioloop.IOLoop.current().start()
def _get_uid(name): """Returns an uid, given a user name.""" if getpwnam is None or name is None: return None try: result = getpwnam(name) except KeyError: result = None if result is not None: return result[2] return None
def chown(self, tarinfo, targetpath, numeric_owner):
    """Give ``targetpath`` the owner and group recorded in ``tarinfo``.

    Nothing happens unless the effective uid is 0. With ``numeric_owner``
    True the stored .uid/.gid are used directly. With it False the stored
    .uname/.gname are resolved first, and the numeric ids are kept when a
    name is unknown or the ``pwd``/``grp`` module is unavailable.

    Raises ExtractError if the ownership change fails.
    """
    is_root = hasattr(os, "geteuid") and os.geteuid() == 0
    if not is_root:
        # Changing ownership requires root.
        return

    gid = tarinfo.gid
    uid = tarinfo.uid
    if not numeric_owner:
        if grp:
            try:
                gid = grp.getgrnam(tarinfo.gname)[2]
            except KeyError:
                pass
        if pwd:
            try:
                uid = pwd.getpwnam(tarinfo.uname)[2]
            except KeyError:
                pass

    try:
        # Change a symlink itself, where the platform supports it.
        if tarinfo.issym() and hasattr(os, "lchown"):
            change_owner = os.lchown
        else:
            change_owner = os.chown
        change_owner(targetpath, uid, gid)
    except OSError:
        raise ExtractError("could not change owner")