我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 requests.auth.HTTPBasicAuth()。
def auth():
    """Demonstrate the authentication styles accepted by ``requests.get``.

    One GET request is issued per example; every response is discarded, so
    the function exists purely as a usage illustration and returns ``None``.
    """
    github_user_url = 'https://api.github.com/user'

    # Basic authentication: explicit auth object, then the tuple shorthand.
    requests.get(github_user_url, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest authentication.
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 authentication (the original comment points at the
    # requests-oauthlib package as the source of ``OAuth2``).
    twitter_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(twitter_url, auth=oauth)
def __init__(self, base_url, *args, **kwargs):
    """Initialise the session and lift basic-auth credentials out of the URL.

    :param base_url: base URL for the session; may embed ``user:password@``.
    :param args: positional arguments forwarded to ``requests.Session``.
    :param kwargs: keyword arguments forwarded to ``requests.Session``.

    When both a username and a password are present in ``base_url`` they are
    stripped from the stored ``self.base_url`` and installed as
    ``self.auth`` instead.
    """
    requests.Session.__init__(self, *args, **kwargs)
    self.base_url = base_url

    parts = urlparse(self.base_url)
    if not (parts.username and parts.password):
        # No embedded credentials: keep the URL exactly as given.
        return

    # Rebuild the network location without the "user:password@" prefix.
    host = parts.hostname
    if parts.port:
        host += ":%d" % parts.port
    self.base_url = urlunparse(parts._replace(netloc=host))

    # Let requests send the credentials as an Authorization header.
    self.auth = HTTPBasicAuth(parts.username, parts.password)
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
    """Store the API location and the authentication settings.

    :param api_url: str Moira API URL; a trailing ``/`` is appended if missing
    :param auth_custom: dict of extra auth headers merged into ``auth_custom``
    :param auth_user: str basic-auth user
    :param auth_pass: str basic-auth password
    :param login: str value stored under the ``X-Webauth-User`` header key
    """
    self.api_url = api_url if api_url.endswith('/') else api_url + '/'

    # Basic auth is only configured when both halves of the credential are set.
    self.auth = HTTPBasicAuth(auth_user, auth_pass) if auth_user and auth_pass else None

    self.auth_custom = {'X-Webauth-User': login}
    if auth_custom:
        self.auth_custom.update(auth_custom)
def __init__(self, *, broker_url, pact_dir='.', user=None, password=None,
             authentication=False):
    """Configure the broker client.

    :param broker_url: broker base URL; trailing ``/`` characters are removed.
    :param pact_dir: stored as ``self.pact_dir``.
    :param user: basic-auth username.
    :param password: basic-auth password.
    :param authentication: when true, ``user`` and ``password`` are required
        and a basic-auth object is stored in ``self._auth``.
    :raises ValueError: if ``authentication`` is true but a credential is
        missing or empty.
    """
    self.broker_url = broker_url.rstrip('/')
    self.username = user
    self.password = password
    self.pact_dir = pact_dir
    self.authentication = authentication
    self._auth = None

    if not self.authentication:
        return

    if not self.username or not self.password:
        raise ValueError(
            'When authentication is True, username and password '
            'must be provided.'
        )
    self._auth = HTTPBasicAuth(self.username, self.password)
def test_pull_pact_authentication(mock_get, pull_pact_response):
    """``pull_pact`` must pass basic-auth credentials when authentication is on."""
    mock_get.return_value = pull_pact_response

    client = BrokerClient(
        broker_url=settings.PACT_BROKER_URL,
        user='user',
        password='password',
        authentication=True,
    )
    client.pull_pact(provider=PROVIDER, consumer=CONSUMER)

    expected_auth = HTTPBasicAuth('user', 'password')
    mock_get.assert_called_once_with(EXPECTED_PULL_PACT_URL, auth=expected_auth)
def __init__(self, server_url, project="DefaultCollection", user=None, password=None,
             verify=False, auth_type=HTTPBasicAuth, connect_timeout=20, read_timeout=180):
    r"""Build the REST client used to talk to a TFS server.

    :param server_url: url to TFS server, e.g. https://tfs.example.com/
    :param project: Collection or Collection\Project
    :param user: username, or DOMAIN\username
    :param password: password
    :param verify: True|False - verify HTTPS cert
    :param auth_type: authentication class handed through to the HTTP client
    :param connect_timeout: Requests CONNECTION timeout, sec or None
    :param read_timeout: Requests READ timeout, sec or None
    :raises ValueError: if ``user`` or ``password`` is ``None``
    """
    credentials_missing = user is None or password is None
    if credentials_missing:
        raise ValueError('User name and api-key must be specified!')

    # The HTTP client receives a single (connect, read) timeout pair.
    timeout = (connect_timeout, read_timeout)
    self.rest_client = TFSHTTPClient(
        server_url,
        project=project,
        user=user,
        password=password,
        verify=verify,
        timeout=timeout,
        auth_type=auth_type,
    )
def _compose_args(self, method, data=None):
    """Build the keyword arguments for one HTTP call.

    :param method: HTTP method name; ``'get'`` puts the payload under
        ``params``, anything else puts it under ``data``.
    :param data: optional per-call values merged over the common parameters.
    :returns: dict with the payload key (``params`` or ``data``),
        ``headers``, and optionally ``auth`` and ``verify``.

    The common parameters and headers are copied before use. Previously the
    dict stored in ``self._structure`` was updated in place, so per-call data
    and the billing credentials leaked into every later call.
    """
    query = 'params' if method == 'get' else 'data'

    payload = dict(self._structure.get('common-params', {}))
    if data is not None:
        payload.update(data)

    args = {
        query: payload,
        'headers': dict(self._structure.get('common-headers', {})),
    }

    auth = self._structure.get('auth')
    if auth == 'params':
        # Credentials travel inside the request payload.
        payload.update({'username': self.billing_username,
                        'password': self.billing_password})
    elif auth == 'headers':
        # Credentials travel as an HTTP basic-auth header.
        args['auth'] = HTTPBasicAuth(
            self.billing_username, self.billing_password)

    # Equality (not identity) is kept so a configured 0 still disables
    # verification, exactly as before.
    if self._structure.get('verify') == False:  # noqa: E712
        args['verify'] = False
    return args
def grant_access_token(self, code):
    """Exchange an authorization code for Bitbucket OAuth2 tokens.

    :param code: authorization code to send with the
        ``authorization_code`` grant.
    :returns: the decoded JSON token response.
    :raises BitbucketAPIError: when the token endpoint answers with an error
        status.

    On success the access token, refresh token and token type are cached on
    the instance.
    """
    res = self._session.post(
        'https://bitbucket.org/site/oauth2/access_token',
        data={
            'grant_type': 'authorization_code',
            'code': code,
        },
        auth=HTTPBasicAuth(self._oauth_key, self._oauth_secret)
    )
    try:
        res.raise_for_status()
    except requests.RequestException as reqe:
        # An error response is not guaranteed to carry a JSON object; a
        # decoding failure here must not hide the HTTP error itself.
        try:
            error_info = res.json()
        except ValueError:
            error_info = {}
        if not isinstance(error_info, dict):
            error_info = {}
        raise BitbucketAPIError(
            res.status_code,
            error_info.get('error', ''),
            error_info.get('error_description', ''),
            request=reqe.request,
            response=reqe.response
        )

    data = res.json()
    self._access_token = data['access_token']
    self._refresh_token = data['refresh_token']
    self._token_type = data['token_type']
    return data
def refresh_token(self):
    """Refresh the token unconditionally, whatever its expiry time.

    Posts the stored refresh token to ``TOKEN_URL`` using the client id and
    secret as basic-auth credentials, stamps the answer with an absolute
    ``expires_at`` and stores it in ``self.token``.

    Raises HTTPError on any non 2xx response code.
    """
    previous = self._token['refresh_token']
    response = self.session.post(
        self.TOKEN_URL,
        data={'refresh_token': previous, 'grant_type': 'refresh_token'},
        auth=HTTPBasicAuth(self.client_id, self.client_secret),
    )
    response.raise_for_status()

    token = response.json()
    token['expires_at'] = int(time.time()) + token['expires_in']
    # Keep using the old refresh token when the server does not issue a new one.
    token.setdefault('refresh_token', previous)
    self.token = token
def jsonrpc_call(url, method, params=None, verify_ssl_cert=True, username=None, password=None):
    """Perform one JSON-RPC 2.0 call over HTTP POST and return its result.

    :param url: endpoint URL.
    :param method: JSON-RPC method name.
    :param params: parameters for the call; ``None`` sends an empty object.
    :param verify_ssl_cert: passed to requests as ``verify``.
    :param username: basic-auth user (used only together with ``password``).
    :param password: basic-auth password.
    :returns: the ``result`` member of the JSON-RPC response.
    :raises JsonRpcCallFailed: when the response has no ``result`` member.

    The time spent in the HTTP round trip is added to
    ``method_cumulative_calltime[method]``.
    """
    # ``None`` replaces the former shared mutable default ``{}``.
    payload = {
        "method": method,
        "params": {} if params is None else params,
        "jsonrpc": "2.0",
        "id": 0,
    }
    kwargs = {
        "url": url,
        "headers": {'content-type': 'application/json'},
        "data": json.dumps(payload),
        "verify": verify_ssl_cert,
    }
    if username and password:
        kwargs["auth"] = HTTPBasicAuth(username, password)

    begin = time.time()
    response = requests.post(**kwargs).json()
    # Item assignment mutates the module-level mapping; no ``global`` needed.
    method_cumulative_calltime[method] += (time.time() - begin)

    if "result" not in response:
        raise JsonRpcCallFailed(payload, response)
    return response["result"]
def _handle_basic_auth_401(self, r, kwargs):
    """Re-send the request behind ``r`` with HTTP basic-auth credentials.

    :param r: the response being retried (presumably a 401, going by the
        method name).
    :param kwargs: keyword arguments forwarded to ``connection.send``.
    :returns: the response to the re-sent request, with ``r`` appended to
        its history.
    """
    # Rewind the request body when a position was recorded for it.
    if self.pos is not None:
        r.request.body.seek(self.pos)

    # Consume content and release the original connection
    # to allow our new request to reuse the same one.
    r.content
    r.raw.release_conn()

    retry = r.request.copy()
    if not hasattr(retry, '_cookies'):
        retry._cookies = cookies.RequestsCookieJar()
    cookies.extract_cookies_to_jar(retry._cookies, r.request, r.raw)
    retry.prepare_cookies(retry._cookies)

    # Switch this handler to basic auth and apply it to the copied request.
    self.auth = auth.HTTPBasicAuth(self.username, self.password)
    retry = self.auth(retry)

    response = r.connection.send(retry, **kwargs)
    response.history.append(r)
    response.request = retry
    return response
def add_strategy(self, domain, strategy):
    """Register an authentication strategy for a domain.

    :param str domain: The domain to match against, e.g.
        ``'https://api.github.com'``
    :param strategy: The authentication strategy for that domain, e.g.
        ``('username', 'password')`` or
        ``requests.HTTPDigestAuth('username', 'password')``. A tuple is
        converted to a basic-auth object.

    .. code-block:: python

        a = AuthHandler({})
        a.add_strategy('https://api.github.com', ('username', 'password'))

    """
    if isinstance(strategy, tuple):
        # Tuples are shorthand for basic authentication.
        strategy = HTTPBasicAuth(*strategy)
    self.strategies[self._key_from_url(domain)] = strategy
def get_strategy_for(self, url):
    """Look up the authentication strategy registered for a URL.

    :param str url: The full URL of the request, e.g.
        ``'https://api.github.com/user'``
    :returns: The strategy stored for the URL's key, or a
        ``NullAuthStrategy`` instance when none is registered.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)

    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
def _auth(http_auth):
    """Translate an auth settings mapping into a requests auth object.

    :param http_auth: mapping with ``auth_type`` (``'basic'`` or
        ``'digest'``), ``username`` and ``password``; a falsy value means
        "no authentication".
    :returns: ``None``, an ``HTTPBasicAuth`` or an ``HTTPDigestAuth``.
    :raises Exception: for any other ``auth_type``.
    """
    if not http_auth:
        return None

    scheme = http_auth['auth_type']
    if scheme == 'basic':
        return HTTPBasicAuth(http_auth['username'], http_auth['password'])
    if scheme == 'digest':
        return HTTPDigestAuth(http_auth['username'], http_auth['password'])

    raise Exception('Authorization information is not valid.')
def test_get_podm_status_Offline_by_http_exception(self, mock_get):
    """``pod_status`` reports OFFLINE for connection, SSL and timeout errors."""
    # Connection error
    mock_get.side_effect = requests.ConnectionError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    # This was spelled ``asset_called_once_with``; a Mock silently creates
    # such an attribute, so nothing was being asserted.
    # NOTE(review): the expected call arguments had never been exercised;
    # confirm they match what ``pod_status`` really passes to ``get``.
    mock_get.assert_called_once_with('url',
                                     auth=auth.HTTPBasicAuth('username',
                                                             'password'))

    # SSL Error
    mock_get.side_effect = requests.exceptions.SSLError
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 2)

    # Timeout
    mock_get.side_effect = requests.Timeout
    self.assertEqual(redfish.pod_status('url', 'username', 'password'),
                     constants.PODM_STATUS_OFFLINE)
    self.assertEqual(mock_get.call_count, 3)
def _upload(url, data_file, username, password):
    """Submit a results file via the polarion ReST interface.

    :param url: upload URL; a ``localhost`` URL is treated as "not
        configured" and terminates the process with exit status 1.
    :param data_file: payload passed to the POST request as ``data``.
    :param username: basic-auth user.
    :param password: basic-auth password.
    :returns: the HTTP status code when the upload succeeded.
    :raises exceptions.RequestException: when the server answers with any
        status other than OK.
    """
    import sys

    # Raw string: the old pattern relied on invalid escape sequences
    # ("\:" and "\/"), which newer Python versions warn about.
    if re.search(r'https?://localhost', url):
        print("Please configure url settings.")
        sys.exit(1)

    polarion_request = post(url,
                            data=data_file,
                            auth=auth.HTTPBasicAuth(username, password))
    status_code = polarion_request.status_code
    if status_code == codes.ok:
        return status_code

    print("Results upload failed with the follow: {}".format(status_code))
    raise exceptions.RequestException
def division_standings():
    """Download the division standings and store each team's rank.

    Fetches ``division_team_standings.json`` with basic-auth credentials and
    inserts one ``(team_id, rank)`` row per team into the
    ``division_standings`` table of ``nflpool.sqlite``.

    The database is opened once for the whole batch and always closed; the
    previous version opened a new connection per team and leaked it when an
    insert failed. Rows are committed together.
    """
    url = base_url + 'division_team_standings.json'
    response = requests.get(url, auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for division in data['divisionteamstandings']['division']
        for team in division['teamentry']
    ]
    if not rows:
        # Nothing to store; do not touch the database at all.
        return

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            conn.cursor().executemany(
                '''INSERT INTO division_standings(team_id, rank) VALUES(?,?)''',
                rows)
    finally:
        conn.close()


# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
def playoff_standings():
    """Download the playoff standings and store each team's rank.

    Fetches ``playoff_team_standings.json`` with basic-auth credentials and
    inserts one ``(team_id, rank)`` row per team into the
    ``playoff_rankings`` table of ``nflpool.sqlite``.

    The database is opened once for the whole batch and always closed; the
    previous version opened a new connection per team and leaked it when an
    insert failed. Rows are committed together.
    """
    url = base_url + 'playoff_team_standings.json'
    response = requests.get(url, auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    rows = [
        (team['team']['ID'], team['rank'])
        for conference in data['playoffteamstandings']['conference']
        for team in conference['teamentry']
    ]
    if not rows:
        # Nothing to store; do not touch the database at all.
        return

    conn = sqlite3.connect('nflpool.sqlite')
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            conn.cursor().executemany(
                '''INSERT INTO playoff_rankings(team_id, rank) VALUES(?,?)''',
                rows)
    finally:
        conn.close()


# Get individual statistics for each category
def parse_auth_argument(args):
    """Turn the parsed ``auth`` command-line option into an auth object.

    :param args: parsed arguments; ``args.auth`` selects the scheme.
    :returns: an ``HTTPBasicAuth`` for ``'basic'``, the result of
        ``get_oauth1session`` for ``'oauth'``, otherwise ``args.auth``
        unchanged.

    Exits the process with status 3 when the options required by the
    selected scheme are missing.
    """
    scheme = args.auth

    if scheme == 'basic':
        have_credentials = args.username and args.password
        if not have_credentials:
            print("For basic authentication, 'username' and 'password' "
                  "parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)

    if scheme == 'oauth':
        have_keys = args.consumer_key and args.private_key
        if not have_keys:
            print("For oauth authentication, 'consumer-key' "
                  "and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return scheme
def post_result(url, user, password, verify, data, debug):
    """POST ``data`` as JSON with basic auth and report failures via ``err``.

    :param url: target URL.
    :param user: basic-auth user.
    :param password: basic-auth password.
    :param verify: passed to requests as ``verify``.
    :param data: object sent as the JSON body.
    :param debug: when true, the response object is reported as well.
    :returns: the response object.
    """
    response = requests.post(
        url,
        auth=HTTPBasicAuth(user, password),
        verify=verify,
        json=data,
    )

    if debug:
        err("Request result: " + str(response))

    # Friendlier hints for the two most common credential problems.
    hints = {
        403: "HTTP 403 Forbidden - Does your bitbucket user have rights to the repo?",
        401: "HTTP 401 Unauthorized - Are your bitbucket credentials correct?",
    }
    hint = hints.get(response.status_code)
    if hint:
        err(hint)

    # All other errors, just dump the JSON
    if response.status_code != 204:  # 204 is a success per Bitbucket docs
        err(json_pp(response.json()))

    return response


# Stop all this from executing if we were imported, say, for testing.
def refresh(self, session=None, auth=None):
    """Refresh the access token and save the model.

    :param session: :class:`requests_oauthlib.OAuth2Session` used for the
        refresh; one is created from the configured client id when omitted.
    :param auth: :class:`requests.auth.HTTPBasicAuth`; built from the
        configured client id and secret when omitted.
    :raises NotRefreshableTokenError: if ``can_refresh`` is false.
    :raises TokenInvalidError: on an invalid grant or a missing token.
    :raises ImproperlyConfigured: if the client credentials are rejected.
    """
    if not self.can_refresh:
        raise NotRefreshableTokenError()

    session = session or OAuth2Session(app_settings.ESI_SSO_CLIENT_ID)
    auth = auth or HTTPBasicAuth(app_settings.ESI_SSO_CLIENT_ID,
                                 app_settings.ESI_SSO_CLIENT_SECRET)
    try:
        refreshed = session.refresh_token(app_settings.ESI_TOKEN_URL,
                                          refresh_token=self.refresh_token,
                                          auth=auth)
        self.access_token = refreshed['access_token']
        self.created = timezone.now()
        self.save()
    except (InvalidGrantError, MissingTokenError):
        raise TokenInvalidError()
    except InvalidClientError:
        raise ImproperlyConfigured('Verify ESI_SSO_CLIENT_ID and ESI_SSO_CLIENT_SECRET settings.')
def whois_search(self, query='', field=''):
    """Perform a WHOIS search with the given parameters.

    :param str query: the search term
    :param str field: WHOIS field to execute the search on: domain, email,
        name, organization, address, phone, nameserver
    :returns: the decoded JSON response on HTTP 200, otherwise ``None``
        (the failure is logged).
    """
    log.debug('Permforming WHOIS search with query: {0} and field: '
              '{1}'.format(query, field))

    response = requests.get(
        'https://api.passivetotal.org/v2/whois/search',
        auth=HTTPBasicAuth(self.username, self.apikey),
        proxies=self.proxies,
        params={'query': query, 'field': field},
    )

    if response.status_code != 200:
        log.error('HTTP code {0} returned during WHOIS search '
                  'request.'.format(response.status_code))
        return None

    return json.loads(response.text)
def cuckoo_check_if_dup(self, sha256):
    """Check whether cuckoo already holds a sample with this hash.

    :param sha256: hash placed in the ``/files/view/sha256/`` lookup URL.
    :returns: ``True`` when the lookup answers with HTTP 200, ``False``
        otherwise, including when any exception occurs (it is printed).
    """
    try:
        print("Looking for tasks for: {}".format(sha256))
        lookup_url = urljoin(self.url_base, "/files/view/sha256/{}".format(sha256))
        res = requests.get(lookup_url,
                           verify=False,
                           auth=HTTPBasicAuth(self.api_user, self.api_passwd),
                           timeout=60)
        if not (res and res.ok and res.status_code == 200):
            return False
        print("Sample found in Sandbox, with ID: {}".format(res.json().get("sample", {}).get("id", 0)))
        return True
    except Exception as e:
        print(e)
        return False
def postfile(self, artifact, fileName):
    """Send a file to Cuckoo.

    :param artifact: path of the file to upload.
    :param fileName: name reported to Cuckoo for the uploaded file.

    The file is read before the request is attempted, so a missing or
    unreadable file raises to the caller. Request failures are printed and
    swallowed. Returns ``None``.
    """
    # ``with`` closes the handle; it was previously left open.
    with open(artifact, "rb") as handle:
        files = {"file": (fileName, handle.read())}

    try:
        res = requests.post(urljoin(self.url_base, "tasks/create/file").encode("utf-8"),
                            files=files,
                            auth=HTTPBasicAuth(self.api_user, self.api_passwd),
                            verify=False)
        if res and res.ok:
            print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
        else:
            print("Cuckoo Request failed: {}".format(res.status_code))
    except Exception as e:
        print("Cuckoo Request failed: {}".format(e))
    return
def posturl(self, scanUrl):
    """Send a URL to Cuckoo.

    :param scanUrl: URL submitted as the ``url`` form field.

    Failures are printed and swallowed. Returns ``None``.
    """
    form = {"url": scanUrl}
    try:
        endpoint = urljoin(self.url_base, "tasks/create/url").encode("utf-8")
        credentials = HTTPBasicAuth(self.api_user, self.api_passwd)
        res = requests.post(endpoint, data=form, auth=credentials, verify=False)
        if not (res and res.ok):
            print("Cuckoo Request failed: {}".format(res.status_code))
        else:
            print("Cuckoo Request: {}, Task created with ID: {}".format(res.status_code, res.json()["task_id"]))
    except Exception as e:
        print("Cuckoo Request failed: {}".format(e))
    return
def run(self, args, logger):
    """Fetch a client-credentials access token and write it to a file.

    :param args: parsed arguments; ``args.filename`` is the output path.
    :param logger: logger used for progress and error messages.
    :returns: ``0`` on success, ``1`` when the token endpoint does not
        answer with HTTP 200.
    """
    response = request(
        'POST',
        'https://eu.battle.net/oauth/token',
        auth=HTTPBasicAuth(config.API_KEY, config.API_SECRET),
        params={'grant_type': 'client_credentials'},
        allow_redirects=False,
    )
    if response.status_code != 200:
        logger.error("failed to get access token got %s: %s" % (response.status_code, response.content))
        return 1

    token_info = response.json()
    access_token = token_info['access_token']
    logger.info("writing access_token to %s, expires in %s" % (args.filename, token_info['expires_in']))

    with open(args.filename, 'w') as out:
        out.write(access_token)
    return 0
def ch138():
    """Repeatedly read a captcha value from a page and submit it back.

    Each round fetches ``form1.php``, extracts the expected answer from the
    script embedded in the page, requests a new captcha, posts the answer to
    ``captcha1.php`` and prints the text of the first alert box in the reply.
    """
    ek = 'RZ_CH138_PW'  # not used within this function
    session = requests.Session()
    auth = HTTPBasicAuth('captcha', 'QJc9U6wxD4SFT0u')
    url = 'http://captcha.ringzer0team.com:7421'

    # NOTE(review): the flattened source does not show whether the final
    # print sits inside the loop; it is kept inside here - confirm.
    for _ in xrange(1001):
        time.sleep(0.10)

        form_page = session.get('{0}/form1.php'.format(url), auth=auth)
        match = re.search(r'if \(A == "([a-z0-9]*)"\)', form_page.text)
        captcha = match.group(1)

        session.get('{0}/captcha/captchabroken.php?new'.format(url), auth=auth)

        reply = session.post('{0}/captcha1.php'.format(url),
                             auth=auth,
                             data={'captcha': captcha})
        doc = lxml.html.document_fromstring(reply.text)
        alert = doc.xpath('//div[contains(@class, "alert")]')[0]
        msg = alert.text_content().strip()
        print(msg)
def __init__(self, host, port, username, password, verify):
    """Initialise the epoRemote with the details of the target ePO instance.

    :param host: the hostname of the ePO to run remote commands on
    :param port: the port of the desired ePO
    :param username: the username to run the remote commands as
    :param password: the password for the ePO user
    :param verify: whether to verify the ePO server's certificate
    """
    logger.debug('Initializing epoRemote for ePO {} on port {} with user {}'.format(host, port, username))

    # All remote commands are issued below this base URL.
    self._baseurl = 'https://{}:{}/remote'.format(host, port)
    self._verify = verify
    self._auth = HTTPBasicAuth(username, password)
    self._session = requests.Session()
def artifactory_repo_present(data):
    """PUT the repository definition in ``data`` to Artifactory.

    :param data: repository settings. ``state``, ``user`` and ``password``
        are removed from this dict in place; the remainder is sent as JSON.
    :returns: a 3-tuple, presumably ``(is_error, has_changed, meta)``
        (verify against the caller):
        ``(False, True, ...)`` on HTTP 200,
        ``(False, False, ...)`` when a 400 reports 'key already exists',
        ``(True, False, ...)`` for anything else.
    """
    del data['state']

    user = data['user']
    password = data['password']
    del data['user']
    del data['password']

    url = "{}/{}/{}".format(data['artifactory'], 'api/repositories', data['key'])
    result = requests.put(url,
                          json.dumps(data),
                          headers={"Content-Type": "application/json"},
                          auth=HTTPBasicAuth(user, password))

    status = result.status_code
    if status == 200:
        return False, True, {"status": status}

    if status == 400 and 'errors' in result.json():
        already_there = any('key already exists' in entry['message']
                            for entry in result.json()['errors'])
        if already_there:
            return False, False, result.json()

    # default: something went wrong
    return True, False, {"status": status, 'response': result.json()}
def artifactory_license(data):
    """POST the license settings in ``data`` to Artifactory.

    :param data: license settings. ``user`` and ``password`` are removed
        from this dict in place; the remainder is sent as JSON.
    :returns: a 3-tuple, presumably ``(is_error, has_changed, meta)``
        (verify against the caller): ``(False, True, response_json)`` on
        HTTP 200, otherwise ``(True, False, {status, response})``.
    """
    user = data['user']
    password = data['password']
    del data['user']
    del data['password']

    url = "{}/{}".format(data['artifactory'], 'api/system/license')
    result = requests.post(url,
                           json.dumps(data),
                           headers={"Content-Type": "application/json"},
                           auth=HTTPBasicAuth(user, password))

    if result.status_code != 200:
        # default: something went wrong
        return True, False, {"status": result.status_code, 'response': result.json()}

    return False, True, result.json()
def post(self, data):
    """Post data to the associated endpoint and return the server's answer.

    :param data: the data to be posted.
    :type data: str or json
    :returns: the response text, or a dict with ``code``, ``name`` and an
        empty ``message`` when the response body is empty.
    :raises ValueError: if the username or the password is not set.
    """
    have_credentials = self._username is not None and self._password is not None
    if not have_credentials:
        raise ValueError("Username or Password is not set")

    credentials = HTTPBasicAuth(self._username, self._password)
    resp = requests.post(self._endpoint,
                         data=data,
                         json=None,
                         verify=self._verify_ssl,
                         timeout=self._timeout,
                         auth=credentials)

    if resp.text != '':
        return resp.text
    # Empty body: describe the outcome with the status line instead.
    return {"code": resp.status_code, "name": resp.reason, "message": ""}
def overwrite_comment(self, url, body):
    """Replace the comment at the given API url with ``body``.

    :param url: API endpoint of the comment to overwrite.
    :param body: new comment text.

    Raises the HTTP library's error for a failing status code.
    """
    credentials = HTTPBasicAuth(self.name, self.token)
    res = self.requests.patch(url,
                              data=json.dumps({"body": body}),
                              auth=credentials)
    res.raise_for_status()
def post_comment(self, url, body):
    """Add a github comment through the given url endpoint.

    :param url: API endpoint the comment is posted to.
    :param body: comment text.

    Raises the HTTP library's error for a failing status code.
    """
    credentials = HTTPBasicAuth(self.name, self.token)
    res = self.requests.post(url,
                             data=json.dumps({"body": body}),
                             auth=credentials)
    res.raise_for_status()
def _get_access_token(self):
    """Get token and refresh_token from mailup.

    Sends a password grant to the token endpoint, authenticated with the
    client id and secret. The refresh token from the parsed response is
    stored on the instance.

    :returns: the access token from the parsed response.
    """
    credentials = {
        "grant_type": "password",
        "username": self.username,
        "password": self.password,
    }
    response = self.post(
        ENDPOINT["token"],
        data=credentials,
        auth=HTTPBasicAuth(self.client_id, self.client_secret),
    )
    tokens = response_parser(response)
    self.refresh_token = tokens["refresh_token"]
    return tokens["access_token"]
def get_host_id(name):
    """Return the id of the single host called ``name``.

    :param name: host name used as the ``name`` query parameter.
    :returns: the ``id`` of the only matching host.

    Exits the process with status 1 when no host or more than one host
    matches.
    """
    session = requests.Session()
    response = session.request(
        "GET",
        tower_base_url + "hosts/",
        headers={'content-type': "application/json"},
        params={"name": name},
        verify=False,
        auth=HTTPBasicAuth(args.tower_username, args.tower_password),
    )

    matches = response.json()['results']
    count = len(matches)
    if count < 1:
        print("No host found with that name.")
        sys.exit(1)
    if count > 1:
        print("Multiple hosts found with that name, so I won't remove any of them.")
        sys.exit(1)
    return matches[0]['id']
def add_host(host_name, inventory):
    """Create an enabled host in the given inventory.

    :param host_name: name of the new host.
    :param inventory: value sent as the host's ``inventory`` field.

    The response is not inspected; returns ``None``.
    """
    new_host = {
        "name": host_name,
        "enabled": True,
        "inventory": inventory,
    }
    session = requests.Session()
    session.request(
        "POST",
        tower_base_url + "hosts/",
        headers={'content-type': "application/json"},
        data=json.dumps(new_host),
        verify=False,
        auth=HTTPBasicAuth(args.tower_username, args.tower_password),
    )
def get_jwt_for_registry(auth_url, registry, appname):
    """Request a push/pull JWT for ``appname`` from the auth service.

    :param auth_url: base URL of the token endpoint.
    :param registry: registry whose stored credentials are looked up.
    :param appname: repository name placed in the requested scope.
    :returns: the token string, or ``''`` when no token could be obtained.

    Every failure path returns ``''``: a raised exception (reported through
    ``warn``), an error status, or an empty token. The original flattened
    source left it unclear whether the non-exception failures returned
    ``''`` or ``None``; both are falsy, so callers testing truthiness are
    unaffected.
    """
    # get auth username and password from dockercfg
    try:
        cfg = auth.resolve_authconfig(auth.load_config(), registry=registry)
        username = cfg['username'] if 'username' in cfg else cfg['Username']
        password = cfg['password'] if 'password' in cfg else cfg['Password']

        # only use `lain.local` as service
        url = "%s?service=%s&scope=repository:%s:push,pull&account=%s" % (
            auth_url, "lain.local", appname, username)
        response = requests.get(url, auth=HTTPBasicAuth(username, password))
        if response.status_code < 400:
            # Decode the body once instead of once per access.
            token = response.json()['token']
            if token:
                return token
    except Exception as e:
        warn("can not load registry auth config : %s, need lain login first." % e)
    return ''
def test_oauth_token(self):
    """oauth_token() POSTs to /oauth/token with the expected params and basic auth."""
    uaac = UAAClient('http://example.com', 'foo', False)
    request_mock = Mock()
    uaac._request = request_mock

    uaac.oauth_token('foo', 'bar', 'baz')

    args, kwargs = request_mock.call_args
    assert args == ('/oauth/token', 'POST')

    expected_params = {
        'code': 'foo',
        'grant_type': 'authorization_code',
        'response_type': 'token'
    }
    assert kwargs['params'] == expected_params

    sent_auth = kwargs['auth']
    assert isinstance(sent_auth, HTTPBasicAuth)
    assert sent_auth.username == 'bar'
    assert sent_auth.password == 'baz'
def test_get_client_token(self):
    """_get_client_token() POSTs to /oauth/token with the expected params and basic auth."""
    uaac = UAAClient('http://example.com', 'foo', False)
    request_mock = Mock()
    uaac._request = request_mock

    uaac._get_client_token('bar', 'baz')

    args, kwargs = request_mock.call_args
    assert args == ('/oauth/token', 'POST')

    expected_params = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    assert kwargs['params'] == expected_params

    sent_auth = kwargs['auth']
    assert isinstance(sent_auth, HTTPBasicAuth)
    assert sent_auth.username == 'bar'
    assert sent_auth.password == 'baz'
def _get_client_token(self, client_id, client_secret):
    """Request a client-credentials token and return its access token.

    Args:
        client_id: The oauth client id to authenticate with
        client_secret: The secret for the client_id above

    Raises:
        UAAError: per the original documentation, when there was an error
            getting the token (raised by the request helper, not here)

    Returns:
        The value stored under ``access_token`` in the response, or
        ``None`` when the response has no such key

    """
    grant = {
        'grant_type': 'client_credentials',
        'response_type': 'token'
    }
    credentials = HTTPBasicAuth(client_id, client_secret)
    response = self._request('/oauth/token', 'POST', params=grant, auth=credentials)
    return response.get('access_token', None)
def complete_pair(self, pin):
    """Complete the pairing process using the PIN shown on the screen.

    :param pin: the PIN displayed to the user; sent as the basic-auth
        password with an empty username.
    :returns: ``(response, True)`` when registration answered HTTP 200 and
        the follow-up ``connect()`` reported ``True``; ``(response, False)``
        when only the registration succeeded; ``(None, False)`` otherwise.
    """
    registration = [
        {"clientid": self.device_id, "nickname": self.nickname},
        [{"value": "no", "function": "WOL"}],
    ]
    payload = self._build_json_payload("actRegister", registration)

    # Kept on the object in case the credentials are needed again later.
    self.auth = HTTPBasicAuth('', pin)
    r = self.do_POST(url='/sony/accessControl', payload=payload, auth=self.auth)

    if r.status_code != 200:
        return None, False

    print("have paired")
    self.paired = True

    # Connect again so the cookies get set up properly.
    _, connected = self.connect()
    return r, connected is True