我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 oauth2.Client()。
def get_token_and_auth_url(self, callback_url=None):
    """Fetch a request token together with the URL used to authorise it.

    POSTs an ``oauth_callback`` parameter (``callback_url``, or ``'oob'``
    when no callback is given) to ``REQUEST_TOKEN_URL``.

    Returns:
        A ``(token, auth_url)`` tuple; ``auth_url`` is the
        ``xoauth_request_auth_url`` field of the response body.

    Raises:
        YQLError: when the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    request = oauth.Request(
        parameters={'oauth_callback': callback_url or 'oob'})
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())

    # Bail out first so the success path below reads straight through.
    if resp.get('status') != '200':
        raise YQLError(resp, content, url)

    token = oauth.Token.from_string(content)
    yql_logger.debug("token: %s", token)
    fields = dict(parse_qsl(content))
    yql_logger.debug("data: %s", fields)
    return token, fields['xoauth_request_auth_url']
def get_token_and_auth_url(self, callback_url=None):
    """First step is to get the token and then send the request that
    provides the auth URL.

    The ``oauth_callback`` parameter sent to ``REQUEST_TOKEN_URL`` is
    ``callback_url`` when given, otherwise ``'oob'``.

    Returns a tuple of token and the authorisation URL (the
    ``xoauth_request_auth_url`` field of the response body).

    Raises ``YQLError`` when the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    params = {'oauth_callback': callback_url or 'oob'}
    request = oauth.Request(parameters=params)
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())
    if resp.get('status') == '200':
        token = oauth.Token.from_string(content)
        yql_logger.debug("token: %s", token)
        data = dict(parse_qsl(content))
        yql_logger.debug("data: %s", data)
        return token, data['xoauth_request_auth_url']
    # Call form instead of ``raise Cls, args``: the comma form is a syntax
    # error on Python 3, while this builds the same exception on Python 2.
    raise YQLError(resp, content, url)
def test_set_signature_method(self):
    """set_signature_method() must reject an arbitrary object with
    ValueError and store a real signature method on ``client.method``."""
    client = oauth.Client(oauth.Consumer('key', 'secret'))

    class NotASignatureMethod:
        pass

    try:
        client.set_signature_method(NotASignatureMethod())
    except ValueError:
        pass
    else:
        self.fail("Client.set_signature_method() accepted invalid method.")

    hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    client.set_signature_method(hmac_sha1)
    self.assertEqual(hmac_sha1, client.method)
def test_init(self):
    """Client() must raise ValueError for an invalid consumer and for an
    invalid token."""
    class Bogus():
        pass

    # An arbitrary object in the consumer position.
    try:
        oauth.Client(Bogus())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Consumer.")

    # A valid consumer, but an arbitrary object in the token position.
    consumer = oauth.Consumer('token', 'secret')
    try:
        oauth.Client(consumer, Bogus())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Token.")
def make_request(self, url, method="GET", body="", headers=None):
    """Send a signed request to the TradeKing API and decode the reply.

    The request and the raw response are both written to ``self.logs`` at
    debug level.

    Returns:
        The value produced by ``loads(content)``, or ``None`` when decoding
        raises ``ValueError`` (the failure is logged as an error).
    """
    client = Client(
        Consumer(key=TRADEKING_CONSUMER_KEY,
                 secret=TRADEKING_CONSUMER_SECRET),
        Token(key=TRADEKING_ACCESS_TOKEN,
              secret=TRADEKING_ACCESS_TOKEN_SECRET))

    request_line = "TradeKing request: %s %s %s %s" % (
        url, method, body, headers)
    self.logs.debug(request_line)

    response, content = client.request(
        url, method=method, body=body, headers=headers)
    self.logs.debug("TradeKing response: %s %s" % (response, content))

    try:
        decoded = loads(content)
    except ValueError:
        self.logs.error("Failed to decode JSON response: %s" % content)
        decoded = None
    return decoded
def get_user_information(self):
    """POST the verified request token to Twitter's access-token URL and
    record the user it identifies.

    On success sets ``self.status`` to 200 and fills ``self.user_id`` and
    ``self.user_name`` (``None`` when ``screen_name`` is absent).

    Returns:
        ``(user_id, user_name)``.

    Raises:
        Error: on a non-200 response, or when the response carries no
            ``user_id``.
    """
    request_token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
    request_token.set_verifier(self.oauth_verifier)

    endpoint = 'https://api.twitter.com/oauth/access_token'
    resp, content = oauth2.Client(self.consumer, request_token).request(
        endpoint, 'POST')
    if resp.status != 200:
        raise Error('{} from Twitter'.format(resp.status))

    provider_user = dict(parse_qsl(content.decode('utf-8')))
    if 'user_id' not in provider_user:
        raise Error('No user_id from Twitter')

    self.status = 200
    self.user_id = provider_user['user_id']
    self.user_name = provider_user.get('screen_name')
    return self.user_id, self.user_name
def get_request_token(self):
    """Obtain the initial request token for three-legged OAuth.

    The callback sent with the request is the absolute URI of the
    ``bot-authorize`` route for ``self.object.pk``.

    Returns:
        A dict of the response's form fields with keys and values decoded
        to ``str`` (per the original docstring: ``oauth_token`` and
        ``oauth_token_secret``).

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    callback_url = self.request.build_absolute_uri(
        reverse('bot-authorize', args=(self.object.pk, )))
    client = oauth.Client(oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET))

    form_body = urllib.parse.urlencode({'oauth_callback': callback_url})
    resp, content = client.request(
        self.REQUEST_TOKEN_URL, "POST", body=form_body)

    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)

    # The parsed pairs are bytes here, so decode each key and value
    # before handing the token back.
    request_token = {}
    for raw_key, raw_value in urllib.parse.parse_qsl(content):
        request_token[raw_key.decode()] = raw_value.decode()
    return request_token
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
    """Final leg of three-legged OAuth: trade the verified request token
    for an access token.

    Args:
        oauth_token: request token key.
        oauth_token_secret: request token secret.
        oauth_verifier: verifier set on the token before signing.

    Returns:
        A dict of the response's form fields with keys and values decoded
        to ``str`` (per the original docstring: ``oauth_token`` and
        ``oauth_token_secret``).

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    client = oauth.Client(
        oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET), request_token)

    # This call goes to the access-token URL, not the request-token URL.
    resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)

    # The parsed pairs are bytes here, so decode each key and value.
    access_token = {}
    for raw_key, raw_value in urllib.parse.parse_qsl(content):
        access_token[raw_key.decode()] = raw_value.decode()
    return access_token
def get(self, url, params):
    """
    Issues a GET request against the API, properly formatting the params

    :param url: a string, the url you are requesting (appended to
        ``self.host``)
    :param params: a dict, the key-value of all the parameters needed in
        the request; skipped when empty
    :returns: whatever ``self.json_parse`` makes of the response body
    """
    url = self.host + url
    if params:
        url = url + "?" + urllib.urlencode(params)

    client = oauth.Client(self.consumer, self.token,
                          proxy_info=self.proxy_info)
    # NOTE(review): certificate validation is switched off for every
    # request made here; confirm this is intentional.
    client.disable_ssl_certificate_validation = True
    client.follow_redirects = False
    try:
        resp, content = client.request(url, method="GET",
                                       redirections=False)
    except RedirectLimit as e:
        # ``as`` form parses on Python 2.6+ and 3; the old comma form is
        # Python-2-only. The exception's args are unpacked as the
        # (response, content) pair.
        resp, content = e.args
    return self.json_parse(content)
def post(self, url, params=None, files=None):
    """
    Issues a POST request against the API, allows for multipart data
    uploads

    :param url: a string, the url you are requesting (appended to
        ``self.host``)
    :param params: a dict, the key-value of all the parameters needed in
        the request; defaults to an empty dict
    :param files: a list, the list of tuples of files; when non-empty the
        call is delegated to ``self.post_multipart``; defaults to an
        empty list
    :returns: the result of ``self.post_multipart`` for uploads, otherwise
        whatever ``self.json_parse`` makes of the response body (or of the
        error body when ``urllib2.HTTPError`` is raised)
    """
    # Fresh containers per call: the former ``{}`` / ``[]`` defaults were
    # single objects shared by every call that omitted the argument.
    params = {} if params is None else params
    files = [] if files is None else files

    url = self.host + url
    try:
        if files:
            return self.post_multipart(url, params, files)
        client = oauth.Client(self.consumer, self.token,
                              proxy_info=self.proxy_info)
        # NOTE(review): certificate validation is switched off for every
        # request made here; confirm this is intentional.
        client.disable_ssl_certificate_validation = True
        resp, content = client.request(url, method="POST",
                                       body=urllib.urlencode(params))
        return self.json_parse(content)
    except urllib2.HTTPError as e:
        # ``as`` form parses on Python 2.6+ and 3.
        return self.json_parse(e.read())
def twitter_oauth2(oauth_token, oauth_verifier):
    """Exchange an approved request token for an access token and store it.

    Looks up the stored ``db.OAuthToken`` whose ``temp_oauth_token`` equals
    ``oauth_token`` for the Twitter application, signs a POST to
    ``access_token_url`` with that request token plus ``oauth_verifier``,
    and saves the returned access token on the record.

    Returns:
        The updated token record, or ``None`` when no record matches
        ``oauth_token``.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    auth_tokens = db.OAuthToken.query(
        db.OAuthToken.temp_oauth_token == oauth_token,
        db.OAuthToken.application == db.APP_TWITTER
    ).fetch(1)
    if not auth_tokens:
        return None
    auth_token = auth_tokens[0]

    # Step 3: Once the consumer has redirected the user back to the
    # oauth_callback URL you can request the access token the user has
    # approved. You use the request token to sign this request. After this
    # is done you throw away the request token and use the access token
    # returned. You should store this access token somewhere safe, like a
    # database, for future use.
    token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    token.set_verifier(oauth_verifier)
    consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret)
    client = oauth.Client(consumer, token)

    resp, content = client.request(access_token_url, "POST")
    # Fail with the status rather than with a KeyError on the lookups
    # below when the body does not carry the token fields.
    if resp['status'] != '200':
        raise Exception("Invalid response %s." % resp['status'])
    access_token = dict(urlparse.parse_qsl(content))

    auth_token.oauth_token = access_token['oauth_token']
    auth_token.oauth_token_secret = access_token['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # every 5 minutes
    auth_token.put()
    return auth_token
def getAuthorizeURL(self):
    """Fetch a request token and build the authorisation URL for it.

    Returns:
        ``(authorize_url, oauth_token_secret)``: the authorise URL carrying
        the request token as ``oauth_token``, plus that token's secret.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)

    # Get the request token
    resp, content = client.request(Splitwise.REQUEST_TOKEN_URL, "POST")
    if Splitwise.isDebug():
        print(resp, content)

    # Check if the response is correct
    status = resp['status']
    if status != '200':
        raise Exception(
            "Invalid response %s. Please check your consumer key and secret."
            % status)

    request_token = dict(parse_qsl(content.decode("utf-8")))
    authorize_url = "%s?oauth_token=%s" % (
        Splitwise.AUTHORIZE_URL, request_token['oauth_token'])
    return authorize_url, request_token['oauth_token_secret']
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier):
    """Exchange a verified request token for an access token.

    Returns:
        A dict of the form fields in the access-token response.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)

    resp, content = oauth.Client(self.consumer, request_token).request(
        Splitwise.ACCESS_TOKEN_URL, "POST")
    if Splitwise.isDebug():
        print(resp, content)

    # Check if the response is correct
    status = resp['status']
    if status != '200':
        raise Exception(
            "Invalid response %s. Please check your consumer key and secret."
            % status)

    return dict(parse_qsl(content.decode("utf-8")))
def get_tweets(twitter_handle):
    """Return the text of tweets found by a ``from:<handle>`` search.

    Queries the Twitter 1.1 search endpoint (``result_type=all``,
    ``count=100``) with the embedded credentials.

    Args:
        twitter_handle: the account name to search for; it is
            percent-encoded before being placed in the query string.

    Returns:
        A list of strings, one per entry in the response's ``statuses``:
        the tweet text followed by ``' \\n'``, with every match of the
        ``https://t.co/`` + ten-character pattern removed.
    """
    from urllib.parse import quote

    # NOTE(review): credentials are hard-coded in source. They should be
    # rotated and loaded from configuration or the environment instead.
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    # Escape the handle so characters such as '&', '#' or spaces cannot
    # alter or truncate the query string.
    endpoint = (
        "https://api.twitter.com/1.1/search/tweets.json?q=from%3A"
        + quote(twitter_handle, safe='')
        + "&result_type=all&count=100"
    )

    response, data = client.request(endpoint)
    tweet_data = json.loads(data.decode('utf-8'))

    return [
        re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n')
        for tweet in tweet_data['statuses']
    ]

# example
# print(get_tweets("Cmdr_Hadfield"))
def accessToken(self):
    """Return the access token for this session, fetching it if needed.

    Resolution order:

    1. An access token already stored in the session is returned as is.
    2. Otherwise, a request token stored in the session is removed from
       it, given the ``oauth_verifier`` from the request variables, and
       POSTed to ``self.access_token_url``; the parsed token is stored in
       the session and returned. On a non-200 status the ``redirect``
       callable from ``self.globals`` is invoked with the user/logout URL.
    3. With neither token present, the session's access token is set to
       ``None`` and ``None`` is returned.
    """
    if self.session.access_token:
        # return the token (TODO: does it expire?)
        return self.session.access_token

    if not self.session.request_token:
        self.session.access_token = None
        return None

    # Exchange the request token with an authorization token.
    token = self.session.request_token
    self.session.request_token = None

    # Build an authorized client
    # OAuth1.0a put the verifier!
    token.set_verifier(self.request.vars.oauth_verifier)
    client = oauth.Client(self.consumer, token)

    resp, content = client.request(self.access_token_url, "POST")
    if str(resp['status']) != '200':
        self.session.request_token = None
        logout_url = self.globals['URL'](f='user', args='logout')
        self.globals['redirect'](logout_url)

    self.session.access_token = oauth.Token.from_string(content)
    return self.session.access_token
def __oauth_login(self, next):
    """Send the user to the authentication server unless an access token
    is already available.

    When ``self.accessToken()`` yields a token nothing else happens and
    ``None`` is returned. Otherwise a request token is obtained from
    ``self.token_url``, stored in the session, and the ``HTTP`` exception
    from ``self.globals`` is raised with status 302 and a ``Location``
    pointing at ``self.auth_url``.
    """
    if self.accessToken():
        return None

    # setup the client
    client = oauth.Client(self.consumer, None, timeout=self.socket_timeout)

    # Get a request token.
    # oauth_callback *is REQUIRED* for OAuth1.0a
    # putting it in the body seems to work.
    callback_url = self.__redirect_uri(next)
    request_body = urlencode(dict(oauth_callback=callback_url))
    resp, content = client.request(self.token_url, "POST", body=request_body)
    if resp['status'] != '200':
        self.session.request_token = None
        logout_url = self.globals['URL'](f='user', args='logout')
        self.globals['redirect'](logout_url)

    # Store the request token in session.
    request_token = oauth.Token.from_string(content)
    self.session.request_token = request_token

    # Redirect the user to the authentication URL and pass the callback url.
    query = urlencode(dict(oauth_token=request_token.key,
                           oauth_callback=callback_url))
    auth_request_url = self.auth_url + '?' + query

    HTTP = self.globals['HTTP']
    raise HTTP(302,
               "You are not authenticated: you are being redirected to the <a href='" + auth_request_url + "'> authentication server</a>",
               Location=auth_request_url)
def _post_patched_request(lti_key, secret, body, url, method, content_type):  # pylint: disable=too-many-arguments
    """
    Authorization header needs to be capitalized for some LTI clients
    this function ensures that header is capitalized

    While the request is in flight ``httplib2.Http._normalize_headers`` is
    replaced by a wrapper that renames the ``authorization`` key to
    ``Authorization``. The original function is put back afterwards, even
    when the request raises.

    :param lti_key: OAuth consumer key
    :param secret: OAuth consumer secret
    :param body: body of the call (text; sent UTF-8 encoded)
    :param url: outcome url
    :param method: HTTP method passed to the client
    :param content_type: value for the ``Content-Type`` request header
    :return: ``(response, content)`` as returned by the OAuth client
    """
    consumer = oauth2.Consumer(key=lti_key, secret=secret)
    client = oauth2.Client(consumer)

    import httplib2

    http = httplib2.Http
    # pylint: disable=protected-access
    original_normalize = http._normalize_headers

    def my_normalize(self, headers):
        """ This function patches Authorization header """
        ret = original_normalize(self, headers)
        if 'authorization' in ret:
            ret['Authorization'] = ret.pop('authorization')
        return ret

    # NOTE(review): this assigns to an attribute of the httplib2.Http
    # class itself, so other users of that class see the patch while it
    # is installed.
    http._normalize_headers = my_normalize
    try:
        response, content = client.request(
            url,
            method,
            body=body.encode("utf8"),
            headers={'Content-Type': content_type})
    finally:
        # Always undo the patch; previously an exception from the request
        # (or from encoding the body) left it installed.
        http._normalize_headers = original_normalize

    return response, content
def test_init_passes_kwargs_to_httplib2(self):
    """Extra keyword options given to Client() must show up on the client:
    ``cache`` is set and ``timeout`` keeps the value passed in."""
    class Blah():
        pass

    # httplib2 options
    httplib2_options = dict(
        cache='.cache',
        timeout=3,
        disable_ssl_certificate_validation=True)

    consumer = oauth.Consumer('token', 'secret')
    client = oauth.Client(consumer, None, **httplib2_options)

    self.assertNotEqual(client.cache, None)
    self.assertEqual(client.timeout, 3)
def test_access_token_get(self):
    """GET the ``request_token`` URI with a token-less client and expect
    an HTTP 200 status."""
    token_uri = self._uri('request_token')
    resp, content = oauth.Client(self.consumer, None).request(
        token_uri, "GET")
    status_code = int(resp['status'])
    self.assertEqual(status_code, 200)
def test_access_token_post(self):
    """POST to the ``request_token`` URI with a token-less client; expect
    an HTTP 200 status and both token fields in the response body."""
    token_uri = self._uri('request_token')
    resp, content = oauth.Client(self.consumer, None).request(
        token_uri, "POST")
    self.assertEqual(int(resp['status']), 200)

    fields = dict(parse_qsl(content))
    for required in (b'oauth_token', b'oauth_token_secret'):
        self.assertTrue(required in fields)
def test_multipart_post_does_not_alter_body(self, mockHttpRequest):
    """A multipart POST body must reach the underlying HTTP request
    unchanged, and the request's return value must be passed through."""
    random_result = random.randint(1,100)

    field_name = 'rand-%d' % random.randint(1,100)
    field_value = random.randint(1,100)
    content_type, body = self.create_simple_multipart_data(
        {field_name: field_value})

    client = oauth.Client(self.consumer, None)
    uri = self._uri('two_legged')
    expected_kwargs = frozenset(
        ['method', 'body', 'redirections', 'connection_type', 'headers'])

    def mockrequest(cl, ur, **kw):
        # Stand-in for the patched request: verify what the client sends.
        self.assertTrue(cl is client)
        self.assertTrue(ur is uri)
        self.assertEqual(frozenset(kw.keys()), expected_kwargs)
        self.assertEqual(kw['body'], body)
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'POST')
        self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))
        return random_result

    mockHttpRequest.side_effect = mockrequest

    result = client.request(
        uri, 'POST', headers={'Content-Type':content_type}, body=body)
    self.assertEqual(result, random_result)
def test_url_with_query_string(self, mockHttpRequest):
    """A GET to a URI that already has a query string must be sent with
    the same query parameters as an independently signed request, ignoring
    the signature, nonce and timestamp."""
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1,100)

    expected_kwargs = frozenset(
        ['method', 'body', 'redirections', 'connection_type', 'headers'])
    # These values differ between two separately signed requests.
    volatile = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

    def mockrequest(cl, ur, **kw):
        self.assertTrue(cl is client)
        self.assertEqual(frozenset(kw.keys()), expected_kwargs)
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))

        # Sign an equivalent request and compare its query to the one sent.
        req = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        req.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)
        expected = parse_qsl(urlparse(req.to_url()).query)
        actual = parse_qsl(urlparse(ur).query)
        self.assertEqual(len(expected), len(actual))

        actual = dict(actual)
        for key, value in expected:
            if key in volatile:
                continue
            self.assertEqual(actual[key], value)
        return random_result

    mockHttpRequest.side_effect = mockrequest
    client.request(uri, 'GET')
def test_multiple_values_for_a_key(self, mockReqConstructor, mockHttpRequest):
    """A form body that repeats a key must be passed to the request
    constructor as a list of values, and both values must appear in the
    body handed to the HTTP layer."""
    client = oauth.Client(self.consumer, None)

    multi_params = {'multi': ['1', '2']}
    mockReqConstructor.return_value = oauth.Request(
        "GET", "http://example.com/fetch.php", parameters=multi_params)

    client.request('http://whatever', 'POST', body='multi=1&multi=2')

    self.assertEqual(mockReqConstructor.call_count, 1)
    constructor_kwargs = mockReqConstructor.call_args[1]
    self.assertEqual(constructor_kwargs['parameters'],
                     {'multi': ['1', '2']})

    self.assertTrue('multi=1' in mockHttpRequest.call_args[1]['body'])
    self.assertTrue('multi=2' in mockHttpRequest.call_args[1]['body'])
def get_initial_oauth_tokens(self):
    """GET a request token from Twitter and remember it on the instance.

    Sets ``self.oauth_token`` and ``self.oauth_token_secret`` from the
    response.

    Returns:
        ``(oauth_token, oauth_token_secret)``.

    Raises:
        Error: on a non-200 response, or when either value is missing or
            empty.
    """
    client = oauth2.Client(self.consumer)
    resp, content = client.request(
        'https://api.twitter.com/oauth/request_token', 'GET')
    if resp.status != 200:
        raise Error('{} from Twitter'.format(resp.status))

    oauth_values = dict(parse_qsl(content.decode('utf-8')))
    self.oauth_token = oauth_values.get('oauth_token')
    self.oauth_token_secret = oauth_values.get('oauth_token_secret')

    if not (self.oauth_token and self.oauth_token_secret):
        raise Error('No oauth_token or oauth_token_secret from Twitter')
    return self.oauth_token, self.oauth_token_secret
def __init__(self, consumer_key, consumer_secret, user_token, user_secret, logger):
    """Build the signed OAuth client and initialise the base class.

    Args:
        consumer_key: OAuth consumer key.
        consumer_secret: OAuth consumer secret.
        user_token: access token key.
        user_secret: access token secret.
        logger: logger used for the start-up message and then handed to
            the base-class initialiser.

    The client is stored on ``self.client`` and uses
    ``MediaConnection.timeout`` as its timeout.
    """
    # The two secrets are masked: they must not end up in log files.
    redacted = "<redacted>"
    logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(consumer_key, redacted, user_token, redacted))

    consumer = oauth.Consumer(consumer_key, consumer_secret)
    access_token = oauth.Token(key=user_token, secret=user_secret)
    # A single client is built; the earlier token-less client was
    # discarded immediately and never used.
    self.client = oauth.Client(consumer, access_token,
                               timeout=MediaConnection.timeout)
    super().__init__(logger)
def twitter_oauth1(user_id, token_nickname, country_filter):
    """Start the OAuth flow: fetch a request token, store it, and return
    the URL where the user authorises it.

    The temporary token and secret are saved on the ``db.OAuthToken``
    record matching ``user_id``, ``token_nickname`` and the Twitter
    application; a new record is created when none matches. A non-empty
    ``country_filter`` is upper-cased and appended to the record's
    ``country_filters``.

    Returns:
        ``authorize_url`` with the request token as ``oauth_token``.

    Raises:
        Exception: when the response status is not ``'200'``.
    """
    client = oauth.Client(
        oauth.Consumer(auth.consumer_key, auth.consumer_secret))

    # Step 1: Get a request token. This is a temporary token that is used
    # for having the user authorize an access token and to sign the request
    # to obtain said access token.
    resp, content = client.request(request_token_url, "GET")
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)
    request_token = dict(urlparse.parse_qsl(content))

    matches = db.OAuthToken.query(
        db.OAuthToken.user_id == user_id,
        db.OAuthToken.token_nickname == token_nickname,
        db.OAuthToken.application == db.APP_TWITTER
    ).fetch(1)
    auth_token = matches[0] if matches else db.OAuthToken()

    auth_token.user_id = user_id
    auth_token.token_nickname = token_nickname
    auth_token.application = db.APP_TWITTER
    auth_token.temp_oauth_token = request_token['oauth_token']
    auth_token.temp_oauth_token_secret = request_token['oauth_token_secret']
    if country_filter:
        auth_token.country_filters += country_filter.upper()
    auth_token.put()

    # Step 2: Redirect to the provider. Since this is a CLI script we do
    # not redirect. In a web application you would redirect the user to the
    # URL below.
    return "%s?oauth_token=%s" % (authorize_url, request_token['oauth_token'])

# user comes to:
# /sign-in-with-twitter/?
# oauth_token=NPcudxy0yU5T3tBzho7iCotZ3cnetKwcTIRlX0iwRl0&
# oauth_verifier=uw7NjWHT6OJ1MpJOXsHfNxoAhPKpgI8BlYDhxEjIBY
def request_2legged(url, http_method="GET"):
    """Send a consumer-only signed request to ``url``.

    The request carries a form-encoded body of ``country=<value>`` taken
    from ``api_settings.country``.

    Returns:
        ``(response, content)`` from the OAuth client.
    """
    client = oauth.Client(_consumer())
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    form_body = "country=%s" % api_settings.country

    reply, payload = client.request(
        url,
        headers=form_headers,
        body=form_body,
        method=http_method)
    return reply, payload
def request_access_token(token):
    """Request ``ACCESS_TOKEN_URL`` signed with ``token`` and return the
    result of ``_token_from_response_content`` applied to the response
    body."""
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    client = oauth.Client(_consumer(), token=token)
    _, body = client.request(ACCESS_TOKEN_URL, headers=form_headers)
    return _token_from_response_content(body)
def request_3legged(url, access_token, http_method="GET", body=''):
    '''
    Once you have an access_token authorized by a customer, execute a
    request on their behalf.

    The request is sent with a form-urlencoded Content-Type header.
    Returns whatever the OAuth client's request() call returns.
    '''
    form_headers = {"Content-Type":"application/x-www-form-urlencoded"}
    client = oauth.Client(_consumer(), token=access_token)
    return client.request(
        url,
        headers=form_headers,
        method=http_method,
        body=body)
def oauthReq(self, url, http_method="GET", post_body=None, http_headers=None):
    """Send a signed request using the credentials from
    ``self.parseConfig()`` and return the response body.

    A falsy ``post_body`` is sent as an empty string.
    """
    config = self.parseConfig()
    consumer = oauth.Consumer(
        key=config.get('consumer_key'),
        secret=config.get('consumer_secret'))
    token = oauth.Token(
        key=config.get('access_token'),
        secret=config.get('access_token_secret'))

    request_body = post_body if post_body else ''
    _, content = oauth.Client(consumer, token).request(
        url,
        method=http_method,
        body=request_body,
        headers=http_headers)
    return content