我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用getpass.getuser()。
def __init__(self, additional_compose_file=None, additional_services=None): # To resolve docker client server version mismatch issue. os.environ["COMPOSE_API_VERSION"] = "auto" dir_name = os.path.split(os.getcwd())[-1] self.project = "{}{}".format( re.sub(r'[^a-z0-9]', '', dir_name.lower()), getpass.getuser() ) self.additional_compose_file = additional_compose_file self.services = ["zookeeper", "schematizer", "kafka"] if additional_services is not None: self.services.extend(additional_services) # This variable is meant to capture the running/not-running state of # the dependent testing containers when tests start running. The idea # is, we'll only start and stop containers if they aren't already # running. If they are running, we'll just use the ones that exist. # It takes a while to start all the containers, so when running lots of # tests, it's best to start them out-of-band and leave them up for the # duration of the session. self.containers_already_running = self._are_containers_already_running()
def safeInstall(): FACTORIOPATH = getFactorioPath() try: if not os.path.isdir("%s" % (FACTORIOPATH) ): if os.access("%s/.." % (FACTORIOPATH), os.W_OK): os.mkdir(FACTORIOPATH, 0o777) else: subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH]) subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH]) os.mkdir(os.path.join(FACTORIOPATH, "saves")) os.mkdir(os.path.join(FACTORIOPATH, "config")) with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc: lines = bashrc.read() if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1: bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") print("You'll want to restart your shell for command autocompletion. Tab is your friend.") updateFactorio() except IOError as e: print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e)) sys.exit(1)
def _test_resource_paths(my): path = my.server.get_resource_path('admin') # not a very accurate test my.assertEquals(True, 'etc/admin.tacticrc' in path) paths = my.server.create_resource_paths() sys_login = getpass.getuser() dir = my.server.get_home_dir() is_dir_writeable = os.access(dir, os.W_OK) and os.path.isdir(dir) if dir and is_dir_writeable: dir = "%s/.tactic/etc" % dir else: if os.name == 'nt': dir = 'C:/sthpw/etc' else: dir = '/tmp/sthpw/etc' compared = '%s/%s.tacticrc' %(dir, sys_login) in paths my.assertEquals(True, compared) # since we use admin to get resource path , my.login should also be admin my.assertEquals('admin', my.server.get_login())
def __init__(self, this_filename, generation=1): # Constructor self.plant_id = str(uuid.uuid4()) self.life_stages = (3600*24, (3600*24)*3, (3600*24)*10, (3600*24)*20, (3600*24)*30) # self.life_stages = (5, 10, 15, 20, 25) self.stage = 0 self.mutation = 0 self.species = random.randint(0,len(self.species_list)-1) self.color = random.randint(0,len(self.color_list)-1) self.rarity = self.rarity_check() self.ticks = 0 self.age_formatted = "0" self.generation = generation self.dead = False self.write_lock = False self.owner = getpass.getuser() self.file_name = this_filename self.start_time = int(time.time()) self.last_time = int(time.time()) # must water plant first day self.watered_timestamp = int(time.time())-(24*3600)-1 self.watered_24h = False
def main(domain, django_version, python_version, nuke): if domain == 'your-username.pythonanywhere.com': username = getpass.getuser().lower() domain = f'{username}.pythonanywhere.com' project = DjangoProject(domain) project.sanity_checks(nuke=nuke) project.create_virtualenv(python_version, django_version, nuke=nuke) project.run_startproject(nuke=nuke) project.find_django_files() project.update_settings_file() project.run_collectstatic() project.create_webapp(nuke=nuke) project.add_static_file_mappings() project.update_wsgi_file() project.webapp.reload() print(snakesay(f'All done! Your site is now live at https://{domain}'))
def main(repo_url, domain, python_version, nuke): if domain == 'your-username.pythonanywhere.com': username = getpass.getuser().lower() domain = f'{username}.pythonanywhere.com' project = DjangoProject(domain) project.sanity_checks(nuke=nuke) project.create_webapp(nuke=nuke) project.add_static_file_mappings() project.create_virtualenv(python_version, nuke=nuke) project.download_repo(repo_url, nuke=nuke), project.find_django_files() project.update_wsgi_file() project.update_settings_file() project.run_collectstatic() project.run_migrate() project.webapp.reload() print(snakesay(f'All done! Your site is now live at https://{domain}'))
def sanity_checks(self, nuke): print(snakesay('Running API sanity checks')) token = os.environ.get('API_TOKEN') if not token: raise SanityException(dedent( ''' Could not find your API token. You may need to create it on the Accounts page? You will also need to close this console and open a new one once you've done that. ''' )) if nuke: return url = API_ENDPOINT.format(username=getpass.getuser()) + self.domain + '/' response = call_api(url, 'get') if response.status_code == 200: raise SanityException(f'You already have a webapp for {self.domain}.\n\nUse the --nuke option if you want to replace it.')
def virtualenvs_folder(): actual_virtualenvs = Path(f'/home/{getuser()}/.virtualenvs') old_virtualenvs = set(Path(actual_virtualenvs).iterdir()) tempdir = _get_temp_dir() old_workon = os.environ.get('WORKON_HOME') os.environ['WORKON_HOME'] = str(tempdir) yield tempdir if old_workon: os.environ['WORKON_HOME'] = old_workon shutil.rmtree(tempdir) new_envs = set(actual_virtualenvs.iterdir()) - set(old_virtualenvs) if new_envs: raise Exception('virtualenvs path mocking failed somewehere: {}, {}'.format( new_envs, tempdir ))
def get_system_username(): """ Try to determine the current system user's username. :returns: The username as a unicode string, or an empty string if the username could not be determined. """ try: result = getpass.getuser() except (ImportError, KeyError): # KeyError will be raised by os.getpwuid() (called by getuser()) # if there is no corresponding entry in the /etc/passwd file # (a very restricted chroot environment, for example). return '' if six.PY2: try: result = result.decode(DEFAULT_LOCALE_ENCODING) except UnicodeDecodeError: # UnicodeDecodeError - preventive treatment for non-latin Windows. return '' return result
def doConnect(options): # log.deferr = handleError # HACK if '@' in options['host']: options['user'], options['host'] = options['host'].split('@',1) host = options['host'] if not options['user']: options['user'] = getpass.getuser() if not options['port']: options['port'] = 22 else: options['port'] = int(options['port']) host = options['host'] port = options['port'] conn = SSHConnection() conn.options = options vhk = default.verifyHostKey uao = default.SSHUserAuthClient(options['user'], options, conn) connect.connect(host, port, options, vhk, uao).addErrback(_ebExit)
def doConnect(): # log.deferr = handleError # HACK if '@' in options['host']: options['user'], options['host'] = options['host'].split('@',1) if not options.identitys: options.identitys = ['~/.ssh/id_rsa', '~/.ssh/id_dsa'] host = options['host'] if not options['user']: options['user'] = getpass.getuser() if not options['port']: options['port'] = 22 else: options['port'] = int(options['port']) host = options['host'] port = options['port'] vhk = default.verifyHostKey uao = default.SSHUserAuthClient(options['user'], options, SSHConnection()) connect.connect(host, port, options, vhk, uao).addErrback(_ebExit)
def create_deployment_manifest(method): """Returns a dict describing the current deployable.""" service_info = get_service_info() commit = get_commit() version = get_git_version() info = { 'deployable': service_info['name'], 'method': method, 'username': getpass.getuser(), 'datetime': datetime.utcnow().isoformat(), 'branch': get_branch(), 'commit': commit, 'commit_url': get_repo_url() + "/commit/" + commit, 'release': version['tag'] if version else 'untagged-branch' } return info
def post_message(message): if not SLACKBOT_TOKEN: print "Slackbot key not found, cannot notify slack of '%s'" % message print "Set the environment variable DRIFT_SLACKBOT_KEY to remedy." return final_message = "{}: {}".format(getpass.getuser(), message) try: from slacker import Slacker except ImportError: print "Message '{}' not posted to slack".format(message) print "Slack integration disabled. Enable slack with 'pip install slacker'" try: slack = Slacker(SLACKBOT_TOKEN) slack.chat.post_message(NOTIFICATION_CHANNEL, final_message) except Exception as e: log.warning("Cannot post '%s' to Slack: %s", final_message, e)
def _build_fixture_file_name(self): def commit_author_date(commit): cmd_output = subprocess.check_output(['git', 'show', '-s', '--format=%ai', head_commit]).strip() cmd_output = re.sub(r'\s*\+.*', '', cmd_output) cmd_output = re.sub(r'\s+', 'T', cmd_output) return cmd_output head_commit = subprocess.check_output(['git', 'rev-parse', '--short', 'HEAD']).strip() format_args = { 'commit_date': commit_author_date(head_commit), 'commit': head_commit, 'run_date': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S'), 'user': getpass.getuser(), } return '%(commit_date)s+%(commit)s+%(run_date)s+%(user)s.zip' % format_args
def register(self): '''Register action.''' ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.discover and source.user.username={0}'.format( getpass.getuser() ), self.discover ) ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.launch and source.user.username={0} ' 'and data.actionIdentifier={1}'.format( getpass.getuser(), self.identifier ), self.launch )
def register(self): '''Register discover actions on logged in user.''' ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.discover and source.user.username={0}'.format( getpass.getuser() ), self.discover ) ftrack.EVENT_HUB.subscribe( 'topic=ftrack.action.launch and source.user.username={0} ' 'and data.actionIdentifier={1}'.format( getpass.getuser(), self.identifier ), self.launch )
def __init__(self): self._pipelineName_by_jobId = {} self._cmdStr_by_jobId = {} self._resources_by_jobId = {} self._job_outdir_by_jobId = {} self._localStatus_by_jobId = {} self._running_processes_by_jobId = {} self._filehandles_by_jobId = {} try: self._user = getpass.getuser() except KeyError: self._user = "unknown" self._firehose_mode = self._user.startswith('cgaadm') (cpu_count, mem_gb) = self._get_host_resources() self._numcores = cpu_count self._totmem = mem_gb print ('Local host has ' + str(cpu_count) + ' cores and ' + str(mem_gb) + 'GB.')
def logBasicSettings(): # record basic user inputs and settings to log file for future purposes import getpass, time f = open(textFilePath,'a+') f.write("\n################################################################################################################\n") f.write("Executing \"Export SSURGO Shapefiles\" tool\n") f.write("User Name: " + getpass.getuser() + "\n") f.write("Date Executed: " + time.ctime() + "\n") f.write("User Parameters:\n") f.write("\tFile Geodatabase Feature Dataset: " + inLoc + "\n") f.write("\tExport Folder: " + outLoc + "\n") #f.write("\tArea of Interest: " + AOI + "\n") f.close del f ## ===================================================================================
def create_config(project_dir, config_file): identifier = get_input('Project idenitifer', default=project_dir) name = get_input('Project name', default=project_dir) desc = get_input('Project description', default='') author = get_input('Your name', default=getpass.getuser()) love_version = get_input('LOVE version to use', default=LATEST_LOVE_VERSION) new_config = Configuration({ 'id': identifier, 'name': name, 'description': desc, 'author': author, 'loveVersion': love_version, 'targets': [get_current_platform()] }) with open(config_file, 'w') as f: f.write(yaml.dump(new_config.as_dict(), default_flow_style=False))
def saveCurrent(self): self._seq.data[self.curFrame] = self._seq.data[self.curFrame]._replace(gtorig=self.curData) if hasattr(self._seq.data[self.curFrame], 'extraData'): ed = {'pb': {'pbp': self.curPbP, 'pb': self.curPb}, 'vis': self.curVis} self._seq.data[self.curFrame] = self._seq.data[self.curFrame]._replace(extraData=ed) if self.filename_joints is not None and self.filename_pb is not None and self.filename_vis is not None: self.importer.saveSequenceAnnotations(self._seq, {'joints': self.filename_joints, 'vis': self.filename_vis, 'pb': self.filename_pb}) # save log file if self.file_log is not None: self.file_log.write('{}, {}, {}@{}, {}\n'.format(self._seq.data[self.curFrame].fileName, time.strftime("%d.%m.%Y %H:%M:%S +0000", time.gmtime()), getpass.getuser(), os.uname()[1], time.time() - self.start_time)) self.file_log.flush()
def post(content, token=None, username=None, public=False, debug=False): ''' Post a gist on GitHub. ''' random = hashlib.sha1(os.urandom(16)).hexdigest() username = getuser() if username is None else username now = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S") description = ('{hash} (twitter-message-bus); from {host} by {user} ' 'at {time} UTC.').format(host=getfqdn(), user=username, time=now, hash=random) payload = json.dumps({ 'files': { 'message': { 'content': content if content is not None else '' } }, 'public': public, 'description': description }) response = github(http='post', uri='gists', token=token, payload=payload, debug=debug) return (response['id'], random) if 'id' in response else (None, None)
def client_name(self, client_name=None): """ Generate a valid :class:`ClientNameRequest`. Parameters ---------- client_name : string, optional defaults to output of ``getpass.getuser()`` Returns ------- ClientNameRequest """ if client_name is None: client_name = getpass.getuser() command = ClientNameRequest(client_name) return command
def __validate_configuration(self): """ Validates that all essential keys are present in configuration, if not creates them with default values """ was_valid = True # If any value is not present indicates need to save configuration to file if "Application" not in self.config: self.config["Application"] = {} was_valid = False if "cacheDir" not in self.config['Application']: self.config["Application"]["cacheDir"] = "/tmp/easy-ebook-viewer-cache-" + getpass.getuser() + "/" was_valid = False if "javascript" not in self.config['Application']: self.config["Application"]["javascript"] = "False" was_valid = False if "caret" not in self.config['Application']: self.config["Application"]["caret"] = "False" was_valid = False if "stylesheet" not in self.config['Application']: self.config["Application"]["stylesheet"] = "Day" was_valid = False if not was_valid: # Something changed? self.save_configuration()
def kill_using_shell(logger, pid, signal=signal.SIGTERM): try: process = psutil.Process(pid) # Use sudo only when necessary - consider SubDagOperator and SequentialExecutor case. if process.username() != getpass.getuser(): args = ["sudo", "kill", "-{}".format(int(signal)), str(pid)] else: args = ["kill", "-{}".format(int(signal)), str(pid)] # PID may not exist and return a non-zero error code logger.error(subprocess.check_output(args, close_fds=True)) logger.info("Killed process {} with signal {}".format(pid, signal)) return True except psutil.NoSuchProcess as e: logger.warning("Process {} no longer exists".format(pid)) return False except subprocess.CalledProcessError as e: logger.warning("Failed to kill process {} with signal {}. Output: {}" .format(pid, signal, e.output)) return False
def __init__(self, task, execution_date, state=None): self.dag_id = task.dag_id self.task_id = task.task_id self.execution_date = execution_date self.task = task self.queue = task.queue self.pool = task.pool self.priority_weight = task.priority_weight_total self.try_number = 0 self.max_tries = self.task.retries self.unixname = getpass.getuser() self.run_as_user = task.run_as_user if state: self.state = state self.hostname = '' self.init_on_load() self._log = logging.getLogger("airflow.task")
def ls(path): """List directory contents""" cl = Client() dir_ls = cl.list_dir(path) stat = dir_ls['status'] if stat == Status.ok: for item, info in dir_ls['items'].items(): fr = "-rw-r--r--" if info['type'] == NodeType.directory: fr = "drwxr-xr-x" date = datetime.datetime.fromtimestamp(info['date']) date_format = '%b %d %H:%M' if date.year == datetime.datetime.today().year else '%b %d %Y' print('%.11s %.10s %6sB %.15s %s' % (fr, getpass.getuser(), info['size'], datetime.datetime.fromtimestamp(info['date']).strftime(date_format), item)) else: print(Status.description(stat))
def _get_build_prefix(): """ Returns a safe build_prefix """ path = os.path.join(tempfile.gettempdir(), 'pip-build-%s' % \ getpass.getuser()) if sys.platform == 'win32': """ on windows(tested on 7) temp dirs are isolated """ return path try: os.mkdir(path) except OSError: file_uid = None try: fd = os.open(path, os.O_RDONLY | os.O_NOFOLLOW) file_uid = os.fstat(fd).st_uid os.close(fd) except OSError: file_uid = None if file_uid != os.getuid(): msg = "The temporary folder for building (%s) is not owned by your user!" \ % path print (msg) print("pip will not work until the temporary folder is " + \ "either deleted or owned by your user account.") raise pip.exceptions.InstallationError(msg) return path
def __init__( self, host = '127.0.0.1', port = 3128, verbose = True ): if getuser() != 'root': raise UserWarning( 'Must be run as root' ) self.verbose = verbose self.port = int( port ) self.port = 3128 if not port else port self.host = host self.http = str( host ) + ':' + str( self.port ) if not str( host ).endswith( '/' ) else str( host[ :-1 ] ) + ':' + str( self.port ) super( self.__class__, self ).__init__()
def get_sudoer( self ): if 'SUDO_USER' in os.environ: return os.environ[ 'SUDO_USER' ] else: return getuser()
def finalize_options(self): orig.upload.finalize_options(self) self.username = ( self.username or getpass.getuser() ) # Attempt to obtain password. Short circuit evaluation at the first # sign of success. self.password = ( self.password or self._load_password_from_keyring() or self._prompt_for_password() )
def __init__(self, cmd_str, uuid): self.cmd = cmd_str self.user = getpass.getuser() self.timestamp = datetime.datetime.now() self.uuid = uuid