我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 requests.auth 模块。
def getESCXBalance(address):
    """Return the ESCX balance held by ``address``, or 0 on any failure.

    Posts a JSON-RPC 2.0 ``get_balances`` request to the module-level ``url``
    (with the module-level ``headers`` and ``auth``), filtered on both the
    address and the ``ESCX`` asset.  The ``quantity`` of the last entry of the
    ``result`` list is divided by 100000000 before being returned.

    The lookup is best-effort: any ordinary error (request failure, invalid
    JSON, missing or empty ``result``) yields ``0``.
    """
    payload = {
        "method": "get_balances",
        "params": {
            "filters": [
                {"field": "address", "op": "==", "value": address},
                {"field": "asset", "op": "==", "value": "ESCX"},
            ],
            "filterop": "and",
        },
        "jsonrpc": "2.0",
        "id": 0,
    }
    try:
        response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth)
        json_data = response.json()
        return (json_data['result'].pop()['quantity']) / 100000000
    except Exception:
        # ``Exception`` instead of a bare ``except`` so that KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        return 0
def auth():
    """Fire one sample request per authentication style.

    Every response is discarded and nothing is returned.
    """
    github_user_url = 'https://api.github.com/user'

    # Basic Authentication: once with an HTTPBasicAuth object, once with a
    # plain (user, password) tuple.
    requests.get(github_user_url, auth=HTTPBasicAuth('user', 'pass'))
    requests.get(github_user_url, auth=('user', 'pass'))

    # Digest Authentication
    digest_url = 'http://httpbin.org/digest-auth/auth/user/pass'
    requests.get(digest_url, auth=HTTPDigestAuth('user', 'pass'))

    # OAuth2 Authentication (presumably provided by requests-oauthlib)
    credentials_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    oauth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN')
    requests.get(credentials_url, auth=oauth)
def retrieve_access_token(self):
    """Exchange the authorization code for an access token.

    Once you have the authorization code, call this function to get the
    access_token.  The access_token gives you full access to the API and is
    valid throughout the day.

    Returns:
        The ``access_token`` value from the JSON response body.

    Raises:
        TypeError: if ``api_key``, ``redirect_uri``, ``api_secret`` or
            ``code`` is ``None``.
        SystemError: if the response body has no ``access_token`` key; the
            decoded body is passed as the exception argument.
    """
    required = (
        ('api_key', 'Value api_key cannot be None. Please go to the Developer Console to get this value'),
        ('redirect_uri', 'Value redirect_uri cannot be None. Please go to the Developer Console to get this value'),
        ('api_secret', 'Value api_secret cannot be None. Please go to the Developer Console to get this value'),
        ('code', 'Value code cannot be None. Please visit the login URL to generate a code'),
    )
    for attribute, message in required:
        if getattr(self, attribute) is None:
            # Raise a real exception instance; the previous
            # ``raise (TypeError, msg)`` raised a tuple and lost the message.
            raise TypeError(message)

    params = {'code': self.code,
              'redirect_uri': self.redirect_uri,
              'grant_type': 'authorization_code'}
    url = self.config['host'] + self.config['routes']['accessToken']
    headers = {"Content-Type": "application/json", "x-api-key": self.api_key}

    r = requests.post(url, auth=(self.api_key, self.api_secret),
                      data=json.dumps(params), headers=headers)
    body = json.loads(r.text)
    if 'access_token' not in body:
        raise SystemError(body)
    return body['access_token']
def get_state(self) -> str:
    """Query the device and classify its state.

    When ``state_cmd`` is ``None`` no request is made.  Otherwise a request
    is sent with the configured method, URL, body, headers and auth, and the
    response text is searched for the "off" marker first, then the "on"
    marker.

    Returns:
        "on", "off", or "unknown"
    """
    state = "unknown"
    if self.state_cmd is not None:
        resp = requests.request(
            self.state_method,
            self.state_cmd,
            data=self.state_data,
            json=self.state_json,
            headers=self.headers,
            auth=self.auth,
        )
        # The "off" marker takes precedence when both markers are present.
        if self.state_response_off in resp.text:
            state = "off"
        elif self.state_response_on in resp.text:
            state = "on"
    return state
def _ghost2loggergetGUID(self):
    """Fetch the GUID from the Ghost2logger ``/v1/guid`` endpoint.

    Returns the string form of the last entry of the response's ``pattern``
    list when the response's ``host`` field equals "GUID", and an empty
    string when it does not.  Any exception is logged at info level and the
    method then returns ``None`` implicitly.
    """
    # Let's deal with the JSON API calls here
    json_headers = {"Content-Type": 'application/json'}
    guid_url = self._ghost_url + "/v1/guid"
    try:
        reply = requests.get(guid_url, headers=json_headers, verify=False,
                             auth=(self._username, self._password))
        body = reply.json()
        if body['host'] != "GUID":
            return ""
        # Every entry overwrites the previous one, so the last one wins.
        # An empty list leaves _guid unbound; the resulting error is caught
        # and logged below.
        for entry in body['pattern']:
            _guid = str(entry)
        return str(_guid)
    except Exception as e:
        self._logger.info('[Ghost2logger]: Cannot get GUID: ' + str(e))
def _ghost2loggergpurge(self):
    """Send a DELETE to the Ghost2logger ``/v1/all`` endpoint.

    Logs the decoded JSON reply on success; any exception is logged at info
    level instead of being raised.  Returns ``None``.
    """
    # Let's deal with the JSON API calls here
    self._logger.info('[Ghost2logger]: Purge Ghost2logger')
    json_headers = {"Content-Type": 'application/json'}
    purge_url = self._ghost_url + "/v1/all"
    try:
        reply = requests.delete(purge_url, headers=json_headers, verify=False,
                                auth=(self._username, self._password))
        body = reply.json()
        self._logger.info('[Ghost2logger]: Purged ghost2logger' + str(body))
    except Exception as e:
        self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' + str(e))
def __getAppSpaces(self):
    """Load the available appspace names into ``appspaces`` and ``apps``.

    Requests ``<baseurl>appspacenames``.  On success ``self.appspaces`` holds
    the decoded JSON body and ``self.apps`` its keys.  When a response was
    received but processing failed (for example an HTTP error status), the
    error and the decoded body are printed and ``self.apps`` is set to that
    body.

    Raises:
        Whatever ``requests.get`` raised when no response was obtained at
        all.  Previously this surfaced as an unrelated unbound-variable error
        from inside the error handler.
    """
    response = None
    try:
        response = requests.get(self.baseurl + 'appspacenames', auth=self.cred)
        response.raise_for_status()
        self.appspaces = response.json()
        self.apps = self.appspaces.keys()
    except Exception as e:
        if response is None:
            # The request itself failed, so there is no body to report.
            raise
        error_body = response.json()
        print(str(e) + ':\n' + str(error_body['UserMessage']['Text']))
        print(error_body)
        self.apps = error_body
def __getQueues(self):
    """Collect workbasket URLs for every role of every appspace.

    For each ``(appspace, roles)`` pair in ``self.roles`` the role resource
    ``<baseurl>appspaces/<appspace>/roles/<role>`` is requested.  For every
    successful response, each workbasket's ``URI`` is appended to
    ``self.queue_urls`` and stored in ``self.workbaskets`` under the last
    path segment of that URI.

    Each appspace is paired only with its own roles.  The earlier nested
    ``keys()`` / ``values()`` loops combined every appspace with the roles of
    every other appspace, issuing redundant requests and allowing duplicate
    entries in ``self.queue_urls``.
    """
    for app, roles in self.roles.items():
        if not roles:
            continue
        for role in roles:
            my_role = requests.get(
                self.baseurl + 'appspaces/' + app + '/roles/' + role,
                auth=self.cred)
            if not my_role.ok:
                continue
            for basket in my_role.json()['workbaskets'].values():
                uri = basket['URI']
                self.queue_urls.append(uri)
                self.workbaskets[uri.split('/')[-1]] = uri
def getQueue(self, work_basket):
    """Return the queue of a workbasket, including its task count.

    The workbasket's URL is looked up by name in ``self.client.workbaskets``
    and fetched; a second request to ``<queue url>/queueelements/count``
    supplies the number stored under the ``count`` key.

    Usage:
    >>> my_queue = pe.getQueue('workbasket_name')
    >>> my_queue.get('count')  # total tasks in this Queue
    """
    client = self.client
    queue_reply = requests.get(client.baseurl + client.workbaskets.get(work_basket),
                               auth=client.cred)
    count_reply = requests.get(queue_reply.url + '/queueelements/count',
                               auth=client.cred)
    total = count_reply.json()['count']
    queue = queue_reply.json()
    queue['count'] = total
    return queue
def getAllTasks(self):
    """Return one flat list with the tasks of every known queue.

    Each URL in ``self.client.queue_urls`` is fetched and its decoded JSON is
    handed to ``getTasks``; queues for which that returns a falsy value are
    skipped.

    Usage:
    >>> tasks = pe.getAllTasks()
    """
    batches = []
    for uri in self.client.queue_urls:
        reply = requests.get(self.client.baseurl + uri, auth=self.client.cred)
        batch = self.getTasks(reply.json())
        if batch:
            batches.append(batch)

    # Flatten only after every queue has been fetched.
    flattened = []
    for batch in batches:
        flattened.extend(batch)
    return flattened
def lockTask(self, task):
    """Lock the step element of ``task``.

    Fetches the step element to read its ``ETag`` response header, then
    issues a PUT to the same URL with ``action=lock`` and ``If-Match=<ETag>``
    as request parameters.  Returns ``None``.

    Usage:
    >>> pe.lockTask(task)
    """
    client = self.client
    current = requests.get(client.baseurl + task['stepElement'], auth=client.cred)
    etag = current.headers['ETag']
    lock_params = {'action': 'lock', 'If-Match': etag}
    requests.put(client.baseurl + task['stepElement'],
                 auth=client.cred,
                 params=lock_params)
def endTask(self, task, comment=None):
    """Dispatch ``task``, optionally saving a comment first.

    When the step lists responses but none has been selected, nothing is
    dispatched and an explanatory message string is returned.  Otherwise the
    comment (if any) is saved through ``saveAndUnlockTask``, the task is
    locked and a PUT with ``action=dispatch`` is sent to the step element.
    Returns ``None`` in that case.

    Usage:
    >>> pe.endTask(task)
    #or
    >>> pe.endTask(task, u'Completed the task!')
    """
    params = {'action': 'dispatch'}
    step = self.getStep(task)
    needs_response = (step.get('systemProperties').get('responses')
                      and not step.get('systemProperties').get('selectedResponse'))
    if needs_response:
        return "This task needs to be updated. Check the updateTask method."
    params['selectedResponse'] = 1

    if comment:
        task = self.saveAndUnlockTask(task, comment)
    # NOTE(review): the lock is taken whether or not a comment was saved;
    # confirm this matches the original nesting.
    self.lockTask(task)
    params['If-Match'] = task['ETag']
    requests.put(self.client.baseurl + task['stepElement'],
                 auth=self.client.cred,
                 params=params)
def getUser(self, search_string):
    """Search the ``users`` resource for ``search_string``.

    The request is sent with ``searchType=4`` and ``limit=50``.  Returns the
    list of ``displayName`` values of the matches, or the string
    "User not Found" when the response has no (or an empty) ``users`` entry.

    Usage:
    >>> users = pe.getUser('user_name')
    """
    query = {'searchPattern': search_string, 'searchType': 4, 'limit': 50}
    reply = requests.get(self.client.baseurl + 'users',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('users')
    if not matches:
        return "User not Found"
    return [entry['displayName'] for entry in matches]
def getGroup(self, search_string):
    """Search the ``groups`` resource for ``search_string``.

    The request is sent with ``searchType=4`` and ``limit=3000``.  Returns
    the list of ``displayName`` values of the matches, or the string
    "Group not Found" when the response has no (or an empty) ``groups``
    entry.

    Usage:
    >>> groups = pe.getGroup('group_name')
    """
    query = {'searchPattern': search_string, 'searchType': 4, 'limit': 3000}
    reply = requests.get(self.client.baseurl + 'groups',
                         auth=self.client.cred,
                         params=query)
    matches = reply.json().get('groups')
    if not matches:
        return "Group not Found"
    return [entry['displayName'] for entry in matches]
def sendMessage() -> str:
    """Send a message (internal or external) to a HackerOne report.

    Reads ``message``, ``internal`` and ``id`` from the JSON body of the
    current request, posts an ``activity-comment`` to the report's
    ``activities`` endpoint and returns the JSON reply re-serialized as a
    string.
    """
    body = request.get_json(force=True)
    message = body['message']
    internal = body['internal']
    report_id = body['id']

    if config.DEBUG:
        print("/v1/sendMessage: id=%s, internal=%s" % (report_id, internal))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    attributes = {'message': message, 'internal': internal}
    h1_payload = {'data': {'type': 'activity-comment', 'attributes': attributes}}
    encoded = json.dumps(h1_payload).encode('utf-8')
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/activities' % report_id,
                         headers={'Content-Type': 'application/json'},
                         data=encoded,
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
def changeStatus() -> str:
    """Change the state of a HackerOne report.

    Reads ``status``, ``message`` and ``id`` from the JSON body of the
    current request, posts a ``state-change`` to the report's
    ``state_changes`` endpoint and returns the JSON reply re-serialized as a
    string.
    """
    body = request.get_json(force=True)
    status = body['status']
    message = body['message']
    report_id = body['id']

    if config.DEBUG:
        print("/v1/changeStatus: id=%s, status=%s" % (report_id, status))
    if config.DEBUGVERBOSE:
        print("message=%s" % message)

    attributes = {'message': message, 'state': status}
    h1_payload = {'data': {'type': 'state-change', 'attributes': attributes}}
    encoded = json.dumps(h1_payload).encode('utf-8')
    resp = requests.post('https://api.hackerone.com/v1/reports/%s/state_changes' % report_id,
                         headers={'Content-Type': 'application/json'},
                         data=encoded,
                         auth=(config.apiName, secrets.apiToken))
    return json.dumps(resp.json())
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """Post the message of ``vti`` as a comment on report ``id``.

    - Set internal=True in order to post an internal comment
    - Set addStopMessage=True in order to append the disable message

    The comment is sent to the ``/v1/sendMessage`` endpoint of the ``api``
    host.  When ``config.triageOnReproduce`` is set and ``vti.reproduced`` is
    true, the report is additionally moved to 'triaged'.  Returns the decoded
    JSON reply of the comment request.
    """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    message = vti.message
    if addStopMessage:
        message = vti.message + '\n\n' + constants.disableMessage

    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this
    comment = {'message': message, 'internal': internal, 'id': id}
    resp = requests.post('http://api:8080/v1/sendMessage',
                         json=comment,
                         auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))
    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')
    return resp.json()
def __init__(self, api_url, auth_custom=None, auth_user=None, auth_pass=None, login=None):
    """Store the API URL and the authentication settings.

    :param api_url: str Moira API URL; a trailing '/' is appended if missing
    :param auth_custom: dict extra auth headers merged over the default one
    :param auth_user: str auth user
    :param auth_pass: str auth password
    :param login: str auth login, sent as the ``X-Webauth-User`` header
    """
    self.api_url = api_url if api_url.endswith('/') else api_url + '/'

    self.auth = None
    self.auth_custom = {'X-Webauth-User': login}

    # Basic auth is only configured when both user and password are given.
    have_credentials = auth_user and auth_pass
    if have_credentials:
        self.auth = HTTPBasicAuth(auth_user, auth_pass)
    if auth_custom:
        self.auth_custom.update(auth_custom)
def get(self, path='', **kwargs):
    """Send a GET request and return the decoded JSON body.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response
    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.get(self._path_join(path),
                            headers=self.auth_custom,
                            auth=self.auth,
                            **kwargs)
    response.raise_for_status()
    try:
        payload = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return payload
def delete(self, path='', **kwargs):
    """Send a DELETE request and return the decoded JSON body.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response
    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.delete(self._path_join(path),
                               headers=self.auth_custom,
                               auth=self.auth,
                               **kwargs)
    response.raise_for_status()
    try:
        payload = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return payload
def put(self, path='', **kwargs):
    """Send a PUT request and return the decoded JSON body.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response
    :raises: HTTPError
    :raises: InvalidJSONError
    """
    response = requests.put(self._path_join(path),
                            headers=self.auth_custom,
                            auth=self.auth,
                            **kwargs)
    response.raise_for_status()
    try:
        payload = response.json()
    except ValueError:
        raise InvalidJSONError(response.content)
    return payload
def add_torrent(self, torrent_data: Union[str, bytes], download_dir: str=None) -> bool:
    """Upload torrent data to uTorrent via the ``add-file`` action.

    ``torrent_data`` is written to a temporary file which is then reopened
    by name and sent as the ``torrent_file`` upload.  ``download_dir`` is
    accepted but not used here.

    Returns:
        ``True`` when the JSON reply has no ``error`` key; ``False`` when it
        has one or when the request raised ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    params = {'action': 'add-file', 'token': self.token}

    # Context managers close both handles on every exit path; previously the
    # reopened handle was never closed.
    with NamedTemporaryFile() as lf:
        # NOTE(review): the temp file is opened in binary mode, so str input
        # fails on Python 3 although the annotation allows it - confirm.
        lf.write(torrent_data)
        # Flush before reopening by name, otherwise the second handle may see
        # an empty or truncated file.
        lf.flush()
        with open(lf.name, 'rb') as torrent_file:
            try:
                response = requests.post(
                    self.UTORRENT_URL,
                    auth=self.auth,
                    params=params,
                    files={'torrent_file': torrent_file},
                    timeout=25).json()
            except RequestException:
                return False
    return 'error' not in response
def add_url(self, url: str, download_dir: str=None) -> bool:
    """Ask uTorrent to add a torrent by URL via the ``add-url`` action.

    ``download_dir`` is accepted but not used here.

    Returns:
        ``True`` when the JSON reply has no ``error`` key; ``False`` when it
        has one or when the request raised ``RequestException``.
    """
    self.total_size = 0
    self.expected_torrent_name = ''
    query = {'action': 'add-url', 'token': self.token, 's': url}
    try:
        reply = requests.get(self.UTORRENT_URL,
                             auth=self.auth,
                             params=query,
                             timeout=25).json()
    except RequestException:
        return False
    return 'error' not in reply
def push_pact(self, *, pact_file, provider, consumer, consumer_version):
    """Upload the pact stored in ``pact_file`` to the broker.

    The file is parsed as JSON and sent with a PUT to the URL built from
    ``urls.PUSH_PACT_URL``.

    Returns:
        A ``(response, message)`` tuple.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error status.
    """
    target = urls.PUSH_PACT_URL.format(
        broker_url=self.broker_url,
        provider=provider,
        consumer=consumer,
        consumer_version=consumer_version,
    )
    with open(pact_file) as handle:
        pact_body = json.load(handle)

    response = requests.put(target, auth=self._auth, json=pact_body)
    response.raise_for_status()
    summary = f'Pact between {consumer} and {provider} pushed.'
    return response, summary
def tag_consumer(self, *, provider, consumer, consumer_version, tag):
    """Tag a consumer version on the broker.

    Sends a body-less PUT to the URL built from ``urls.TAG_CONSUMER_URL``.
    ``provider`` is accepted but not used here.

    Returns:
        A ``(response, message)`` tuple.

    Raises:
        Whatever ``response.raise_for_status()`` raises on an error status.
    """
    target = urls.TAG_CONSUMER_URL.format(
        broker_url=self.broker_url,
        consumer=consumer,
        consumer_version=consumer_version,
        tag=tag,
    )
    json_headers = {'Content-Type': 'application/json'}
    response = requests.put(target, headers=json_headers, auth=self._auth)
    response.raise_for_status()
    summary = f'{consumer} version {consumer_version} tagged as {tag}'
    return response, summary
def __init__(self, base_url, project, user, password, verify=False, timeout=None, auth_type=None):
    """Prepare the API URLs and an authenticated HTTP session.

    :param base_url: server URL; a trailing '/' is appended if missing
    :param project: project name, split into collection and project by
        ``get_collection_and_project``
    :param user: user name passed to the auth object
    :param password: password passed to the auth object
    :param verify: stored as ``_verify``; when falsy, urllib3's
        ``InsecureRequestWarning`` is disabled
    :param timeout: stored as ``timeout``
    :param auth_type: callable taking ``(user, password)`` and returning a
        requests auth object.  When omitted, the ``(user, password)`` tuple
        is used as the session auth; the default ``None`` used to be called
        unconditionally and raised ``TypeError``.
    """
    if not base_url.endswith('/'):
        base_url += '/'

    collection, project = self.get_collection_and_project(project)
    # Remove part after / in project-name, like DefaultCollection/MyProject => DefaultCollection
    # API response only in Project, without subproject
    self._url = base_url + '%s/_apis/' % collection
    if project:
        self._url_prj = base_url + '%s/%s/_apis/' % (collection, project)
    else:
        self._url_prj = self._url

    self.http_session = requests.Session()
    if auth_type is None:
        auth = (user, password)
    else:
        auth = auth_type(user, password)
    self.http_session.auth = auth

    self.timeout = timeout
    self._verify = verify
    if not self._verify:
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def _compose_args(self, method, data=None):
    """Build the keyword arguments for a request.

    The common parameters from ``self._structure`` are merged with ``data``
    and placed under ``params`` for ``method == 'get'`` and under ``data``
    for any other method.  Depending on the structure's ``auth`` setting the
    billing credentials are added to those parameters (``'params'``) or
    passed as an ``HTTPBasicAuth`` object under ``auth`` (``'headers'``).
    ``verify=False`` is added when the structure says so.

    The common parameters and headers are copied first.  Updating them in
    place used to leak request data and credentials into
    ``self._structure`` and therefore into every later call.
    """
    args = {}
    if data is None:
        data = {}

    query = 'params' if method == 'get' else 'data'
    args[query] = dict(self._structure.get('common-params', {}))
    args[query].update(data)
    args['headers'] = dict(self._structure.get('common-headers', {}))

    auth = self._structure.get('auth')
    if auth == 'params':
        args[query].update({'username': self.billing_username,
                            'password': self.billing_password})
    elif auth == 'headers':
        args['auth'] = HTTPBasicAuth(
            self.billing_username, self.billing_password)

    # Equality, not truthiness: a missing 'verify' key must not disable
    # certificate verification.
    if self._structure.get('verify') == False:  # noqa: E712
        args['verify'] = False
    return args
def get_url(url):
    """Send a GET request to ``url``.

    Uses the ``any_user`` / ``any_pass`` credentials of ``controller`` and
    does not verify the TLS certificate.

    Returns:
        ``(True, decoded JSON)`` for status 200, ``(False, response text)``
        for any other status and ``(False, error text)`` on a connection
        error.
    """
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    logging.info(url)
    try:
        response = requests.get(url, headers=headers, auth=auth, verify=False)
        logging.info("Url GET Status: %s" % response.status_code)
        if response.status_code in [200]:
            return True, response.json()
        else:
            return False, str(response.text)
    # ``as`` is valid on Python 2.6+ and 3; ``except X, e`` and ``e.message``
    # are Python-2-only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s' % e)
        return False, str(e)
def post_xml(url, data):
    """Send a POST request with an XML content type.

    Uses the ``any_user`` / ``any_pass`` credentials of ``controller`` and
    does not verify the TLS certificate.

    Returns:
        For status 200 or 204: ``(True, decoded JSON)``, or ``(True, {})``
        when the body is empty.  For any other status:
        ``(False, response text)``.  On a connection error:
        ``(False, error text)``.
    """
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.post(url, data=data, auth=auth, headers=headers, verify=False)
        logging.info("Url POST Status: %s" % response.status_code)
        if response.status_code in [200, 204]:
            if len(response.text) > 0:
                return True, response.json()
            else:
                return True, {}
        else:
            return False, str(response.text)
    # ``as`` is valid on Python 2.6+ and 3; ``except X, e`` and ``e.message``
    # are Python-2-only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s' % e)
        return False, str(e)
def put_xml(url, data):
    """Send a PUT request with an XML content type.

    Uses the ``any_user`` / ``any_pass`` credentials of ``controller`` and
    does not verify the TLS certificate.

    Returns:
        For status 200, 201 or 204: ``(True, decoded JSON)``, or
        ``(True, {})`` when the body is empty.  For any other status:
        ``(False, response text)``.  On a connection error:
        ``(False, error text)``.
    """
    headers = {'Content-type': 'application/xml'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=data, auth=auth, headers=headers, verify=False)
        logging.info("Url PUT Status: %s" % response.status_code)
        if response.status_code in [200, 201, 204]:
            if len(response.text) > 0:
                return True, response.json()
            else:
                return True, {}
        else:
            return False, str(response.text)
    # ``as`` is valid on Python 2.6+ and 3; ``except X, e`` and ``e.message``
    # are Python-2-only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s' % e)
        return False, str(e)
def put_json(url, data):
    """Send a PUT request with ``data`` serialized as JSON.

    Uses the ``any_user`` / ``any_pass`` credentials of ``controller`` and
    does not verify the TLS certificate.

    Returns:
        For status 200 or 204: ``(True, decoded JSON)``, or ``(True, {})``
        when the body is empty.  For any other status:
        ``(False, response text)``.  On a connection error:
        ``(False, error text)``.
    """
    headers = {'Content-type': 'application/json'}
    auth = (controller['any_user'], controller['any_pass'])
    try:
        response = requests.put(url, data=json.dumps(data), auth=auth, headers=headers, verify=False)
        logging.info("Url PUT Status: %s" % response.status_code)
        if response.status_code in [200, 204]:
            if len(response.text) > 0:
                return True, response.json()
            else:
                return True, {}
        else:
            return False, str(response.text)
    # ``as`` is valid on Python 2.6+ and 3; ``except X, e`` and ``e.message``
    # are Python-2-only.
    except requests.exceptions.ConnectionError as e:
        logging.error('Connection Error: %s' % e)
        return False, str(e)
def grant_access_token(self, code):
    """Exchange an authorization ``code`` for Bitbucket OAuth2 tokens.

    Posts the code to the Bitbucket access-token endpoint, authenticating
    with the OAuth key and secret.  On success the access token, refresh
    token and token type are stored on the instance and the decoded response
    is returned.

    Raises:
        BitbucketAPIError: when the response has an error status; built from
            the status code and the ``error`` / ``error_description`` fields
            of the JSON body.
    """
    form = {'grant_type': 'authorization_code', 'code': code}
    res = self._session.post(
        'https://bitbucket.org/site/oauth2/access_token',
        data=form,
        auth=HTTPBasicAuth(self._oauth_key, self._oauth_secret),
    )
    try:
        res.raise_for_status()
    except requests.RequestException as reqe:
        details = res.json()
        raise BitbucketAPIError(
            res.status_code,
            details.get('error', ''),
            details.get('error_description', ''),
            request=reqe.request,
            response=reqe.response,
        )

    tokens = res.json()
    self._access_token = tokens['access_token']
    self._refresh_token = tokens['refresh_token']
    self._token_type = tokens['token_type']
    return tokens
def refresh_token(self):
    """Refresh the token regardless of when it expires.

    Posts the current refresh token to ``TOKEN_URL`` with client-credential
    basic auth.  The new token gets an ``expires_at`` timestamp and, if the
    reply carries no ``refresh_token``, inherits the old one.  The result is
    assigned to ``self.token``.

    Raises HTTPError on any non 2xx response code.
    """
    old_refresh = self._token['refresh_token']
    form = {'refresh_token': old_refresh, 'grant_type': 'refresh_token'}
    credentials = HTTPBasicAuth(self.client_id, self.client_secret)

    response = self.session.post(self.TOKEN_URL, data=form, auth=credentials)
    response.raise_for_status()

    fresh = response.json()
    fresh['expires_at'] = int(time.time()) + fresh['expires_in']
    # Keep the previous refresh token when the server did not issue a new one.
    fresh.setdefault('refresh_token', old_refresh)
    self.token = fresh
def jsonrpc_call(url, method, params={}, verify_ssl_cert=True, username=None, password=None):
    """Perform a JSON-RPC 2.0 call over HTTP POST and return its result.

    Basic auth is used only when both ``username`` and ``password`` are
    truthy.  The wall-clock time of the request (including JSON decoding) is
    added to ``method_cumulative_calltime[method]``.

    Raises:
        JsonRpcCallFailed: when the decoded response has no ``result`` key.
    """
    global method_cumulative_calltime

    payload = {"method": method, "params": params, "jsonrpc": "2.0", "id": 0}
    request_kwargs = dict(
        url=url,
        headers={'content-type': 'application/json'},
        data=json.dumps(payload),
        verify=verify_ssl_cert,
    )
    if username and password:
        request_kwargs["auth"] = HTTPBasicAuth(username, password)

    started = time.time()
    response = requests.post(**request_kwargs).json()
    elapsed = time.time() - started
    method_cumulative_calltime[method] += elapsed

    if "result" in response:
        return response["result"]
    raise JsonRpcCallFailed(payload, response)
def get_strategy_for(self, url):
    """Retrieve the authentication strategy for a specified URL.

    The URL is reduced to a key by ``_key_from_url`` and looked up in
    ``self.strategies``; a ``NullAuthStrategy`` is returned when there is no
    entry for that key.

    :param str url: The full URL you will be making a request against. For
        example, ``'https://api.github.com/user'``
    :returns: Callable that adds authentication to a request.

    .. code-block:: python

        import requests
        a = AuthHandler({'example.com': ('foo', 'bar')})
        strategy = a.get_strategy_for('http://example.com/example')
        assert isinstance(strategy, requests.auth.HTTPBasicAuth)

    """
    lookup_key = self._key_from_url(url)
    fallback = NullAuthStrategy()
    return self.strategies.get(lookup_key, fallback)
def notify(tee, servers, meta_data):
    """POST a notification to every server; report failures via ``tee``.

    For each entry of ``servers`` the ``connector_access`` settings provide
    the URL, the JSON body (``json_data``), the auth settings and the
    ``ssl_verify`` flag (default ``True``).  When ``add_meta_data`` is set,
    the items of ``meta_data`` are merged into the body.

    Notification is best-effort: a failure for one server is reported by
    calling ``tee`` with a message containing the traceback, and the loop
    continues with the next server.

    The body is copied before ``meta_data`` is merged, so the server's own
    ``json_data`` configuration is no longer modified.
    """
    for server in servers:
        connector_access = server['connector_access']
        try:
            json_data = connector_access.get('json_data')
            if connector_access.get('add_meta_data'):
                json_data = dict(json_data or {})
                json_data.update(meta_data)
            r = requests.post(
                connector_access['url'],
                json=json_data,
                auth=_auth(connector_access.get('auth')),
                verify=connector_access.get('ssl_verify', True)
            )
            r.raise_for_status()
        except Exception:
            # ``Exception`` instead of a bare ``except`` so KeyboardInterrupt
            # and SystemExit propagate; ``.get`` keeps a missing 'url' from
            # raising inside the handler itself.
            tee('Could not notify server {}: {}'.format(connector_access.get('url'), format_exc()))
def test_prepared_from_session(self, httpbin):
    """A session-level auth object is applied when preparing a request.

    The request itself carries no auth; the session's ``DummyAuth`` adds a
    marker header, which must come back in the JSON reply of the ``headers``
    endpoint.
    """
    class DummyAuth(requests.auth.AuthBase):
        def __call__(self, r):
            r.headers['Dummy-Auth-Test'] = 'dummy-auth-test-ok'
            return r

    req = requests.Request('GET', httpbin('headers'))
    assert not req.auth

    session = requests.Session()
    session.auth = DummyAuth()

    prepared = session.prepare_request(req)
    resp = session.send(prepared)

    echoed_headers = resp.json()['headers']
    assert echoed_headers['Dummy-Auth-Test'] == 'dummy-auth-test-ok'
def _user_agent_digest_hash(self):
    """Hash the user agent and user agent password.

    Section 3.10 of https://www.nar.realtor/retsorg.nsf/retsproto1.7d6.pdf

    The digest is ``md5("<a1>::<session_id>:<version>")`` where ``a1`` is
    ``md5("<user_agent>:<user_agent_password>")``, ``session_id`` is the
    empty string when unset and ``version`` is ``self.version`` without its
    ``RETS/`` prefix.

    :return: md5 hex digest
    :raises MissingVersion: when ``self.version`` is not set
    """
    if not self.version:
        raise MissingVersion("A version is required for user agent auth. The RETS server should set this "
                             "automatically but it has not. Please instantiate the session with a version argument "
                             "to provide the version.")

    # Remove the literal prefix only.  ``str.strip('RETS/')`` removes any of
    # those characters from both ends rather than the prefix.
    version_number = self.version
    prefix = 'RETS/'
    if version_number.startswith(prefix):
        version_number = version_number[len(prefix):]

    user_str = '{0!s}:{1!s}'.format(self.user_agent, self.user_agent_password).encode('utf-8')
    a1 = hashlib.md5(user_str).hexdigest()
    session_id = self.session_id if self.session_id is not None else ''
    digest_str = '{0!s}::{1!s}:{2!s}'.format(a1, session_id, version_number).encode('utf-8')
    digest = hashlib.md5(digest_str).hexdigest()
    return digest
def division_standings():
    """Download the division standings and store each team's rank.

    Fetches ``division_team_standings.json`` below ``base_url`` with basic
    auth and inserts one ``(team_id, rank)`` row per team into the
    ``division_standings`` table of ``nflpool.sqlite``.  Each row is written
    and committed over its own connection.
    """
    standings_url = base_url + 'division_team_standings.json'
    response = requests.get((standings_url),
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    # Lazy generator: a row is only extracted right before it is inserted.
    rows = (
        (team['team']['ID'], team['rank'])
        for division in data['divisionteamstandings']['division']
        for team in division['teamentry']
    )
    for row in rows:
        conn = sqlite3.connect('nflpool.sqlite')
        cur = conn.cursor()
        cur.execute('''INSERT INTO division_standings(team_id, rank) VALUES(?,?)''', row)
        conn.commit()
        conn.close()


# Get Playoff Standings for each team (need number 5 & 6 in each conference for Wild Card picks)
def playoff_standings():
    """Download the playoff standings and store each team's rank.

    Fetches ``playoff_team_standings.json`` below ``base_url`` with basic
    auth and inserts one ``(team_id, rank)`` row per team into the
    ``playoff_rankings`` table of ``nflpool.sqlite``.  Each row is written
    and committed over its own connection.
    """
    standings_url = base_url + 'playoff_team_standings.json'
    response = requests.get((standings_url),
                            auth=HTTPBasicAuth(secret.msf_username, secret.msf_pw))
    data = response.json()

    # Lazy generator: a row is only extracted right before it is inserted.
    rows = (
        (team['team']['ID'], team['rank'])
        for conference in data['playoffteamstandings']['conference']
        for team in conference['teamentry']
    )
    for row in rows:
        conn = sqlite3.connect('nflpool.sqlite')
        cur = conn.cursor()
        cur.execute('''INSERT INTO playoff_rankings(team_id, rank) VALUES(?,?)''', row)
        conn.commit()
        conn.close()


# Get individual statistics for each category
def parse_auth_argument(args):
    """Turn the parsed command-line arguments into an auth object.

    * ``args.auth == 'basic'``: requires ``username`` and ``password`` and
      returns an ``HTTPBasicAuth``.
    * ``args.auth == 'oauth'``: requires ``consumer_key`` and ``private_key``
      and returns the result of ``get_oauth1session``.
    * anything else: ``args.auth`` is returned unchanged.

    When a required value is missing, a hint is printed and the process
    exits with status 3.
    """
    mode = args.auth

    if mode == 'basic':
        if not (args.username and args.password):
            print("For basic authentication, 'username' and 'password' parameter are needed")
            exit(3)
        return HTTPBasicAuth(args.username, args.password)

    if mode == 'oauth':
        if not (args.consumer_key and args.private_key):
            print("For oauth authentication, 'consumer-key' and 'private-key' parameter are needed")
            exit(3)
        return get_oauth1session(args.consumer_key, args.consumer_secret,
                                 args.private_key, args.passphrase)

    return mode
def run(self):
    """Query the service for the observable and report the result.

    The request URL is ``basic_url`` + an endpoint + the query value, chosen
    by ``self.data_type``:

    * ``hash``: ``scan/`` + the ``data`` parameter
    * ``file``: ``scan/`` + the SHA256 of the file, taken from the attachment
      hashes when available, otherwise computed from the file contents
    * ``filename``: ``search?query=filename:`` + the ``data`` parameter
    * anything else: ``self.notSupported()``

    While the service answers with its rate-limit message the request is
    repeated after a 60 second pause.  The final decoded JSON is passed to
    ``self.report``; a ``ValueError`` is passed to ``self.unexpectedError``.
    """
    rate_limit_message = "Exceeded maximum API requests per minute(5). Please try again later."
    try:
        if self.data_type == 'hash':
            query_url = 'scan/'
            query_data = self.getParam('data', None, 'Hash is missing')
        elif self.data_type == 'file':
            query_url = 'scan/'
            hashes = self.getParam('attachment.hashes', None)
            if hashes is None:
                filepath = self.getParam('file', None, 'File is missing')
                # Hash the raw bytes and close the file afterwards; text mode
                # breaks on Python 3 and can alter the bytes on Windows.
                with open(filepath, 'rb') as sample:
                    query_data = hashlib.sha256(sample.read()).hexdigest()
            else:
                # find SHA256 hash
                query_data = next(h for h in hashes if len(h) == 64)
        elif self.data_type == 'filename':
            query_url = 'search?query=filename:'
            query_data = self.getParam('data', None, 'Filename is missing')
        else:
            # NOTE(review): presumably notSupported() aborts the run;
            # otherwise query_url is unbound below - confirm.
            self.notSupported()

        url = str(self.basic_url) + str(query_url) + str(query_data)

        while True:
            r = requests.get(url, headers=self.headers,
                             auth=HTTPBasicAuth(self.api_key, self.secret),
                             verify=False)
            payload = r.json()
            # The old test `"error" in X == msg` was a chained comparison
            # that could never be true, so the retry never ran.  Accept the
            # message either as the 'response' value itself or under its
            # 'error' key.
            answer = payload.get('response') if isinstance(payload, dict) else None
            detail = answer.get('error') if isinstance(answer, dict) else answer
            if detail != rate_limit_message:
                break
            time.sleep(60)

        self.report({'results': payload})
    except ValueError as e:
        self.unexpectedError(e)
def get_token(self, token_only=True, scopes=None):
    """Return an OAuth token for the given scopes.

    ``scopes`` defaults to ``['send_notification', 'view_room']``.

    With ``token_only`` true, the access-token string is returned.  It is
    taken from ``cache`` when present, otherwise requested and cached for
    ``expires_in - 20``.  With ``token_only`` false, a fresh token is always
    requested and the whole decoded response is returned.

    Raises:
        OauthClientInvalidError: when the token endpoint answers 401.
        Exception: for any other non-200 status.
    """
    if scopes is None:
        scopes = ['send_notification', 'view_room']
    cache_key = 'hipchat-tokens:%s:%s' % (self.id, ','.join(scopes))

    def gen_token():
        form = {'grant_type': 'client_credentials', 'scope': ' '.join(scopes)}
        resp = requests.post(self.token_url,
                             data=form,
                             auth=HTTPBasicAuth(self.id, self.secret),
                             timeout=10)
        status = resp.status_code
        if status == 200:
            return resp.json()
        if status == 401:
            raise OauthClientInvalidError(self)
        raise Exception('Invalid token: %s' % resp.text)

    if not token_only:
        return gen_token()

    token = cache.get(cache_key)
    if not token:
        granted = gen_token()
        token = granted['access_token']
        cache.set(cache_key, token, granted['expires_in'] - 20)
    return token
def _get_session(self):
    """Return the HTTP session, creating and configuring it on first use.

    A new session gets basic auth from ``foreman_user`` / ``foreman_pw`` and
    its ``verify`` setting from ``foreman_ssl_verify``.
    """
    if self.session:
        return self.session

    self.session = requests.session()
    self.session.auth = HTTPBasicAuth(self.foreman_user, self.foreman_pw)
    self.session.verify = self.foreman_ssl_verify
    return self.session