我们从Python开源项目中,提取了以下22个代码示例,用于说明如何使用requests.response()。
def request(self, method, *resources, **kwargs):
    """Send an HTTP request to a resource of this client and post-process the reply.

    :param method: HTTP verb; it is upper-cased before use
    :param resources: path pieces handed to ``self.build_resource``
    :param kwargs: the options ``response_key`` (alias ``key``), ``query``, ``data``,
        ``fragment``, ``params``, ``keep_blank_values``, ``timeout`` (default 60) and
        ``content_type`` (default ``'json'``) are consumed here; anything left over
        is forwarded untouched to ``requests.request``
    :return: the value produced by ``self.handle_response``
    """
    verb = method.upper()

    response_key = kwargs.pop('response_key', None)
    key_alias = kwargs.pop('key', None)
    # ``key`` is a shorthand that wins over ``response_key`` when both are given.
    if key_alias is not None:
        response_key = key_alias

    query = kwargs.pop('query', None)
    payload = kwargs.pop('data', None)
    fragment = kwargs.pop('fragment', '')
    params = kwargs.pop('params', '')
    keep_blank_values = kwargs.pop('keep_blank_values', None)
    timeout = kwargs.pop('timeout', 60)
    resource = self.build_resource(resources)
    content_type = kwargs.pop('content_type', 'json')

    if payload is not None:
        # The payload travels either as a JSON document or as a raw body,
        # depending on the requested content type.
        if 'json' in content_type:
            kwargs['json'] = payload
        if content_type == 'body':
            kwargs['data'] = payload

    url = self.url_for(
        resource,
        query,
        params=params,
        fragment=fragment,
        keep_blank_values=keep_blank_values,
    )
    self.logger.info('Request %s for %s', verb, url)
    reply = requests.request(verb, url, timeout=timeout, **kwargs)
    return self.handle_response(reply, response_key=response_key)
def emaillogin(email, otp):
    """Raw Cloud API call: exchange an email address and OTP for a cloud token.

    Args:
        email(str): Email address connected to Cozify account.
        otp(int): One time passcode.

    Returns:
        str: cloud token (the response body of a 200 reply).

    Raises:
        APIError: when the server answers with any status other than 200.
    """
    response = requests.post(
        cloudBase + 'user/emaillogin',
        params={'email': email, 'password': otp},
    )
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
    return response.text
def hubkeys(cloud_token):
    """1:1 implementation of user/hubkeys.

    Args:
        cloud_token(str): Cloud remote authentication token.

    Returns:
        dict: Map of hub_id: hub_token pairs (the JSON-decoded response body).

    Raises:
        APIError: when the server answers with any status other than 200.
    """
    response = requests.get(
        cloudBase + 'user/hubkeys',
        headers={'Authorization': cloud_token},
    )
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
    return json.loads(response.text)
def refreshsession(cloud_token):
    """1:1 implementation of user/refreshsession.

    Args:
        cloud_token(str): Cloud remote authentication token.

    Returns:
        str: New cloud remote authentication token. Not automatically stored into state.

    Raises:
        APIError: when the server answers with any status other than 200.
    """
    response = requests.get(
        cloudBase + 'user/refreshsession',
        headers={'Authorization': cloud_token},
    )
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
    return response.text
def cone_search(ra, dec, radius, table="gaiadr1.gaia_source", **kwargs):
    """
    Run a cone search against the ESA Gaia database through TAP.

    :param ra:
        Right ascension (degrees).

    :param dec:
        Declination (degrees).

    :param radius:
        Cone search radius (degrees).

    :param table: [optional]
        The table name to perform the cone search on. Some examples are:
        gaiadr1.gaia_source, gaiadr1.tgas_source

    :param kwargs:
        Keyword arguments are passed directly to the `query` function.

    :returns:
        Whatever `query` returns for the generated ADQL statement.
    """
    # The pieces below concatenate to exactly the statement text used before;
    # coordinates are rendered with ten decimal places.
    adql_template = (
        " SELECT * FROM {table} WHERE CONTAINS("
        " POINT('ICRS',{table}.ra,{table}.dec),"
        " CIRCLE('ICRS',{ra:.10f},{dec:.10f},{radius:.10f})) = 1;"
    )
    adql = adql_template.format(table=table, ra=ra, dec=dec, radius=radius)
    return query(adql, **kwargs)
def __init__(self, response, description, *args, **kwargs):
    """Build the error from a failed response.

    The instance exposes ``response``, ``description``, ``status_code`` and
    ``content``; the last two are copied from ``response``.

    :param response: requests.response
    :param description: str - description for error
    """
    self.response, self.description = response, description
    self.status_code = response.status_code
    self.content = response.content
    # Only the extra positional/keyword arguments reach the base class.
    super(ResponseError, self).__init__(*args, **kwargs)
def __repr__(self):  # pragma: no cover
    """Return a one-line summary with the response status code and the description."""
    template = 'Error status code: {}. Description: {}'
    return template.format(self.response.status_code, self.description)
def handle_response(self, response, response_key=None):
    """Turn a response object into a plain python result.

    :param response: requests.response obj
    :param response_key: key to extract from the decoded JSON document
    :return: the decoded JSON, the value stored under ``response_key``, or None
    :raises ResponseError: when the body is not JSON, when ``response_key`` is
        missing from a successful reply, or when a non-empty reply carries a
        status that is in neither ``ok_statuses`` nor ``to_none_statuses``
    """
    code = response.status_code
    try:
        payload = response.json()
    except Exception as exc:
        self.logger.exception(exc)
        raise ResponseError(response, exc)

    # Status handling only applies to non-empty documents.
    if payload:
        if response_key is None:
            if code not in self.ok_statuses and code not in self.to_none_statuses:
                raise ResponseError(response, 'Status code {} not in ok_statuses {}'.format(
                    code, self.ok_statuses))
        elif code in self.ok_statuses:
            if response_key not in payload:
                raise ResponseError(response, 'Response key not found!')
            payload = payload[response_key]
        elif code in self.to_none_statuses:
            payload = None
        else:
            raise ResponseError(response, 'Status code {} not in ok_statuses {}'.format(
                code, self.ok_statuses))

    # Collapse empty containers/strings to None when a key was requested.
    wants_key = response_key is not None
    if wants_key and self.empty_to_none and payload is not None and not payload:
        payload = None
    return payload
def make_402_payment(self, response, max_price):
    """Payment hook that every BitRequests subclass has to provide.

    Args:
        response (requests.response): 402 response from the API server.
        max_price (int): maximum allowed price for a request (in satoshi).

    Returns:
        headers (dict): dict of headers with payment data to send to the
            API server to inform payment status for the resource.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    raise NotImplementedError
def make_402_payment(self, response, max_price):
    """Make an on-chain payment.

    Reads the price and payee address from the 402 response headers, signs a
    transaction for that amount with ``self.wallet`` and returns the headers
    that describe the payment.

    Raises:
        UnsupportedPaymentMethodError: a required payment header is missing.
        ResourcePriceGreaterThanMaxPriceError: price is above ``max_price``.
    """
    payment_info = response.headers
    raw_price = payment_info.get(OnChainRequests.HTTP_BITCOIN_PRICE)
    payee_address = payment_info.get(OnChainRequests.HTTP_BITCOIN_ADDRESS)

    # Both headers must be present for this payment method to be usable.
    if raw_price is None or payee_address is None:
        raise UnsupportedPaymentMethodError(
            'Resource does not support that payment method.')

    # Header values arrive as strings.
    price = int(raw_price)

    # A falsy max_price disables the budget check.
    if max_price and price > max_price:
        raise ResourcePriceGreaterThanMaxPriceError(
            'Resource price ({}) exceeds max price ({}).'.format(price, max_price))

    signed = self.wallet.make_signed_transaction_for(
        payee_address, price, use_unconfirmed=True)
    onchain_payment = signed[0].get('txn').to_hex()
    return_address = self.wallet.current_address
    logger.debug('[OnChainRequests] Signed transaction: {}'.format(
        onchain_payment))

    if self.username:
        payer = urllib.parse.quote(self.username)
    else:
        payer = None

    return {
        'Bitcoin-Transaction': onchain_payment,
        'Return-Wallet-Address': return_address,
        OnChainRequests.HTTP_BITCOIN_PRICE: str(price),
        OnChainRequests.HTTP_PAYER_21USERNAME: payer,
    }
def get_402_info(self, url):
    """Get channel payment information about the resource.

    Issues a plain GET for ``url`` and returns a dict holding the price header
    and the payment-channel-server header (None for a header that is absent).
    """
    headers = requests.get(url).headers
    price_key = ChannelRequests.HTTP_BITCOIN_PRICE
    server_key = ChannelRequests.HTTP_BITCOIN_PAYMENT_CHANNEL_SERVER
    return {
        price_key: headers.get(price_key),
        server_key: headers.get(server_key),
    }
def requestlogin(email):
    """Raw Cloud API call, request OTP to be sent to account email address.

    Args:
        email(str): Email address connected to Cozify account.

    Raises:
        APIError: when the server answers with any status other than 200.
    """
    payload = {'email': email}
    response = requests.post(cloudBase + 'user/requestlogin', params=payload)
    # Compare by value: an identity test against an int literal only works by
    # accident of the interpreter's small-integer cache.
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
def lan_ip():
    """1:1 implementation of hub/lan_ip.

    This call will fail with an APIError if the requesting source address is
    not the same as that of the hub, i.e. if they're not in the same NAT
    network. The above is based on observation and may only be partially true.

    Returns:
        list: List of Hub ip addresses (the JSON-decoded response body).

    Raises:
        APIError: when the server answers with any status other than 200.
    """
    response = requests.get(cloudBase + 'hub/lan_ip')
    if response.status_code != 200:
        raise APIError(response.status_code, response.text)
    return json.loads(response.text)
def _default_is_success(status_code):
    """Tell whether a status code lies in the half-open range [200, 300).

    :param status_code: the http response status
    :type status_code: int
    :returns: True for success status; False otherwise
    :rtype: bool
    """
    return 200 <= status_code and status_code < 300
def get_auth_scheme(response):
    """Return authentication scheme and realm requested by server for 'Basic'
    or 'acsjwt' (DCOS acs auth) or 'oauthjwt' (DCOS acs oauth) type.

    The 'www-authenticate' header is split on commas and the first challenge
    that starts with a supported scheme is used.

    :param response: requests.response
    :type response: requests.Response
    :returns: auth_scheme, realm; (None, None) when the header is absent or
        names no supported scheme
    :rtype: (str, str)
    """
    if 'www-authenticate' not in response.headers:
        return None, None

    supported = ("basic", "acsjwt", "oauthjwt")
    # Strip both sides: challenges after the first usually carry the space
    # that follows the separating comma, which would otherwise defeat the
    # prefix test.
    challenges = (
        challenge.strip().lower()
        for challenge in response.headers['www-authenticate'].split(',')
    )
    scheme = next((c for c in challenges if c.startswith(supported)), None)
    if not scheme:
        return None, None

    scheme_info = scheme.split("=")
    auth_scheme = scheme_info[0].split(" ")[0].lower()
    # The realm is whatever follows the last '=', minus quotes and blanks.
    realm = scheme_info[-1].strip(' \'\"').lower()
    return auth_scheme, realm
def _get_http_auth(response, url, auth_scheme):
    """Pick the authentication mechanism the server asked for.

    :param response: requests.response
    :type response: requests.Response
    :param url: parsed request url (its hostname, username and password are read)
    :type url: str
    :param auth_scheme: scheme extracted from the 'www-authenticate' header
    :type auth_scheme: str
    :returns: AuthBase
    :rtype: AuthBase
    :raises DCOSException: when the header is missing or the scheme is unsupported
    """
    hostname = url.hostname
    username = url.username
    password = url.password

    if 'www-authenticate' not in response.headers:
        raise DCOSException(
            "Invalid HTTP response: server returned an HTTP 401 response "
            "with no 'www-authenticate' field")

    if auth_scheme not in ('basic', 'acsjwt', 'oauthjwt'):
        raise DCOSException(
            "Server responded with an HTTP 'www-authenticate' field of "
            "'{}', DCOS only supports 'Basic'".format(
                response.headers['www-authenticate']))

    # dcos auth (acs or oauth)
    if auth_scheme != 'basic':
        return _get_dcos_auth(auth_scheme, username, password, hostname)

    # for basic auth if username + password was present,
    # we'd already be authed by python requests module
    username, password = _get_auth_credentials(username, hostname)
    return HTTPBasicAuth(username, password)
def _get_dcos_auth(auth_scheme, username, password, hostname):
    """Build the auth object for dcos acs auth and dcos oauth.

    A token stored under "core.dcos_acs_token" is reused as is. Otherwise
    credentials are collected, posted to the login endpoint, and the token of
    a 200 reply is stored back into the configuration.

    :param auth_scheme: authentication_scheme
    :type auth_scheme: str
    :param username: username user for authentication
    :type username: str
    :param password: password for authentication
    :type password: str
    :param hostname: hostname for credentials
    :type hostname: str
    :returns: DCOSAcsAuth
    :rtype: AuthBase
    """
    toml_config = util.get_config()
    token = toml_config.get("core.dcos_acs_token")
    if token is not None:
        return DCOSAcsAuth(token)

    dcos_url = toml_config.get("core.dcos_url")
    creds = (
        _get_dcos_acs_auth_creds(username, password, hostname)
        if auth_scheme == "acsjwt"
        else _get_dcos_oauth_creds(dcos_url)
    )

    verify = _verify_ssl()
    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad cert
    if verify is not None:
        silence_requests_warnings()

    login_url = urllib.parse.urljoin(dcos_url, 'acs/api/v1/auth/login')
    # using private method here, so we don't retry on this request
    # error here will be bubbled up to _request_with_auth
    response = _request('post', login_url, json=creds, verify=verify)

    if response.status_code == 200:
        token = response.json()['token']
        config.set_val("core.dcos_acs_token", token)

    # NOTE(review): on a non-200 login reply the token is still None here.
    return DCOSAcsAuth(token)
def query(query, authenticate=False, json=False, full_output=False, **kwargs):
    """
    Execute a synchronous TAP query to the ESA Gaia database.

    :param query:
        The TAP query to execute.

    :param authenticate: [optional]
        Authenticate with the username and password information stored in the
        config.

    :param json: [optional]
        Return the data in JSON format. If set to False, then the data will be
        returned as an `astropy.table.Table`.

    :param full_output: [optional]
        Return a two-length tuple containing the data and the corresponding
        `requests.response` object.

    :param kwargs:
        Extra request parameters; they are merged into (and may override) the
        default TAP parameters.

    :returns:
        The data returned - either as an astropy table or a dictionary (JSON) -
        and optionally, the `requests.response` object used.

    :raises TAPQueryException:
        If the server reply is not OK.
    """
    fmt = "json" if json else "votable"
    params = dict(REQUEST="doQuery", LANG="ADQL", FORMAT=fmt, query=query)
    params.update(kwargs)

    # The session is only needed for this one exchange; the context manager
    # releases its connection pool even if the request raises.
    with requests.Session() as session:
        if authenticate:
            utils.login(session)
        response = session.get("{}/tap/sync".format(config.url), params=params)

    if not response.ok:
        raise TAPQueryException(response)

    if json:
        data = response.json()
    else:
        # Take the table contents and return an astropy table.
        data = Table.read(StringIO(response.text), format="votable")

    return (data, response) if full_output else data
def __init__(self, endpoint, ok_statuses=None, to_none_statuses=None, empty_to_none=True, close_slash=True,
             logger=None, name=None, keep_blank_values=True):
    """Create a client

    :param endpoint: str, ex. http://localhost:5000 or http://localhost:5000/api/
    :param ok_statuses: status codes for "ok"; when None the attribute is not
        overridden (the original docs cite (200, 201, 202, ) as the default)
    :param to_none_statuses: statuses, for generate None as response; when None
        the attribute is not overridden (the original docs cite (404, ))
    :param empty_to_none: boolean, default - True, if True - empty response will be generate None response
        (empty str, empty list, empty dict)
    :param close_slash: boolean, url += '/', if url.endswith != '/', default - True
    :param logger: logger instance; ``get_logger(__name__)`` is used when None
    :param name: name for client
    :type name: str
    :param keep_blank_values: forwarded to ``urlparse.parse_qs`` when the
        endpoint's query string is parsed
    """
    client_name = '<client: {}>'.format(endpoint) if name is None else name
    base_logger = get_logger(__name__) if logger is None else logger
    self.logger = InstanceLogger(self, base_logger)

    # Drop a single trailing slash before parsing.
    trimmed = endpoint[:-1] if endpoint.endswith('/') else endpoint

    if ok_statuses is not None:
        self.ok_statuses = ok_statuses
    if to_none_statuses is not None:
        self.to_none_statuses = to_none_statuses
    self.empty_to_none = empty_to_none
    self.close_slash = close_slash

    parts = urlparse.urlparse(trimmed)
    base_endpoint = self.get_endpoint_from_parsed_url(parts)

    self.keep_blank_values = keep_blank_values
    self.endpoint = base_endpoint
    self.path = parts.path
    self.query = urlparse.parse_qs(parts.query, keep_blank_values=self.keep_blank_values)
    self.fragment = parts.fragment
    self.params = parts.params
    self.name = client_name

    self.logger.debug(
        'Client built, endpoint: "%s", path: "%s", query: %s, params: %s, fragment: %s',
        self.endpoint, self.path, self.query, self.params, self.fragment)
def request(self, method, url, max_price=None, mock_requests=False, **kwargs):
    """Make a 402 request for a resource.

    This is the BitRequests public method that should be used to complete a
    402 request using the desired payment method (as constructed by a class
    implementing BitRequests)

    Args:
        method (string): HTTP method for completing the request in lower-
            case letters. Examples: 'get', 'post', 'put'
        url (string): URL of the requested resource.
        max_price (int): maximum allowed price for a request (in satoshi).
        mock_requests (bool): when true, no network call is made and an empty
            200 response is returned.
        **kwargs: forwarded to ``requests.request`` (e.g. ``data``, ``files``,
            ``headers``). A supplied ``headers`` value must be a dict.

    Returns:
        response (requests.response): response from paying for the requested
            resource, or the initial response when its status is not 402.

    Raises:
        ValueError: if ``headers`` is supplied and is not a dict.
    """
    if mock_requests:
        fake_response = requests.models.Response()
        fake_response.status_code = 200
        fake_response._content = b''
        return fake_response

    # Make the initial request for the resource
    response = requests.request(method, url, **kwargs)

    # Return if we receive a status code other than 402: payment required
    if response.status_code != requests.codes.payment_required:
        return response

    # Use .get() for the log line so that a missing price header is reported
    # by make_402_payment instead of surfacing here as a KeyError.
    logger.debug('[BitRequests] 402 payment required: {} satoshi.'.format(
        response.headers.get('price')))
    payment_headers = self.make_402_payment(response, max_price)

    # Reset the position of any files that have been used
    self._reset_file_positions(kwargs.get('files'), kwargs.get('data'))

    # Merge the payment headers over any user-provided headers. Work on a
    # copy so the caller's dict is not modified behind its back.
    if 'headers' in kwargs:
        if not isinstance(kwargs['headers'], dict):
            raise ValueError('argument \'headers\' must be a dict.')
        merged_headers = kwargs['headers'].copy()
        merged_headers.update(payment_headers)
        kwargs['headers'] = merged_headers
    else:
        kwargs['headers'] = payment_headers

    paid_response = requests.request(method, url, **kwargs)
    setattr(paid_response, 'amount_paid', int(response.headers['price']))

    if paid_response.status_code == requests.codes.ok:
        logger.debug('[BitRequests] Successfully purchased resource.')
    else:
        logger.debug('[BitRequests] Could not purchase resource.')

    return paid_response
def make_402_payment(self, response, max_price):
    """Make a bit-transfer payment to the payment-handling service.

    Reads price, payee address and payee username from the 402 response
    headers, checks the price against the buffer balance and ``max_price``,
    then returns the serialized transfer together with its signature.

    Raises:
        UnsupportedPaymentMethodError: a required payment header is missing.
        InsufficientBalanceError: price is above the reported total earnings.
        ResourcePriceGreaterThanMaxPriceError: price is above ``max_price``.
        TypeError: the serialized transfer is not a string.
    """
    info = response.headers
    raw_price = info.get(BitTransferRequests.HTTP_BITCOIN_PRICE)
    payee_address = info.get(BitTransferRequests.HTTP_BITCOIN_ADDRESS)
    payee_username = info.get(BitTransferRequests.HTTP_BITCOIN_USERNAME)

    # All three headers are required for this payment method.
    if any(value is None for value in (raw_price, payee_address, payee_username)):
        raise UnsupportedPaymentMethodError(
            'Resource does not support that payment method.')

    # Header values arrive as strings.
    price = int(raw_price)

    # Funds are checked before the caller's budget.
    buffer_balance = self.client.get_earnings()["total_earnings"]
    if price > buffer_balance:
        raise InsufficientBalanceError(
            'Resource price ({}) exceeds buffer balance ({}).'.format(price, buffer_balance))

    # A falsy max_price disables the budget check.
    if max_price and price > max_price:
        raise ResourcePriceGreaterThanMaxPriceError(
            'Resource price ({}) exceeds max price ({}).'.format(price, max_price))

    # Get the signing public key
    public_key = self.wallet.get_public_key()
    compressed_pubkey = codecs.encode(public_key.compressed_bytes, 'base64').decode()

    # The signature covers this exact string, so key order is kept as is.
    transfer_fields = {
        'payer': self.username,
        'payer_pubkey': compressed_pubkey,
        'payee_address': payee_address,
        'payee_username': payee_username,
        'amount': price,
        'timestamp': time.time(),
        'description': response.url
    }
    bittransfer = json.dumps(transfer_fields)
    if not isinstance(bittransfer, str):
        raise TypeError("Serialized bittransfer must be a string")

    signature = self.wallet.sign_message(bittransfer)
    logger.debug('[BitTransferRequests] Signature: {}'.format(signature))
    logger.debug('[BitTransferRequests] BitTransfer: {}'.format(bittransfer))
    return {'Bitcoin-Transfer': bittransfer, 'Authorization': signature}
def request(method, url, is_success=_default_is_success, timeout=None, verify=None, **kwargs):
    """Sends an HTTP request. On a 401 reply the request is handed to
    `_request_with_auth` for an authenticated retry.

    :param method: method for the new Request object
    :type method: str
    :param url: URL for the new Request object
    :type url: str
    :param is_success: Defines successful status codes for the request
    :type is_success: Function from int to bool
    :param timeout: request timeout
    :type timeout: int
    :param verify: whether to verify SSL certs or path to cert(s)
    :type verify: bool | str
    :param kwargs: Additional arguments to requests.request
        (see http://docs.python-requests.org/en/latest/api/#requests.request)
    :type kwargs: dict
    :rtype: Response
    :raises DCOSAuthorizationException: final status is 403
    :raises DCOSHTTPException: any other unsuccessful final status
    """
    # Ask for JSON unless the caller brought their own headers.
    kwargs.setdefault('headers', {'Accept': 'application/json'})

    ssl_verify = _verify_ssl(verify)
    # Silence 'Unverified HTTPS request' and 'SecurityWarning' for bad certs
    if ssl_verify is not None:
        silence_requests_warnings()

    response = _request(method, url, is_success, timeout, verify=ssl_verify, **kwargs)
    if response.status_code == 401:
        response = _request_with_auth(response, method, url, is_success, timeout, ssl_verify, **kwargs)

    if is_success(response.status_code):
        return response
    if response.status_code == 403:
        raise DCOSAuthorizationException(response)
    raise DCOSHTTPException(response)