我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用getpass.getpass()。
def user_credentials_prompt():
    """Prompt on the console for router credentials.

    Returns:
        tuple: ``(user, user_pw, enable_pw)`` as typed by the operator; the
        two passwords are read through ``getpass`` so they are not echoed.
    """
    print("Please type in your router credentials.")

    login_name = input('User: ')
    login_secret = getpass.getpass('User Password: ')
    enable_secret = getpass.getpass('Enable Password: ')

    # Emit an empty line so later output is separated from the prompts.
    print('')
    return login_name, login_secret, enable_secret
def get_user_api_token(logger):
    """
    Generate iAuditor API Token

    Prompts for a username and (hidden) password and posts them to the
    auth endpoint as a password-grant form.

    :param logger: the logger
    :return: API Token if authenticated else None
    """
    username = input("iAuditor username: ")
    password = getpass()
    generate_token_url = "https://api.safetyculture.io/auth"
    # Hand the form fields over as a mapping so requests URL-encodes them.
    # Concatenating them by hand corrupts the body whenever the username or
    # password contains '&', '=', '+' or other reserved characters.
    payload = {
        'username': username,
        'password': password,
        'grant_type': 'password',
    }
    headers = {
        'content-type': "application/x-www-form-urlencoded",
        'cache-control': "no-cache",
    }
    response = requests.request("POST", generate_token_url, data=payload, headers=headers)
    if response.status_code == requests.codes.ok:
        return response.json()['access_token']

    # Error bodies are not guaranteed to be JSON; fall back to the raw text
    # so the failure is logged instead of raising while building the message.
    try:
        detail = response.json()
    except ValueError:
        detail = response.text
    logger.error('An error occurred calling ' + generate_token_url + ': ' + str(detail))
    return None
def repo_creds(username, encrypted_password, has_pass):
    """Get a representation of the container repository credentials.

    Returns None when no credential information was supplied at all.
    Raises CLIError when a password (encrypted or to-be-prompted) is
    requested without a username. When ``has_pass`` is set and no encrypted
    password is given, the clear-text password is read from the terminal.
    """
    from azure.servicefabric.models import RegistryCredential
    from getpass import getpass

    # An empty string is an accepted encrypted passphrase, so presence is
    # decided with an explicit None check rather than truthiness.
    encrypted_given = encrypted_password is not None

    if not (username or encrypted_given or has_pass):
        return None

    if encrypted_given and not username:
        raise CLIError('Missing container repository username')
    if has_pass and not username:
        raise CLIError('Missing container repository username')

    if encrypted_given:
        return RegistryCredential(registry_user_name=username,
                                  registry_password=encrypted_password,
                                  password_encrypted=True)
    if has_pass:
        typed = getpass(prompt='Container repository password: ')
        return RegistryCredential(registry_user_name=username,
                                  registry_password=typed,
                                  password_encrypted=False)
    return RegistryCredential(registry_user_name=username)
def __init__(self, okta_profile, verbose):
    """Load connection settings for *okta_profile* from ``~/.okta-aws``.

    ``base_url`` is only set when the profile has a ``base-url`` option;
    otherwise a notice is printed and the attribute is left unset.
    Username and password fall back to interactive prompts when the
    profile does not provide them.
    """
    config_file = os.path.expanduser('~') + '/.okta-aws'
    config = RawConfigParser()
    config.read(config_file)

    if config.has_option(okta_profile, 'base-url'):
        self.base_url = "https://%s" % config.get(okta_profile, 'base-url')
    else:
        print("No base-url set in ~/.okta-aws")

    if not config.has_option(okta_profile, 'username'):
        self.username = raw_input('Enter username: ')
    else:
        self.username = config.get(okta_profile, 'username')
        if verbose:
            print("Authenticating as: %s" % self.username)

    if not config.has_option(okta_profile, 'password'):
        self.password = getpass('Enter password: ')
    else:
        self.password = config.get(okta_profile, 'password')

    self.verbose = verbose
def bootstrap(branch='master'):
    """Run the full first-time setup sequence for *branch*.

    Prompts for the sudo password and the domain name, stores both on
    ``env``, then invokes the provisioning steps in a fixed order and
    finishes by printing the service status.
    """
    env.sudo_password = getpass('Initial value for env.sudo_password: ')
    env.domain_name = prompt('Enter your domain name:', default='meetup_facebook_bot')

    # Host preparation and database.
    create_permanent_folder()
    create_log_folder()
    install_postgres()
    db_url = setup_postgres(username=env.user, database_name=env.user)
    renew_ini_file(db_url)

    # Application code and its Python environment.
    install_python()
    fetch_sources_from_repo(branch, PROJECT_FOLDER)
    reinstall_venv()
    install_modules()

    # Web server, certificates and firewall.
    install_nginx()
    configure_letsencrypt_if_necessary()
    add_nginx_reload_crontab_job()
    configure_nginx_if_necessary()
    setup_ufw()

    for service_name in (UWSGI_SERVICE_NAME, 'nginx'):
        start_systemctl_service(service_name)

    run_setup_scripts()
    status()
def keygen(args):
    """Sub-command to generate keys.

    Derives a default certificate path from ``args.signer_id`` when none is
    given, checks that both output files are writable, and generates the
    key pair. Unless ``args.no_encryption`` is set, the private key
    password is read from the terminal and UTF-8 encoded.
    """
    if not args.cert:
        safe_id = args.signer_id.replace("@", "_at_")
        args.cert = "data/{}.crt".format(safe_id)

    check_writable(args.cert)
    check_writable(args.private_key)

    from pyseeder.crypto import keygen

    priv_key_password = None
    if not args.no_encryption:
        from getpass import getpass
        typed = getpass("Set private key password: ")
        priv_key_password = typed.encode("utf-8")

    keygen(args.cert, args.private_key, args.signer_id, priv_key_password)
def prompt_pass(name, default=None):
    """
    Grabs hidden (password) input from command line.

    Keeps asking until something is typed, unless a default is available,
    in which case empty input returns the default.

    :param name: prompt text
    :param default: default value if no input provided.
    """
    default_hint = ' [%s]' % default if default else ''
    # A prompt phrased as a question gets a space, anything else a colon.
    separator = ' ' if name.endswith('?') else ': '
    text = name + default_hint + separator

    while True:
        typed = getpass.getpass(text)
        if typed:
            return typed
        if default is not None:
            return default
def popPeek(server, user, port=110):
    """Show the headers of each message in a POP3 mailbox and offer deletion.

    Connects to *server*:*port* as *user* (password read from the
    terminal), prints the header lines of every message and deletes those
    for which the operator answers ``D``. If anything fails mid-session,
    pending deletions are rolled back with RSET before quitting.

    Exits the process with status 1 when the connection or login fails.
    """
    try:
        P = poplib.POP3(server, port)
        P.user(user)
        P.pass_(getpass.getpass())
    except Exception:
        print("Failed to connect to server.")
        sys.exit(1)

    deleted = 0
    # Bound before the try so the failure report below cannot hit an
    # unbound name when P.list() itself raises.
    msgcount = 0
    try:
        msgcount = len(P.list()[1])
        # POP3 message numbers are 1-based.
        for msg in range(1, msgcount + 1):
            for line in P.top(msg, 0)[1]:
                print(line)
            choice = raw_input("D to delete, any other key to leave message on server: ")
            if choice == "D":
                P.dele(msg)
                deleted += 1
        P.quit()
        print("%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
    except (Exception, KeyboardInterrupt):
        # Undo the deletion marks (also on Ctrl-C) before closing the session.
        P.rset()
        P.quit()
        deleted = 0
        print("\n%d messages deleted. %d messages left on server" % (deleted, msgcount - deleted))
def login(args):
    """
    Logs in the specified user, prompting for password if necessary.

    Returns the decoded JSON body of the login response; raises
    CliException when the request yields a falsy response.
    """
    if not args.password:
        args.password = getpass.getpass("Password: ")

    payload = dict(email=args.user, password=args.password)
    if args.token:
        payload["token"] = args.token

    response = sync_request(args, "POST", "sse_cfg/user", data=payload)
    if not response:
        raise CliException("Failed to login {User}".format(User=args.user))
    return response.json()
def register(args):
    """
    Registers the specified user, prompting twice for password if necessary.

    Raises CliException when the two typed passwords differ or when the
    request yields a falsy response; otherwise returns the decoded JSON body.
    """
    if not args.password:
        args.password = getpass.getpass("Password: ")
        confirmation = getpass.getpass("Repeat: ")
        if confirmation != args.password:
            raise CliException("Passwords do not match")

    payload = dict(email=args.user, password=args.password)
    response = sync_request(args, "POST", "sse_cfg/register", data=payload)
    if not response:
        raise CliException("Failed to register {User}".format(User=args.user))
    return response.json()
def _test_resource_paths(my):
    """Check the resource paths reported by the server.

    Verifies that the admin resource path contains ``etc/admin.tacticrc``,
    that the created resource paths include the rc file expected for the
    current OS user, and that the server login is ``admin``.
    """
    admin_path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in admin_path)

    created_paths = my.server.create_resource_paths()
    os_user = getpass.getuser()

    home = my.server.get_home_dir()
    home_writable = os.access(home, os.W_OK) and os.path.isdir(home)
    if home and home_writable:
        etc_dir = "%s/.tactic/etc" % home
    elif os.name == 'nt':
        etc_dir = 'C:/sthpw/etc'
    else:
        etc_dir = '/tmp/sthpw/etc'

    expected_rc = '%s/%s.tacticrc' % (etc_dir, os_user)
    my.assertEquals(True, expected_rc in created_paths)

    # since we use admin to get resource path , my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def read_private():
    """Load and decrypt the keystore stored in ``private.json``.

    Prompts for the keystore password first if none has been set.

    Returns:
        tuple: ``(data, salt)``. When the file does not exist, ``data`` is
        an empty revocation record and ``salt`` is freshly generated.

    Raises:
        Exception: if decryption fails with ValueError (wrong password).
    """
    global global_password
    if global_password is None:
        setpassword(getpass.getpass("Please enter the password to decrypt your keystore: "))

    if not os.path.exists('private.json'):
        # file doesn't exist, just invent a salt
        return {'revoked_keys': []}, base64.b64encode(crypto.generate_random_key())

    with open('private.json', 'r') as handle:
        stored = json.load(handle)

    salt = stored['salt']
    derived_key = crypto.kdf(global_password, salt)
    try:
        plaintext = crypto.decrypt(stored['priv'], derived_key)
    except ValueError:
        raise Exception("Invalid password for keystore")
    return json.loads(plaintext), salt
def init_mtls(config):
    """Create the SSL context used for mutual TLS.

    Resolves the CA directory from ``config["ca_dir"]`` (relative paths are
    taken under ``common.WORK_DIR``), prompts for the keystore password,
    generates the certificate for ``config["ip"]`` if it is not on disk
    yet, and returns a context that loads the CA file and this node's
    cert/key and requires a peer certificate.
    """
    logger.info("Setting up mTLS...")

    tls_dir = config["ca_dir"]
    if tls_dir[0] != '/':
        tls_dir = os.path.abspath('%s/%s' % (common.WORK_DIR, tls_dir))

    # We need to securely pull in the ca password
    my_key_pw = getpass.getpass("Please enter the password to decrypt your keystore: ")
    ca_util.setpassword(my_key_pw)

    node_ip = config["ip"]
    my_cert = "%s/%s-cert.crt" % (tls_dir, node_ip)
    my_priv_key = "%s/%s-private.pem" % (tls_dir, node_ip)
    ca_path = "%s/cacert.crt" % (tls_dir)

    # Create HIL Server Connect certs (if not already present)
    if not os.path.exists(my_cert):
        logger.info("Generating new Node Monitor TLS Certs in %s for connecting" % tls_dir)
        ca_util.cmd_mkcert(tls_dir, node_ip)

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_verify_locations(cafile=ca_path)
    context.load_cert_chain(certfile=my_cert, keyfile=my_priv_key, password=my_key_pw)
    context.verify_mode = ssl.CERT_REQUIRED
    return context
def validate_password():
    """Read a password twice and return it once it is acceptable.

    The prompt is repeated until both entries match and
    ``cracklib.VeryFascistCheck`` accepts the password; the reason for each
    rejection is printed.
    """
    while True:
        first_entry = getpass.getpass()
        print("Again")
        second_entry = getpass.getpass()
        if first_entry != second_entry:
            print("Password mismatch")
            continue

        # Check against cracklib to avoid simple passwords
        try:
            cracklib.VeryFascistCheck(first_entry)
        except ValueError as msg:
            print(msg)
            continue
        return first_entry
def raw(prompt, *args, **kwargs):
    """Calls input to allow user to input an arbitrary string. User can go
    back by entering the `go_back` string. Works in both Python 2 and 3.

    Keyword options read from ``kwargs``: ``go_back`` (default ``'<'``),
    ``type`` (converter applied to the answer, default ``str``) and
    ``secret`` (read without echo when true). The question is repeated
    while the converter raises ValueError.
    """
    go_back = kwargs.get('go_back', '<')
    type_ = kwargs.get('type', str)
    hidden = kwargs.get('secret', False)

    with stdout_redirected(sys.stderr):
        while True:
            try:
                if hidden:
                    answer = getpass.getpass(prompt)
                elif sys.version_info >= (3, 0):
                    answer = input(prompt)
                else:
                    answer = raw_input(prompt)

                if answer == go_back:
                    raise QuestionnaireGoBack
                return type_(answer)
            except ValueError:
                eprint('\n`{}` is not a valid `{}`\n'.format(answer, type_))
def changePassPhrase(options):
    """Re-encrypt a private key file with a new passphrase.

    Missing entries in *options* (``filename``, ``pass``, ``newpass``) are
    asked for interactively. The old passphrase is only requested when the
    key turns out to be encrypted. The key file is rewritten in place.
    """
    if not options['filename']:
        filename = os.path.expanduser('~/.ssh/id_rsa')
        # The prompt advertises a default, so an empty answer must select it
        # instead of leaving an empty filename behind.
        options['filename'] = raw_input(
            'Enter file in which the key is (%s): ' % filename) or filename

    try:
        key = keys.getPrivateKeyObject(options['filename'])
    except keys.BadKeyError as e:
        # Only the "needs a passphrase" failure is recoverable here.
        if e.args[0] != 'encrypted key with no passphrase':
            raise
        if not options['pass']:
            options['pass'] = getpass.getpass('Enter old passphrase: ')
        key = keys.getPrivateKeyObject(options['filename'],
                                       passphrase=options['pass'])

    if not options['newpass']:
        while 1:
            p1 = getpass.getpass('Enter new passphrase (empty for no passphrase): ')
            p2 = getpass.getpass('Enter same passphrase again: ')
            if p1 == p2:
                break
            print('Passphrases do not match. Try again.')
        options['newpass'] = p1

    # Serialize first: opening the file for writing truncates it, so a
    # failure while building the key string must happen before that point.
    key_string = keys.makePrivateKeyString(key, passphrase=options['newpass'])
    with open(options['filename'], 'w') as key_file:
        key_file.write(key_string)
    print('Your identification has been saved with the new passphrase.')
def getGenericAnswers(self, name, instruction, prompts):
    """Ask the user a series of questions on the controlling terminal.

    ``sys.stdin``/``sys.stdout`` are temporarily pointed at ``/dev/tty``
    while *name*, *instruction* and each prompt are shown. *prompts* is a
    sequence of ``(prompt, echo)`` pairs; answers to prompts with a false
    ``echo`` are read without echo.

    Returns an already-fired Deferred carrying the list of answers.
    """
    responses = []
    oldout, oldin = sys.stdout, sys.stdin
    tty = open('/dev/tty', 'r+')
    try:
        sys.stdin = sys.stdout = tty
        if name:
            print(name)
        if instruction:
            print(instruction)
        for prompt, echo in prompts:
            if echo:
                responses.append(raw_input(prompt))
            else:
                responses.append(getpass.getpass(prompt))
    finally:
        sys.stdout, sys.stdin = oldout, oldin
        # Release the terminal handle; it was previously left open.
        tty.close()
    return defer.succeed(responses)
def defConv(items):
    """Answer a list of ``(message, kind)`` conversation items interactively.

    Kind 1 reads a hidden answer, kind 2 reads a visible answer, kinds 3
    and 4 only display the message and record an empty answer. Any other
    kind aborts with a failed Deferred. On success the Deferred carries a
    list of ``(answer, 0)`` tuples.
    """
    answers = []
    for message, kind in items:
        if kind == 1:  # password
            answers.append((getpass.getpass(message), 0))
        elif kind == 2:  # text
            answers.append((raw_input(message), 0))
        elif kind in (3, 4):
            print(message)
            answers.append(("", 0))
        else:
            return defer.fail('foo')
    return defer.succeed(answers)
def getCredentials():
    '''Prompt the user for a username and password and return both.'''
    hint = ('You may wish to store your credentials and server '
            'preferences in this file by opening it in a text editor '
            'and filling in the username, password, and server '
            'fields.\nIf you choose not to do this, you will be '
            'prompted for this information on each run of this script.')
    colourPrint('bold', hint)

    colourPrint('yellow', '\nPlease enter your username for SmoothStreamsTV:')
    username = input('')
    colourPrint('green', '\nThank you, ' + username + '.\n')

    colourPrint('yellow', '\nPlease enter your password for SmoothStreamsTV:')
    password = getpass('')

    return username, password
# end getCredentials()
def ask(message, ofile=sys.stderr, ifile=sys.stdin, style=Fore.MAGENTA,
        noecho=False, accept_empty=True):
    """
    Print a question on *ofile* and wait for an answer on *ifile* using
    :py:meth:`io.TextIOBase.readline`. *style* may be ``None`` for
    non-colored output (this does not override the behavior setup by
    :py:func:`enable_colors`).

    With *noecho* the answer is read through ``getpass`` (only allowed
    when *ifile* is ``sys.stdin``). When *accept_empty* is false the
    question is repeated until a non-blank answer is given.
    """
    if noecho and ifile != sys.stdin:
        raise ValueError("noecho option implies input from stdin")

    while True:
        with ScopedColoredStream(ofile, style, flush_on_exit=True) as stream:
            stream.write(message)

        if noecho:
            ans = getpass.getpass(prompt="", stream=ofile)
        else:
            ans = ifile.readline().rstrip("\n\r")

        if accept_empty or ans.strip():
            return ans
def save_new_credentials(self):
    """Prompt for NordVPN credentials and persist them in the config.

    Both fields are required: the prompts repeat until a non-empty
    username and a non-empty password have been entered. The values are
    written to the ``SECTION_TITLE`` section (created if missing) and the
    configuration is saved.
    """
    username = None
    password = None
    print("\nPlease input your NordVPN credentials:")
    # `or`, not `and`: keep asking while either value is still missing.
    # With `and` the loop ended as soon as one of the two was filled in.
    while not username or not password:
        username = input("Username/Email: ")
        password = getpass.getpass("Password: ")

    if not self.config.has_section(self.SECTION_TITLE):
        self.config.add_section(self.SECTION_TITLE)
    self.config.set(self.SECTION_TITLE, 'username', username)
    self.config.set(self.SECTION_TITLE, 'password', password)
    self.save()
    self.logger.info("New credentials saved successfully!")
def adduser():
    """Interactively create a user record and commit it to the database."""
    from getpass import getpass

    # Prompts are issued in this order: username, email, role, password.
    answers = {}
    answers['username'] = raw_input("\_username: ")
    answers['email'] = raw_input("\_email: ")
    answers['role_id'] = raw_input("\_[1:moderator 2:admin 3:user]: ")
    answers['password'] = getpass("\_password: ")

    new_user = User(**answers)
    db.session.add(new_user)
    db.session.commit()
    print("<user %s add in database>" % answers['username'])
def deploy(self, cwd=False):
    """Deploy this contract from an interactively chosen, unlocked account.

    Records the contract in the database, lists the node's accounts, asks
    which one to unlock (password read without echo), sends the creation
    transaction and stores the resulting address and contract instance.

    Returns whatever ``update_contract`` returns for the new address.

    Raises:
        AssertionError: if the contract is already deployed or has no
            solidity code loaded.
        Exception: if the selected account cannot be unlocked.
    """
    assert not self.is_deployed, 'This contract already exists on the chain.'
    assert self.sol, 'No solidity code loaded into this object'

    database.insert_contract(self.name, self.abi, self.bytecode,
                             self.gas_estimates, self.method_identifiers, cwd)

    okay = web3.personal.Personal(self.web3)
    accounts = okay.listAccounts
    menu = 'Unlock: \n' + '\n'.join(
        [' '.join([str(i), ':', x]) for i, x in enumerate(accounts)]) + '\n'
    self.defaultAccount = accounts[int(input(menu))]

    result = okay.unlockAccount(self.defaultAccount, getpass.getpass('\nPassword:'), 5000)
    if not result:
        # Was `raise Exepction(...)`, which surfaced as a NameError.
        raise Exception('unable to unlock account')

    self.address = self.web3.eth.sendTransaction(
        transaction={'data': '0x' + self.bytecode,
                     'from': self.defaultAccount,
                     'gaslimit': 30000})
    self.instance = self.web3.eth.contract(self.address)

    # update the deployed and address to the db and an instance for pulling
    # and interacting with the contract again
    return update_contract(json.dumps(self.address), self.method_identifiers, self.name)
def zoomeye_api_test():
    """Log in to ZoomEye interactively and run a few sample dork searches."""
    zoomeye = ZoomEye()
    zoomeye.username = raw_input('ZoomEye Username: ')
    zoomeye.password = getpass.getpass(prompt='ZoomEye Password: ')
    zoomeye.login()
    print(zoomeye.resources_info())

    # Default-resource searches, shown as site/IP pairs.
    for dork in ('solr', 'country:cn', 'solr country:cn'):
        show_site_ip(zoomeye.dork_search(dork))

    # Host-resource search, shown as IP/port pairs.
    host_hits = zoomeye.dork_search('solr country:cn', resource='host')
    show_ip_port(host_hits)
def siteSign(self, address, privatekey=None, inner_path="content.json", publish=False):
    """Sign *inner_path* of the site at *address*, optionally publishing it.

    When no private key is passed, it is looked up in the current user's
    site data; if still missing, it is read from the console without echo.
    """
    from Site import Site
    logging.info("Signing site: %s..." % address)
    site = Site(address, allow_create=False)

    if not privatekey:  # If no privatekey defined
        from User import UserManager
        user = UserManager.user_manager.get()
        privatekey = user.getSiteData(address).get("privatekey") if user else None

    if not privatekey:  # Not found in users.json, ask from console
        import getpass
        privatekey = getpass.getpass("Private key (input hidden):")

    signed = site.content_manager.sign(inner_path=inner_path, privatekey=privatekey,
                                       update_changed_files=True)
    if signed and publish:
        self.sitePublish(address, inner_path=inner_path)
def _configure_github(github):
    """Determine and return the GitHub configuration.

    Works on a copy of the input; asks for ``username`` and ``token`` until
    each has a non-empty value (the token is read without echo).

    Args:
        github (dict): The current GitHub configuration.

    Returns:
        dict: The new GitHub configuration.
    """
    answer = copy(github)

    for notice in (
            'Since you intend to publish to GitHub, you need to '
            'supply credentials.',
            'Create an access token at: '
            'https://github.com/settings/tokens',
            'It needs the "repo" scope and nothing else.'):
        logger.info(notice)

    while not answer.get('username'):
        answer['username'] = six.moves.input('GitHub username: ')

    while not answer.get('token'):
        answer['token'] = getpass.getpass('GitHub token (input is hidden): ')

    return answer
def interactively_acquire_token(self) -> str:
    """
    Walks the user through executing a login into the Dualis-System to get the Token and saves it.

    The username/password prompt repeats until a token is obtained; the
    token is then stored under the ``token`` property of the config helper.

    @return: The Token for Dualis.
    """
    print('[The following Input is not saved, it is only used temporarily to generate a login token.]')

    token = None
    while token is None:
        user_name = input('Username for Dualis: ')
        user_secret = getpass('Password for Dualis [no output]: ')
        try:
            token = login_helper.obtain_login_token(user_name, user_secret)
        except RequestRejectedError as error:
            print('Login Failed! (%s) Please try again.' % (error))
        except (ValueError, RuntimeError) as error:
            print('Error while communicating with the Dualis System! (%s) Please try again.' % (error))

    self.config_helper.set_property('token', token)
    return token
def get_password(max_password_prompts=3):
    """Read password from TTY.

    Returns None when stdin is not a terminal, when input ends (Ctrl-D),
    or when no non-empty (and, if ``OS_VERIFY_PASSWORD`` is enabled,
    confirmed) password was entered within *max_password_prompts* tries.
    """
    verify = strutils.bool_from_string(env("OS_VERIFY_PASSWORD"))

    if not (hasattr(sys.stdin, "isatty") and sys.stdin.isatty()):
        return None

    # Check for Ctrl-D
    try:
        for __ in moves.range(max_password_prompts):
            entered = getpass.getpass("OS Password: ")
            confirmed = getpass.getpass("Please verify: ") if verify else entered
            if entered and entered == confirmed:
                return entered
    except EOFError:
        pass
    return None
def request_secret(secret_key, message, hidden=True):
    """
    Request a secret from the user. Save the secrets to disk afterwards.
    If there is already a secret for the key, return it.

    :param secret_key: the key the input should be stored under in the secrets
    :param message: the message to show to the user
    :param hidden: hide the input (recommended for passwords and such)
    :return the secret
    """
    try:
        return _secrets[secret_key]
    except KeyError:
        pass

    reader = getpass if hidden else input
    # Surrounding whitespace is dropped before the value is stored.
    secret = reader(message).strip()
    _secrets[secret_key] = secret
    save_secrets()
    return secret
def rawInput(message, isPass=False):
    """Read one line from the user and return it via ``tools.stdinEncode``.

    With *isPass* the input is read without echo.
    """
    typed = getpass.getpass(message) if isPass else raw_input(message)
    return tools.stdinEncode(typed)
def main(args):
    """Encrypt the PyPI password with the repo's public key and store it.

    The password comes from ``args.password`` or, if that is empty, from a
    hidden prompt.
    """
    public_key = fetch_public_key(args.repo)

    password = args.password
    if not password:
        password = getpass('PyPI password: ')

    encrypted = encrypt(public_key, password.encode())
    update_travis_deploy_password(encrypted)
    print("Wrote encrypted password to .travis.yml -- you're ready to deploy")
def login(self):
    """Authenticate the operator against the ``users`` table.

    Prompts for username and password until the stored hash matches, then
    sets ``self.name`` and calls ``self.lookup()``. After five failed
    attempts an error is shown and the process exits. The database handle
    is closed on every path.
    """
    max_attempts = 5
    sql = 'SELECT password FROM users WHERE username = %s'
    db = Database()
    try:
        failures = 0
        while True:
            username = raw_input('\n\tUsername: ')
            password = getpass('\tPassword: ')
            password_hash = db.get_query(sql, [username])

            # Validate first: previously the attempt counter was checked
            # before the hash, so the fifth entry was rejected unseen.
            if password_hash and gc.check_hash(password, password_hash[0][0]):
                gc.success('\nCredentials validated. Logging in...')
                self.name = username
                self.lookup()
                break

            failures += 1
            if failures >= max_attempts:
                gc.error('\nDisconnecting after 5 failed login attempts.')
                exit()
            # Small delay to slow down guessing.
            time.sleep(2)
            gc.error('\nInvalid credentials. Please try again.')
    finally:
        # close database (also when exiting after too many failures)
        db.close()
def handle_401(self, resp, **kwargs):
    """Retry a 401 response with credentials typed by the user.

    Non-401 responses, and all responses when prompting is disabled, are
    returned untouched. Otherwise the user is asked for a username and
    password, which are remembered per network location and used to resend
    the request with HTTP basic auth. The original response is appended to
    the new response's history.
    """
    # We only care about 401 responses, and only when we may prompt.
    if resp.status_code != 401 or not self.prompting:
        return resp

    netloc = urllib_parse.urlparse(resp.url).netloc

    # Prompt the user for a new username and password
    username = six.moves.input("User for %s: " % netloc)
    password = getpass.getpass("Password: ")

    # Store the new username and password to use for future requests
    if username or password:
        self.passwords[netloc] = (username, password)

    # Consume content and release the original connection to allow our new
    # request to reuse the same one.
    resp.content
    resp.raw.release_conn()

    # Add our new username and password to the request
    authenticate = HTTPBasicAuth(username or "", password or "")
    retry_request = authenticate(resp.request)

    # Send our new request
    retried = resp.connection.send(retry_request, **kwargs)
    retried.history.append(resp)
    return retried
def finalize_options(self):
    """Fill in missing upload credentials after the base finalization.

    The username defaults to the current OS user. The password is taken
    from, in order: the existing value, the keyring, an interactive prompt.
    """
    orig.upload.finalize_options(self)

    if not self.username:
        self.username = getpass.getuser()

    # Attempt to obtain password. Stop at the first source that yields one.
    if not self.password:
        self.password = (
            self._load_password_from_keyring() or
            self._prompt_for_password()
        )
def _prompt_for_password(self):
    """
    Prompt for a password on the tty. Suppress Exceptions.

    Returns the typed password, or None if the prompt failed or was
    interrupted.
    """
    try:
        typed = getpass.getpass()
    except (Exception, KeyboardInterrupt):
        return None
    return typed
def main():
    """Entry point: merge CLI and config settings, then emit inventory JSON.

    Command-line values win over config values; a missing password is
    prompted for. ``--host``/``--guest`` print an empty JSON object, while
    ``--list``/``--reload-cache`` print the (cached) vSphere inventory.
    """
    # - Get command line args and config args.
    args = get_args()
    (filters, cache_path, cache_ttl, vcenter_host, vsphere_user,
     vsphere_pass, vsphere_port, cert_check) = parse_config()

    # - Override settings with arg parameters if defined
    if not args.password:
        if not vsphere_pass:
            import getpass
            vsphere_pass = getpass.getpass()
        args.password = vsphere_pass
    if not args.username:
        args.username = vsphere_user
    if not args.hostname:
        args.hostname = vcenter_host
    if not args.port:
        args.port = vsphere_port
    # --no-cert-check on the command line always disables verification.
    args.cert_check = False if args.no_cert_check else cert_check

    # - Perform requested operations (list, host/guest, reload cache)
    if args.host or args.guest:
        print('{}')
        exit(0)
    elif args.list or args.reload_cache:
        # Use the port resolved above; it was previously ignored in favor
        # of a hard-coded 443, which remains the fallback.
        port = int(args.port) if args.port else 443
        v = VSphere(args.hostname, args.username, args.password,
                    vsphere_port=port, cert_check=args.cert_check)
        data = v.cached_inventory(filters, cache_path=cache_path,
                                  cache_ttl=cache_ttl,
                                  refresh=args.reload_cache)
        print("{}".format(dumps(data)))
        exit(0)
def handle(self, *args, **options):
    """Run a conversion from the source database described in *options*.

    Prompts for the database password when the option is None, prepares
    the pool's instance, builds the converter for the requested source
    format and runs ``self.convert`` to completion on the instance's loop.
    """
    if options['source_db_password'] is None:
        options['source_db_password'] = getpass('Database Password: ')

    instance = Controller.prepare(options['pool']).instance

    # Converter keyword -> option key holding its value.
    option_for = {
        'db_name': 'source_db_name',
        'db_type': 'source_db_type',
        'db_user': 'source_db_username',
        'db_port': 'source_db_port',
        'db_password': 'source_db_password',
        'db_host': 'source_db_host',
        'prefix': 'source_db_prefix',
    }
    source_settings = {kw: options[opt] for kw, opt in option_for.items()}

    converter = get_converter(options['source_format'], instance=instance,
                              **source_settings)
    instance.loop.run_until_complete(self.convert(instance, converter))
def __init__(self, session_user_id, user_phone, api_id, api_hash, proxy=None):
    """Connect to Telegram and make sure the session is authorized.

    The connection is attempted twice; if both attempts fail the
    constructor returns early. For an unauthorized session a login code is
    requested for *user_phone* and the user is asked for it until sign-in
    succeeds, falling back to the account password when two-step
    verification is enabled.
    """
    print_title('Initialization')
    print('Initializing interactive example...')
    super().__init__(session_user_id, api_id, api_hash, proxy)

    # Store all the found media in memory here,
    # so it can be downloaded if the user wants
    self.found_media = set()

    print('Connecting to Telegram servers...')
    if not self.connect():
        print('Initial connection failed. Retrying...')
        if not self.connect():
            print('Could not connect to Telegram servers.')
            return

    # Then, ensure we're authorized and have access
    if self.is_user_authorized():
        return

    print('First run. Sending code request...')
    self.send_code_request(user_phone)

    signed_in = None
    while signed_in is None:
        code = input('Enter the code you just received: ')
        try:
            signed_in = self.sign_in(user_phone, code)
        except SessionPasswordNeededError:
            # Two-step verification may be enabled
            pw = getpass('Two step verification is enabled. '
                         'Please enter your password: ')
            signed_in = self.sign_in(password=pw)
def __call__(self, parser, namespace, values, option_string):
    """Store the option value on *namespace*, prompting when none was given.

    When *values* is None the value is read from the terminal without
    echo; the result is stored under ``self.dest``.
    """
    supplied = getpass.getpass() if values is None else values
    setattr(namespace, self.dest, supplied)
def prompt_user_passwd(self, host, realm):
    """Override this in a GUI environment!

    Asks on the console for a username and password for *realm* at *host*
    and returns them as a tuple; returns ``(None, None)`` if the user
    interrupts the prompt.
    """
    import getpass
    try:
        user = raw_input("Enter username for %s at %s: " % (realm, host))
        password_prompt = "Enter password for %s in %s at %s: " % (user, realm, host)
        passwd = getpass.getpass(password_prompt)
    except KeyboardInterrupt:
        # Finish the interrupted prompt line.
        print("")
        return None, None
    return user, passwd

# Utility functions
def get_api_key(cb, parser, args):
    """Print the API token for ``args.username``.

    Uses ``args.password`` when given, otherwise prompts for the password
    without echo. The token is requested from the server in
    ``cb.credentials``.
    """
    password = args.password
    if not password:
        password = getpass.getpass("Password for {0}: ".format(args.username))

    token = get_api_token(cb.credentials.url, args.username, password,
                          verify=cb.credentials.ssl_verify)
    print("API token for user {0}: {1}".format(args.username, token))
def deploy(branch='master'):
    """Deploy *branch*: fetch sources, optionally refresh deps, restart services.

    Asks whether dependencies should be updated and for the sudo password,
    then fetches the sources, rebuilds the virtualenv and modules if
    requested, starts the uWSGI and nginx services and prints the status.
    """
    refresh_deps = confirm('Update dependencies?')
    print('OK, deploying branch %s' % branch)
    env.sudo_password = getpass('Initial value for env.sudo_password: ')

    fetch_sources_from_repo(branch, PROJECT_FOLDER)
    if refresh_deps:
        reinstall_venv()
        install_modules()

    for service_name in (UWSGI_SERVICE_NAME, 'nginx'):
        start_systemctl_service(service_name)
    status()
def status():
    """Show the systemd status of the uWSGI service.

    Prompts for the sudo password first when ``env.sudo_password`` is None.
    """
    password_missing = env.sudo_password is None
    if password_missing:
        env.sudo_password = getpass('Initial value for env.sudo_password: ')

    command = 'systemctl status %s' % UWSGI_SERVICE_NAME
    sudo(command)