我们从 Python 开源项目中提取了以下 48 个代码示例，用于说明如何使用 oauth2.Token()。
def decode_oauth_token_secret(self):
    """Recover the OAuth token secret carried inside the signed token cookie.

    Decodes ``self.token_cookie`` as a JWT using ``self.token_secret`` as the
    key, reads ``payload['data']['id']``, stores it on
    ``self.oauth_token_secret`` and returns it.

    Raises:
        Error: if no cookie is set, if the JWT cannot be decoded, or if the
            payload does not contain a secret.
    """
    if not self.token_cookie:
        raise Error('No token cookie given')

    try:
        # PyJWT's decode() takes ``algorithms`` (a list of accepted
        # algorithms); the singular ``algorithm=`` keyword is not a decode()
        # parameter, so the accepted algorithm was never actually pinned.
        payload = jwt.decode(
            self.token_cookie, self.token_secret, algorithms=['HS256'])
    except Exception:
        # Deliberately broad (any decode/verification failure maps to the
        # same Error), but no longer swallows KeyboardInterrupt/SystemExit
        # the way the bare ``except:`` did.
        raise Error('Failed to retrieve oauth_token_secret from token')

    self.oauth_token_secret = payload.get('data', {}).get('id', None)
    if not self.oauth_token_secret:
        raise Error('Token does not have an oauth_token_secret')
    return self.oauth_token_secret
def oauth_login(self, url, oauth_token, oauth_token_secret,
                consumer_key='anonymous', consumer_secret='anonymous'):
    """Authenticate against the server with the XOAUTH mechanism.

    Requires the optional ``oauth2`` package (available here as
    ``oauth_module``); raises ``self.Error`` when it is missing. Per the
    original documentation this only works with IMAP servers that support
    OAUTH (e.g. Gmail).

    Returns whatever ``self._command_and_check`` returns for the
    ``authenticate`` command.
    """
    if not oauth_module:
        raise self.Error(
            'The optional oauth2 package is needed for OAUTH authentication')

    token = oauth_module.Token(oauth_token, oauth_token_secret)
    consumer = oauth_module.Consumer(consumer_key, consumer_secret)

    def xoauth_callable(_challenge):
        # The argument passed by the caller is ignored; the XOAUTH string
        # is built solely from url/consumer/token.
        return oauth_module.build_xoauth_string(url, consumer, token)

    return self._command_and_check(
        'authenticate', 'XOAUTH', xoauth_callable, unpack=True)
def get_token_and_auth_url(self, callback_url=None):
    """Fetch a request token together with the authorisation URL.

    POSTs to ``REQUEST_TOKEN_URL`` with ``oauth_callback`` set to
    ``callback_url`` (or ``'oob'`` when none is given).

    Returns:
        A ``(token, auth_url)`` tuple, where ``auth_url`` is the
        ``xoauth_request_auth_url`` field of the response body.

    Raises:
        YQLError: if the response status is not ``'200'``.
    """
    http_client = oauth.Client(self.consumer)
    oauth_request = oauth.Request(
        parameters={'oauth_callback': callback_url or 'oob'})
    url = REQUEST_TOKEN_URL

    resp, content = http_client.request(
        url, "POST", oauth_request.to_postdata())

    # Guard clause: anything but a 200 is reported to the caller.
    if resp.get('status') != '200':
        raise YQLError(resp, content, url)

    token = oauth.Token.from_string(content)
    yql_logger.debug("token: %s", token)
    data = dict(parse_qsl(content))
    yql_logger.debug("data: %s", data)
    return token, data['xoauth_request_auth_url']
def get_token_and_auth_url(self, callback_url=None):
    """First step of the flow: get the request token and the auth URL.

    Sends a POST to ``REQUEST_TOKEN_URL`` with ``oauth_callback`` set to
    ``callback_url`` or ``'oob'``.

    Returns:
        A tuple of the token and the authorisation URL (the
        ``xoauth_request_auth_url`` field of the response body).

    Raises:
        YQLError: constructed with ``(resp, content, url)`` when the
            response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)

    params = {'oauth_callback': callback_url or 'oob'}
    request = oauth.Request(parameters=params)
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())

    if resp.get('status') == '200':
        token = oauth.Token.from_string(content)
        yql_logger.debug("token: %s", token)
        data = dict(parse_qsl(content))
        yql_logger.debug("data: %s", data)
        return token, data['xoauth_request_auth_url']

    # ``raise YQLError, (resp, content, url)`` is Python 2-only syntax and a
    # SyntaxError on Python 3; the call form below is equivalent on Python 2
    # and valid on both.
    raise YQLError(resp, content, url)
def test_from_string(self):
    """Token.from_string rejects malformed input and round-trips to_string."""
    malformed_inputs = (
        '',
        'blahblahblah',
        'blah=blah',
        'oauth_token_secret=asfdasf',
        'oauth_token_secret=',
        'oauth_token=asfdasf',
        'oauth_token=',
        'oauth_token=&oauth_token_secret=',
        'oauth_token=tooken%26oauth_token_secret=seecret',
    )
    for bad in malformed_inputs:
        self.assertRaises(ValueError, oauth.Token.from_string, bad)

    # Round trip without a callback set.
    self._compare_tokens(oauth.Token.from_string(self.token.to_string()))

    # Round trip again once a callback has been attached.
    self.token.set_callback('http://www.example.com/my-callback')
    self._compare_tokens(oauth.Token.from_string(self.token.to_string()))
def test_from_token_and_callback(self):
    """Request.from_token_and_callback adds oauth_callback only when given.

    The unused ``params`` dict from the original was dead code and has been
    removed; membership checks use assertIn/assertNotIn so a failure reports
    the offending key and container.
    """
    url = "http://sp.example.com/"
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    # Without a callback the request carries the token only.
    req = oauth.Request.from_token_and_callback(tok)
    self.assertNotIn('oauth_callback', req)
    self.assertEqual(req['oauth_token'], tok.key)

    # With a callback it is echoed back under 'oauth_callback'.
    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertIn('oauth_callback', req)
    self.assertEqual(req['oauth_callback'], url)
def setUp(self):
    """Build a consumer, a token and a GET request signed with HMAC-SHA1."""
    self.consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': self.token.key,
        'oauth_consumer_key': self.consumer.key,
    }

    self.request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=request_params)
    self.request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
def test_init(self):
    """Client() must raise ValueError for a bogus consumer or a bogus token."""
    class Blah():
        pass

    # An arbitrary object is not a Consumer.
    try:
        oauth.Client(Blah())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Consumer.")

    # A valid consumer paired with an arbitrary object as the token.
    consumer = oauth.Consumer('token', 'secret')
    try:
        oauth.Client(consumer, Blah())
    except ValueError:
        pass
    else:
        self.fail("Client.__init__() accepted invalid Token.")
def make_request(self, url, method="GET", body="", headers=None):
    """Send an OAuth-signed request to the TradeKing API.

    Returns the JSON-decoded response body, or ``None`` (after logging an
    error) when the body is not valid JSON.
    """
    client = Client(
        Consumer(key=TRADEKING_CONSUMER_KEY,
                 secret=TRADEKING_CONSUMER_SECRET),
        Token(key=TRADEKING_ACCESS_TOKEN,
              secret=TRADEKING_ACCESS_TOKEN_SECRET))

    self.logs.debug("TradeKing request: %s %s %s %s" %
                    (url, method, body, headers))
    response, content = client.request(
        url, method=method, body=body, headers=headers)
    self.logs.debug("TradeKing response: %s %s" % (response, content))

    try:
        decoded = loads(content)
    except ValueError:
        self.logs.error("Failed to decode JSON response: %s" % content)
        decoded = None
    return decoded
def get_user_information(self):
    """Exchange the verified request token for the Twitter user's identity.

    POSTs to Twitter's ``oauth/access_token`` endpoint, then stores
    ``status``, ``user_id`` and ``user_name`` on the instance.

    Returns:
        ``(user_id, user_name)``; ``user_name`` is ``None`` when the
        response has no ``screen_name``.

    Raises:
        Error: on a non-200 response or when the body has no ``user_id``.
    """
    request_token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
    request_token.set_verifier(self.oauth_verifier)
    client = oauth2.Client(self.consumer, request_token)

    resp, content = client.request(
        'https://api.twitter.com/oauth/access_token', 'POST')
    if resp.status != 200:
        raise Error('{} from Twitter'.format(resp.status))

    provider_user = dict(parse_qsl(content.decode('utf-8')))
    if 'user_id' not in provider_user:
        raise Error('No user_id from Twitter')

    self.status = 200
    self.user_id = provider_user.get('user_id')
    self.user_name = provider_user.get('screen_name', None)
    return (self.user_id, self.user_name,)
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier):
    """Final leg of three-legged OAuth.

    Signs a POST to ``self.ACCESS_TOKEN_URL`` with the request token,
    secret and verifier, and exchanges them for the access token.

    Returns:
        A dict parsed from the response body, with keys and values decoded
        from bytes to str (the original documents the two keys as
        ``oauth_token`` and ``oauth_token_secret``).

    Raises:
        Exception: if the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    client = oauth.Client(
        oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET), request_token)

    # This request goes to the access-token URL, not the request-token one.
    resp, content = client.request(self.ACCESS_TOKEN_URL, "POST")
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s." % status)

    # The parsed pairs are decoded with .decode(), so they are expected to
    # be bytes here.
    raw_pairs = dict(urllib.parse.parse_qsl(content))
    return {key.decode(): value.decode() for key, value in raw_pairs.items()}
def __init__(self, access_token_key, access_token_secret,
             consumer_key, consumer_secret):
    """Store the credentials and build the OAuth and HTTP handler objects."""
    debug_level = 0

    self.access_token_key = access_token_key
    self.access_token_secret = access_token_secret
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

    self.oauth_token = oauth.Token(
        key=self.access_token_key, secret=self.access_token_secret)
    self.oauth_consumer = oauth.Consumer(
        key=self.consumer_key, secret=self.consumer_secret)
    self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()

    # NOTE(review): the stdlib ``urllib`` package has no top-level
    # HTTPHandler/HTTPSHandler; presumably ``urllib`` is an alias in this
    # file (e.g. for urllib.request / urllib2) -- confirm against imports.
    self.http_handler = urllib.HTTPHandler(debuglevel=debug_level)
    self.https_handler = urllib.HTTPSHandler(debuglevel=debug_level)
def __init__(self, consumer_key, consumer_secret="", oauth_token="",
             oauth_secret="", host="https://api.tumblr.com", proxy_url=None):
    """Set up OAuth credentials and optional HTTP proxy settings.

    ``self.proxy_info`` is derived from ``proxy_url`` when given, otherwise
    from the ``HTTPS_PROXY`` environment variable, otherwise it is ``None``.
    ``self.proxy_url`` always keeps the value passed by the caller.
    """
    self.host = host
    self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
    self.token = oauth.Token(key=oauth_token, secret=oauth_secret)
    self.proxy_url = proxy_url

    if proxy_url:
        print("Generating Proxy From proxy_url")
        self.proxy_info = httplib2.proxy_info_from_url(
            "https://" + proxy_url, 'http')
        self.proxy_info.proxy_rdns = True
        return

    print("Generating proxy from ENV")
    env_proxy = os.environ.get('HTTPS_PROXY', None)
    if not env_proxy:
        self.proxy_info = None
        return

    uri = urlparse(env_proxy)
    self.proxy_info = ProxyInfo(
        socks.PROXY_TYPE_HTTP, uri.hostname, uri.port, proxy_rdns=True)
def twitter_oauth2(oauth_token, oauth_verifier):
    """Swap a stored Twitter request token for the user's access token.

    Looks up the ``db.OAuthToken`` row whose temporary token matches
    ``oauth_token`` for the Twitter application. Returns ``None`` when no
    row is found; otherwise stores the access token and secret on the row,
    marks it valid, persists it and returns it.
    """
    matches = db.OAuthToken.query(
        db.OAuthToken.temp_oauth_token == oauth_token,
        db.OAuthToken.application == db.APP_TWITTER,
    ).fetch(1)
    if not matches:
        return None
    auth_token = matches[0]

    # Step 3: after the user has been redirected back to the oauth_callback
    # URL, the approved access token can be requested. The request is signed
    # with the request token, which is discarded afterwards in favour of the
    # access token; the access token should be stored somewhere safe (a
    # database) for later use.
    request_token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret)
    request_token.set_verifier(oauth_verifier)
    client = oauth.Client(
        oauth.Consumer(auth.consumer_key, auth.consumer_secret), request_token)

    resp, content = client.request(access_token_url, "POST")
    access_token = dict(urlparse.parse_qsl(content))

    auth_token.oauth_token = access_token['oauth_token']
    auth_token.oauth_token_secret = access_token['oauth_token_secret']
    auth_token.valid_token = True
    auth_token.time_between_posts = 5 * 60  # every 5 minutes
    auth_token.put()
    return auth_token
def __init__(self, consumer_key, consumer_secret, access_token=None):
    """Initialise the Splitwise client with its consumer credentials.

    Args:
        consumer_key (str): Consumer key provided by Splitwise.
        consumer_secret (str): Consumer secret provided by Splitwise.
        access_token (dict, optional): Combination of ``oauth_token`` and
            ``oauth_token_secret``. When truthy it is handed to
            ``setAccessToken``.
    """
    self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)

    # Nothing else to do unless the caller already holds an access token.
    if not access_token:
        return
    self.setAccessToken(access_token)
def getAccessToken(self, oauth_token, oauth_token_secret, oauth_verifier):
    """Exchange a verified request token for a Splitwise access token.

    Returns:
        dict: the URL-decoded key/value pairs of the response body.

    Raises:
        Exception: if the response status is not ``'200'``.
    """
    request_token = oauth.Token(oauth_token, oauth_token_secret)
    request_token.set_verifier(oauth_verifier)

    resp, content = oauth.Client(self.consumer, request_token).request(
        Splitwise.ACCESS_TOKEN_URL, "POST")

    if Splitwise.isDebug():
        print(resp, content)

    # Anything but a 200 means the exchange failed.
    status = resp['status']
    if status != '200':
        raise Exception("Invalid response %s. Please check your consumer key and secret." % status)

    return dict(parse_qsl(content.decode("utf-8")))
def get_tweets(twitter_handle):
    """Return the text of recent tweets from ``twitter_handle``.

    Queries Twitter's v1.1 search endpoint for ``from:<handle>`` (up to 100
    results) and returns a list of tweet texts, each with a trailing
    ``' \\n'`` and with ``https://t.co/`` links (10-character suffix)
    stripped out.

    Args:
        twitter_handle: the account name, without the leading ``@``.
    """
    try:
        from urllib.parse import quote
    except ImportError:  # Python 2 fallback
        from urllib import quote

    # SECURITY(review): live-looking API credentials are hard-coded in
    # source. They are kept here only to preserve behaviour; they should be
    # rotated and loaded from configuration or the environment instead.
    CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0'
    CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6'
    ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh'
    ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4'

    consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET)
    access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET)
    client = oauth.Client(consumer, access_token)

    # Percent-encode the handle so characters such as '&', '#' or spaces
    # cannot break out of the ``q`` parameter and alter the query string.
    endpoint = (
        "https://api.twitter.com/1.1/search/tweets.json?q=from%3A"
        + quote(twitter_handle, safe='')
        + "&result_type=all&count=100"
    )

    response, data = client.request(endpoint)
    tweet_data = json.loads(data.decode('utf-8'))

    return [
        re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n')
        for tweet in tweet_data['statuses']
    ]


# example
# print(get_tweets("Cmdr_Hadfield"))
def authenticate(self, url, consumer, token):
    """Issue an ``AUTH XOAUTH`` command built from url, consumer and token.

    Raises:
        ValueError: if ``consumer`` is neither ``None`` nor an
            ``oauth2.Consumer``, or ``token`` is neither ``None`` nor an
            ``oauth2.Token``.
    """
    if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
        raise ValueError("Invalid consumer.")

    if not (token is None or isinstance(token, oauth2.Token)):
        raise ValueError("Invalid token.")

    xoauth_string = oauth2.build_xoauth_string(url, consumer, token)
    # NOTE(review): on Python 3, base64.b64encode needs bytes and returns
    # bytes, so this formatting presumably assumes Python 2 str -- confirm.
    encoded = base64.b64encode(xoauth_string)
    self.docmd('AUTH', 'XOAUTH %s' % encoded)
def authenticate(self, url, consumer, token):
    """Run IMAP ``AUTHENTICATE XOAUTH`` using url, consumer and token.

    Raises:
        ValueError: if ``consumer`` is neither ``None`` nor an
            ``oauth2.Consumer``, or ``token`` is neither ``None`` nor an
            ``oauth2.Token``.
    """
    if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
        raise ValueError("Invalid consumer.")

    if not (token is None or isinstance(token, oauth2.Token)):
        raise ValueError("Invalid token.")

    def build_response(_challenge):
        # The argument passed by imaplib is ignored; the XOAUTH string
        # depends only on url/consumer/token.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', build_response)
def setUp(self):
    """Create the key, secret and Token fixture used by the tests."""
    self.key, self.secret = 'my-key', 'my-secret'
    self.token = oauth.Token(key=self.key, secret=self.secret)
def test_basic(self):
    """Token() raises ValueError when the key or the secret is None."""
    invalid_pairs = (
        (None, None),
        ('asf', None),
        (None, 'dasf'),
    )
    for key, secret in invalid_pairs:
        self.assertRaises(ValueError, oauth.Token, key, secret)
def test_unset_consumer_and_token(self):
    """sign_request fills in oauth_consumer_key and oauth_token."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')

    # The request is created without any consumer/token parameters.
    request = oauth.Request("GET", "http://example.com/fetch.php")
    signer = oauth.SignatureMethod_HMAC_SHA1()
    request.sign_request(signer, consumer, token)

    self.assertEqual(consumer.key, request['oauth_consumer_key'])
    self.assertEqual(token.key, request['oauth_token'])
def test_no_url_set(self):
    """Signing a Request created without arguments raises ValueError."""
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()

    signer = oauth.SignatureMethod_HMAC_SHA1()
    with self.assertRaises(ValueError):
        request.sign_request(signer, consumer, token)
def test_from_consumer_and_token(self):
    """from_consumer_and_token copies the token, consumer key and verifier."""
    token = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    token.set_verifier('this_is_a_test_verifier')
    consumer = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    request = oauth.Request.from_consumer_and_token(
        consumer,
        token=token,
        http_method="GET",
        http_url="http://sp.example.com/",
    )

    self.assertEqual(request['oauth_token'], token.key)
    self.assertEqual(request['oauth_consumer_key'], consumer.key)
    self.assertEqual(token.verifier, request['oauth_verifier'])
def test_no_version(self):
    """A signed request without oauth_version goes through verify_request.

    There is no explicit assertion: the test passes as long as
    ``verify_request`` does not raise.
    """
    self.consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': self.token.key,
        'oauth_consumer_key': self.consumer.key,
    }
    self.request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=request_params)
    self.request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())
    server.verify_request(self.request, self.consumer, self.token)
def test_invalid_version(self):
    """verify_request raises oauth.Error for oauth_version '222.9922'."""
    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '222.9922',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['foo', 'bar'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=request_params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    with self.assertRaises(oauth.Error):
        server.verify_request(request, consumer, token)
def test_missing_signature(self):
    """verify_request raises MissingSignature once oauth_signature is removed."""
    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
        'oauth_token': token.key,
        'oauth_consumer_key': consumer.key,
    }
    request = oauth.Request(
        method="GET", url="http://sp.example.com/", parameters=request_params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)

    # Strip the signature that sign_request just added.
    del request['oauth_signature']

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    with self.assertRaises(oauth.MissingSignature):
        server.verify_request(request, consumer, token)


# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def __init__(self, consumer_key, consumer_secret, token, token_secret):
    """Wrap the given credentials in oauth2 Consumer and Token objects."""
    self.consumer = oauth2.Consumer(key=consumer_key, secret=consumer_secret)
    self.token = oauth2.Token(key=token, secret=token_secret)
def is_valid_request(
    self,
    request: t.Any,
    parameters: t.Optional[t.MutableMapping[str, str]] = None,
    fake_method: t.Any = None,
    handle_error: bool = True,
) -> bool:
    '''
    Validates an OAuth request using the python-oauth2 library:
        https://github.com/simplegeo/python-oauth2

    Args:
        request: the incoming request, passed to ``self.parse_request``.
        parameters: extra request parameters; defaults to a fresh empty
            mapping on every call.
        fake_method: forwarded unchanged to ``self.parse_request``.
        handle_error: when True, a failed verification returns ``False``;
            when False, the underlying exception propagates.

    Returns:
        ``True`` when ``verify_request`` succeeds, ``False`` when it fails
        and ``handle_error`` is True.

    Raises:
        oauth2.Error, ValueError: on a failed verification when
            ``handle_error`` is False.
    '''
    # A literal ``{}`` default would be one dict shared across all calls;
    # build a new one per call instead.
    if parameters is None:
        parameters = {}

    try:
        method, url, headers, parameters = self.parse_request(
            request, parameters, fake_method
        )
        oauth_request = oauth2.Request.from_request(
            method, url, headers=headers, parameters=parameters
        )
        # The stray no-op expression ``oauth2.Token`` that used to sit here
        # did nothing and has been removed.
        self.oauth_server.verify_request(
            oauth_request, self.oauth_consumer, {}
        )
    except (oauth2.Error, ValueError):
        if handle_error:
            return False
        # Bare ``raise`` keeps the original traceback intact.
        raise

    # Signature was valid
    return True
def SetCredentials(self, consumer_key, consumer_secret,
                   access_token_key=None, access_token_secret=None):
    '''Store the OAuth credentials for this instance.

    The signature methods, token and consumer objects are only built when
    all four values are not None; otherwise ``self._oauth_consumer`` is
    left as None.

    Args:
      consumer_key:
        The consumer_key of the twitter account.
      consumer_secret:
        The consumer_secret for the twitter account.
      access_token_key:
        The oAuth access token key value retrieved by running
        get_access_token.py.
      access_token_secret:
        The oAuth access token's secret, also retrieved from the
        get_access_token.py run.
    '''
    self._consumer_key = consumer_key
    self._consumer_secret = consumer_secret
    self._access_token_key = access_token_key
    self._access_token_secret = access_token_secret
    self._oauth_consumer = None

    credentials = (consumer_key, consumer_secret,
                   access_token_key, access_token_secret)
    if any(value is None for value in credentials):
        return

    self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT()
    self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
    self._oauth_token = oauth.Token(key=access_token_key,
                                    secret=access_token_secret)
    self._oauth_consumer = oauth.Consumer(key=consumer_key,
                                          secret=consumer_secret)
def __init__(self, consumer_key, consumer_secret, user_token, user_secret, logger):
    """Create the OAuth-signed HTTP client for this connection.

    Builds an ``oauth.Client`` from the consumer and user credentials with
    ``MediaConnection.timeout`` as its timeout, stores it on
    ``self.client``, then calls the parent initialiser with ``logger``.

    Args:
        consumer_key: OAuth consumer key.
        consumer_secret: OAuth consumer secret (never logged).
        user_token: OAuth user/access token key.
        user_secret: OAuth user/access token secret (never logged).
        logger: logger used here and passed to the parent initialiser.
    """
    # Secrets must not reach the logs: the message layout is kept, but the
    # two secret values are replaced by a placeholder.
    redacted = '<redacted>'
    logger.info("OAuth:\nConsumer Key:{}\nConsumer Secret:{}\nUser Token:{}\nUser Secret:{}".format(
        consumer_key, redacted, user_token, redacted))

    consumer = oauth.Consumer(consumer_key, consumer_secret)
    access_token = oauth.Token(key=user_token, secret=user_secret)
    # The original first built a token-less oauth.Client(consumer) and
    # immediately overwrote it; only the final client is created now.
    self.client = oauth.Client(consumer, access_token, timeout=MediaConnection.timeout)
    super().__init__(logger)