我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用pwd.getpwuid()。
def getuser(): """Get the username from the environment or password database. First try various environment variables, then the password database. This works on Windows as long as USERNAME is set. """ import os for name in ('LOGNAME', 'USER', 'LNAME', 'USERNAME'): user = os.environ.get(name) if user: return user # If this fails, the exception will "explain" why import pwd return pwd.getpwuid(os.getuid())[0] # Bind the name getpass to the appropriate function
def check_environ (): """Ensure that 'os.environ' has all the environment variables we guarantee that users can use in config files, command-line options, etc. Currently this includes: HOME - user's home directory (Unix only) PLAT - description of the current platform, including hardware and OS (see 'get_platform()') """ global _environ_checked if _environ_checked: return if os.name == 'posix' and 'HOME' not in os.environ: import pwd os.environ['HOME'] = pwd.getpwuid(os.getuid())[5] if 'PLAT' not in os.environ: os.environ['PLAT'] = get_platform() _environ_checked = 1
def _find_grail_rc(self): import glob import pwd import socket import tempfile tempdir = os.path.join(tempfile.gettempdir(), ".grail-unix") user = pwd.getpwuid(os.getuid())[0] filename = os.path.join(tempdir, user + "-*") maybes = glob.glob(filename) if not maybes: return None s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) for fn in maybes: # need to PING each one until we find one that's live try: s.connect(fn) except socket.error: # no good; attempt to clean it out, but don't fail: try: os.unlink(fn) except IOError: pass else: return s
def test_make_archive_owner_group(self): # testing make_archive with owner and group, with various combinations # this works even if there's not gid/uid support if UID_GID_SUPPORT: group = grp.getgrgid(0)[0] owner = pwd.getpwuid(0)[0] else: group = owner = 'root' base_dir, root_dir, base_name = self._create_files() base_name = os.path.join(self.mkdtemp() , 'archive') res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner, group=group) self.assertTrue(os.path.exists(res)) res = make_archive(base_name, 'zip', root_dir, base_dir) self.assertTrue(os.path.exists(res)) res = make_archive(base_name, 'tar', root_dir, base_dir, owner=owner, group=group) self.assertTrue(os.path.exists(res)) res = make_archive(base_name, 'tar', root_dir, base_dir, owner='kjhkjhkjg', group='oihohoh') self.assertTrue(os.path.exists(res))
def test_tarfile_root_owner(self): tmpdir, tmpdir2, base_name = self._create_files() old_dir = os.getcwd() os.chdir(tmpdir) group = grp.getgrgid(0)[0] owner = pwd.getpwuid(0)[0] try: archive_name = make_tarball(base_name, 'dist', compress=None, owner=owner, group=group) finally: os.chdir(old_dir) # check if the compressed tarball was created self.assertTrue(os.path.exists(archive_name)) # now checks the rights archive = tarfile.open(archive_name) try: for member in archive.getmembers(): self.assertEqual(member.uid, 0) self.assertEqual(member.gid, 0) finally: archive.close()
def compat_expanduser(path): """Expand ~ and ~user constructions. If user or $HOME is unknown, do nothing.""" if not path.startswith('~'): return path i = path.find('/', 1) if i < 0: i = len(path) if i == 1: if 'HOME' not in os.environ: import pwd userhome = pwd.getpwuid(os.getuid()).pw_dir else: userhome = compat_getenv('HOME') else: import pwd try: pwent = pwd.getpwnam(path[1:i]) except KeyError: return path userhome = pwent.pw_dir userhome = userhome.rstrip('/') return (userhome + path[i:]) or '/'
def get_current_os_user(): """ Find the real user who runs the current process. Return a tuple of uid, username, homedir. :rtype: (int, str, str, int) """ user_name = os.getenv('SUDO_USER') if not user_name: user_name = os.getenv('USER') if user_name: pw = getpwnam(user_name) user_uid = pw.pw_uid else: # If cannot find the user, use ruid instead. user_uid = os.getresuid()[0] pw = getpwuid(user_uid) user_name = pw.pw_name user_gid = pw.pw_gid user_home = pw.pw_dir return user_uid, user_name, user_home, user_gid
def username(self): """The name of the user that owns the process. On UNIX this is calculated by using *real* process uid. """ if POSIX: if pwd is None: # might happen if python was installed from sources raise ImportError( "requires pwd module shipped with standard python") real_uid = self.uids().real try: return pwd.getpwuid(real_uid).pw_name except KeyError: # the uid can't be resolved by the system return str(real_uid) else: return self._proc.username()
def chown(self, cpioinfo, cpiogetpath): """Set owner of cpiogetpath according to cpioinfo. """ if PWD and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = GRP.getgrgid(cpioinfo.gid)[2] except KeyError: g = os.getgid() try: u = PWD.getpwuid(cpioinfo.uid)[2] except KeyError: u = os.getuid() try: if cpioinfo.issym() and hasattr(os, "lchown"): os.lchown(cpiogetpath, u, g) else: if sys.platform != "os2emx": os.chown(cpiogetpath, u, g) except EnvironmentError: raise ExtractError("could not change owner")
def update_job_statuses(db, jobs): # job_names_param = ",".join("gasp_" + x.id for x in jobs) p = subprocess.Popen(["/usr/cluster/bin/sacct", "-u", pwd.getpwuid(os.getuid())[0], "--format", "jobid,state,exitcode,jobname", "--noheader", "-P", "-S", (datetime.date.today() - datetime.timedelta(days=30)).strftime("%Y-%m-%d")], stdout=subprocess.PIPE, stderr=subprocess.PIPE) squeue_out, squeue_err = p.communicate() fake_data = """29646434 PENDING 0 29646435 COMPLETED 0 """ # only keep last record for jobs that were re-run slurm_jobs_found = dict() for line in squeue_out.rstrip().split("\n"): if line: slurm_job = line.strip().split("|") # strip off "gasp_" slurm_jobs_found[slurm_job[3][5:]] = slurm_job for slurm_job in slurm_jobs_found.values(): for j in jobs: if slurm_job[3][5:] == j.id: Tracker.update_job_status(db, j, slurm_job[1], slurm_job[2]) break
def get_queue(): cols = [["job_id", "%i"], ["job_name", "%j"], ["state", "%t"], ["time", "%M"], ["reason", "%R"]] col_names = [x[0] for x in cols] col_formats = [x[1] for x in cols] cmd = ["/usr/cluster/bin/squeue", "-u", pwd.getpwuid(os.getuid())[0], "-p", "encore", "-o", "|".join(col_formats), "--noheader"] p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) squeue_out, squeue_err = p.communicate() queue = {"running": [], "queued": []} for line in squeue_out.split("\n"): values = line.split("|") if len(values) != len(col_names): continue row = dict(zip(col_names, values)) row["job_name"] = row["job_name"][5:] if row["state"]=="R": queue["running"].append(row) elif row["state"] == "PD": queue["queued"].append(row) return queue
def test_tarfile_root_owner(self): tmpdir, tmpdir2, base_name = self._create_files() old_dir = os.getcwd() os.chdir(tmpdir) group = grp.getgrgid(0)[0] owner = pwd.getpwuid(0)[0] try: archive_name = _make_tarball(base_name, 'dist', compress=None, owner=owner, group=group) finally: os.chdir(old_dir) # check if the compressed tarball was created self.assertTrue(os.path.exists(archive_name)) # now checks the rights archive = tarfile.open(archive_name) try: for member in archive.getmembers(): self.assertEqual(member.uid, 0) self.assertEqual(member.gid, 0) finally: archive.close()
def test_initgroups(self): # It takes a string and an integer; check that it raises a TypeError # for other argument lists. self.assertRaises(TypeError, posix.initgroups) self.assertRaises(TypeError, posix.initgroups, None) self.assertRaises(TypeError, posix.initgroups, 3, "foo") self.assertRaises(TypeError, posix.initgroups, "foo", 3, object()) # If a non-privileged user invokes it, it should fail with OSError # EPERM. if os.getuid() != 0: name = pwd.getpwuid(posix.getuid()).pw_name try: posix.initgroups(name, 13) except OSError as e: self.assertEqual(e.errno, errno.EPERM) else: self.fail("Expected OSError to be raised by initgroups")
def test_add_graph(self): uid = os.getuid() info = pwd.getpwuid(uid) username = info.pw_name self.manager.dispatch_message( {"type": "custom-graph-add", "interpreter": "/bin/sh", "code": "echo hi!", "username": username, "graph-id": 123}) self.assertEqual( self.store.get_graphs(), [(123, os.path.join(self.data_path, "custom-graph-scripts", "graph-123"), username)])
def test_send_message_add_stored_graph(self): """ C{send_message} send the graph with no data, to notify the server of the existence of the script, even if the script hasn't been run yet. """ uid = os.getuid() info = pwd.getpwuid(uid) username = info.pw_name self.manager.dispatch_message( {"type": "custom-graph-add", "interpreter": "/bin/sh", "code": "echo hi!", "username": username, "graph-id": 123}) self.graph_manager.exchange() self.assertMessages( self.broker_service.message_store.get_pending_messages(), [{"api": b"3.2", "data": {123: {"error": u"", "script-hash": b"e00a2f44dbc7b6710ce32af2348aec9b", "values": []}}, "timestamp": 0, "type": "custom-graph"}])
def test_send_message_check_not_present_graph(self): """C{send_message} checks the presence of the custom-graph script.""" uid = os.getuid() info = pwd.getpwuid(uid) username = info.pw_name self.manager.dispatch_message( {"type": "custom-graph-add", "interpreter": "/bin/sh", "code": "echo hi!", "username": username, "graph-id": 123}) filename = self.store.get_graph(123)[1] os.unlink(filename) self.graph_manager.exchange() self.assertMessages( self.broker_service.message_store.get_pending_messages(), [{"api": b"3.2", "data": {}, "timestamp": 0, "type": "custom-graph"}])
def test_run_not_accepted_types(self): """ If "custom-graph" is not an accepted message-type anymore, C{CustomGraphPlugin.run} shouldn't even run the graph scripts. """ self.broker_service.message_store.set_accepted_types([]) uid = os.getuid() info = pwd.getpwuid(uid) username = info.pw_name self.manager.dispatch_message( {"type": "custom-graph-add", "interpreter": "/bin/sh", "code": "echo 1.0", "username": username, "graph-id": 123}) factory = StubProcessFactory() self.graph_manager.process_factory = factory result = self.graph_manager.run() self.assertEqual(len(factory.spawns), 0) return result.addCallback(self.assertIdentical, None)
def uid_exists(uid): """Check if a uid exists""" try: pwd.getpwuid(uid) uid_exists = True except KeyError: uid_exists = False return uid_exists
def owner(path): """Returns a tuple containing the username & groupname owning the path. :param str path: the string path to retrieve the ownership :return tuple(str, str): A (username, groupname) tuple containing the name of the user and group owning the path. :raises OSError: if the specified path does not exist """ stat = os.stat(path) username = pwd.getpwuid(stat.st_uid)[0] groupname = grp.getgrgid(stat.st_gid)[0] return username, groupname
def chown(self, tarinfo, targetpath): """Set owner of targetpath according to tarinfo. """ if pwd and hasattr(os, "geteuid") and os.geteuid() == 0: # We have to be root to do so. try: g = grp.getgrnam(tarinfo.gname)[2] except KeyError: try: g = grp.getgrgid(tarinfo.gid)[2] except KeyError: g = os.getgid() try: u = pwd.getpwnam(tarinfo.uname)[2] except KeyError: try: u = pwd.getpwuid(tarinfo.uid)[2] except KeyError: u = os.getuid() try: if tarinfo.issym() and hasattr(os, "lchown"): os.lchown(targetpath, u, g) else: if sys.platform != "os2emx": os.chown(targetpath, u, g) except EnvironmentError, e: raise ExtractError("could not change owner")
def expanduser(path): """Expand ~ and ~user constructions. If user or $HOME is unknown, do nothing.""" if not path.startswith('~'): return path i = path.find('/', 1) if i < 0: i = len(path) if i == 1: if 'HOME' not in os.environ: import pwd userhome = pwd.getpwuid(os.getuid()).pw_dir else: userhome = os.environ['HOME'] else: import pwd try: pwent = pwd.getpwnam(path[1:i]) except KeyError: return path userhome = pwent.pw_dir userhome = userhome.rstrip('/') or userhome return userhome + path[i:] # Expand paths containing shell variable substitutions. # This expands the forms $variable and ${variable} only. # Non-existent variables are left unchanged.
def does_uid_exist(uid): # type: (int) -> bool ''' Returns True if the given OS user id exists, False otherwise. ''' try: pwd.getpwuid(uid) return True except KeyError: return False
def __get_username(): """ Returns the effective username of the current process. """ if sys.platform == 'win32': return getpass.getuser() import pwd return pwd.getpwuid(os.geteuid()).pw_name
def _get_userdir(user=None): """Returns the user dir or None""" if user is not None and not isinstance(user, fsnative): raise TypeError if is_win: if "HOME" in environ: path = environ["HOME"] elif "USERPROFILE" in environ: path = environ["USERPROFILE"] elif "HOMEPATH" in environ and "HOMEDRIVE" in environ: path = os.path.join(environ["HOMEDRIVE"], environ["HOMEPATH"]) else: return if user is None: return path else: return os.path.join(os.path.dirname(path), user) else: import pwd if user is None: if "HOME" in environ: return environ["HOME"] else: try: return path2fsn(pwd.getpwuid(os.getuid()).pw_dir) except KeyError: return else: try: return path2fsn(pwd.getpwnam(user).pw_dir) except KeyError: return