我们从Python开源项目中,提取了以下7个代码示例,用于说明如何使用requests_oauthlib.OAuth2()。
def auth(): # Basic Authentication requests.get('https://api.github.com/user', auth=HTTPBasicAuth('user', 'pass')) requests.get('https://api.github.com/user', auth=('user', 'pass')) # Digest Authentication url = 'http://httpbin.org/digest-auth/auth/user/pass' requests.get(url, auth=HTTPDigestAuth('user', 'pass')) # OAuth2 Authentication????requests-oauthlib url = 'https://api.twitter.com/1.1/account/verify_credentials.json' auth = OAuth2('YOUR_APP_KEY', 'YOUR_APP_SECRET', 'USER_OAUTH_TOKEN') requests.get(url, auth=auth) pass
def __init__(self, client_id=None, client_secret=None, server_token=None): self.client_id = client_id or UBER_CLIENT_ID self.client_secret = client_secret or UBER_CLIENT_SECRET self.server_token = server_token or UBER_SERVER_TOKEN # self.oauth_client = OAuth2(client_id=self.client_id, token=self.server_token)
def query_GitHub(url, username=None, password=None, token=None, data=None, OTP=None, headers=None, params=None, files=None): """ Query GitHub API. In case of a multipage result, DOES NOT query the next page. """ headers = headers or {} if OTP: headers['X-GitHub-OTP'] = OTP if token: auth = OAuth2(client_id=username, token=dict(access_token=token, token_type='bearer')) else: auth = HTTPBasicAuth(username, password) if data: r = requests.post(url, auth=auth, data=data, headers=headers, params=params, files=files) else: r = requests.get(url, auth=auth, headers=headers, params=params, stream=True) if r.status_code == 401: two_factor = r.headers.get('X-GitHub-OTP') if two_factor: print("A two-factor authentication code is required:", two_factor.split(';')[1].strip()) OTP = raw_input("Authentication code: ") return query_GitHub(url, username=username, password=password, token=token, data=data, OTP=OTP) raise AuthenticationFailed("invalid username or password") r.raise_for_status() return r # ------------------------------------------------ # Vagrant related configuration
def SetCredentials(self, consumer_key, consumer_secret, access_token_key=None, access_token_secret=None, application_only_auth=False): """Set the consumer_key and consumer_secret for this instance Args: consumer_key: The consumer_key of the twitter account. consumer_secret: The consumer_secret for the twitter account. access_token_key: The oAuth access token key value you retrieved from running get_access_token.py. access_token_secret: The oAuth access token's secret, also retrieved from the get_access_token.py run. application_only_auth: Whether to generate a bearer token and use Application-Only Auth """ self._consumer_key = consumer_key self._consumer_secret = consumer_secret self._access_token_key = access_token_key self._access_token_secret = access_token_secret if application_only_auth: self._bearer_token = self.GetAppOnlyAuthToken(consumer_key, consumer_secret) self.__auth = OAuth2(token=self._bearer_token) else: auth_list = [consumer_key, consumer_secret, access_token_key, access_token_secret] if all(auth_list): self.__auth = OAuth1(consumer_key, consumer_secret, access_token_key, access_token_secret) self._config = None
def make_request(self, url, data={}, method=None, **kwargs): """ Builds and makes the OAuth2 Request, catches errors https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors """ if not method: method = 'POST' if data else 'GET' try: auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except (HTTPUnauthorized, TokenExpiredError) as e: self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) # yet another token expiration check # (the above try/except only applies if the expired token was obtained # using the current instance of the class this is a a general case) if response.status_code == 401: d = json.loads(response.content.decode('utf8')) try: if(d['errors'][0]['errorType'] == 'expired_token' and d['errors'][0]['message'].find('Access token expired:') == 0): self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except: pass if response.status_code == 401: raise HTTPUnauthorized(response) elif response.status_code == 403: raise HTTPForbidden(response) elif response.status_code == 404: raise HTTPNotFound(response) elif response.status_code == 409: raise HTTPConflict(response) elif response.status_code == 429: exc = HTTPTooManyRequests(response) exc.retry_after_secs = int(response.headers['Retry-After']) raise exc elif response.status_code >= 500: raise HTTPServerError(response) elif response.status_code >= 400: raise HTTPBadRequest(response) return response
def make_request(self, url, data={}, method=None, **kwargs): """ Builds and makes the OAuth2 Request, catches errors https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors """ if not method: method = 'POST' if data else 'GET' try: auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except HTTPUnauthorized as e: self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) # yet another token expiration check # (the above try/except only applies if the expired token was obtained # using the current instance of the class this is a a general case) if response.status_code == 401: d = json.loads(response.content.decode('utf8')) try: if(d['errors'][0]['errorType'] == 'expired_token' and d['errors'][0]['message'].find('Access token expired:') == 0): self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except: pass if response.status_code == 401: raise HTTPUnauthorized(response) elif response.status_code == 403: raise HTTPForbidden(response) elif response.status_code == 404: raise HTTPNotFound(response) elif response.status_code == 409: raise HTTPConflict(response) elif response.status_code == 429: exc = HTTPTooManyRequests(response) exc.retry_after_secs = int(response.headers['Retry-After']) raise exc elif response.status_code >= 500: raise HTTPServerError(response) elif response.status_code >= 400: raise HTTPBadRequest(response) return response
def make_request(self, url, data={}, method=None, **kwargs): """ Builds and makes the OAuth2 Request, catches errors https://wiki.fitbit.com/display/API/API+Response+Format+And+Errors """ if not method: method = 'POST' if data else 'GET' try: auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except TokenExpiredError as e: self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) #yet another token expiration check #(the above try/except only applies if the expired token was obtained #using the current instance of the class this is a a general case) if response.status_code == 401: d = json.loads(response.content.decode('utf8')) try: if(d['errors'][0]['errorType']=='oauth' and d['errors'][0]['fieldName']=='access_token' and d['errors'][0]['message'].find('Access token invalid or expired:')==0): self.refresh_token() auth = OAuth2(client_id=self.client_id, token=self.token) response = self._request(method, url, data=data, auth=auth, **kwargs) except: pass if response.status_code == 401: raise HTTPUnauthorized(response) elif response.status_code == 403: raise HTTPForbidden(response) elif response.status_code == 404: raise HTTPNotFound(response) elif response.status_code == 409: raise HTTPConflict(response) elif response.status_code == 429: exc = HTTPTooManyRequests(response) exc.retry_after_secs = int(response.headers['Retry-After']) raise exc elif response.status_code >= 500: raise HTTPServerError(response) elif response.status_code >= 400: raise HTTPBadRequest(response) return response