我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用oauth2.Consumer()。
def oauth_login(self, url, oauth_token, oauth_token_secret, consumer_key='anonymous', consumer_secret='anonymous'): """Authenticate using the OAUTH method. This only works with IMAP servers that support OAUTH (e.g. Gmail). """ if oauth_module: token = oauth_module.Token(oauth_token, oauth_token_secret) consumer = oauth_module.Consumer(consumer_key, consumer_secret) xoauth_callable = lambda x: oauth_module.build_xoauth_string(url, consumer, token) return self._command_and_check('authenticate', 'XOAUTH', xoauth_callable, unpack=True) else: raise self.Error( 'The optional oauth2 package is needed for OAUTH authentication')
def __two_legged_request(self, parameters=None, method=None): """Sign a request for two-legged authentication""" params = self.get_base_params() if parameters: params.update(parameters) url = self.endpoint yql_logger.debug("params: %s", params) yql_logger.debug("endpoint_url: %s", url) if not method: method = "GET" consumer = oauth.Consumer(self.api_key, self.secret) request = oauth.Request(method=method, url=url, parameters=params) sig = self.get_signature(url) yql_logger.debug("signature: %s", sig) request.sign_request(sig, consumer, None) return request
def __two_legged_request(self, resource_url, parameters=None, method=None): """Sign a request for two-legged authentication""" params = self.get_base_params() if parameters: params.update(parameters) yql_logger.debug("params: %s", params) yql_logger.debug("resource_url: %s", resource_url) if not method: method = "GET" consumer = oauth.Consumer(self.api_key, self.secret) request = oauth.Request(method=method, url=resource_url, parameters=params) request.sign_request(self.hmac_sha1_signature, consumer, None) return request
def test_set_signature_method(self): consumer = oauth.Consumer('key', 'secret') client = oauth.Client(consumer) class Blah: pass try: client.set_signature_method(Blah()) self.fail("Client.set_signature_method() accepted invalid method.") except ValueError: pass m = oauth.SignatureMethod_HMAC_SHA1() client.set_signature_method(m) self.assertEqual(m, client.method)
def make_request(self, url, method="GET", body="", headers=None): """Makes a request to the TradeKing API.""" consumer = Consumer(key=TRADEKING_CONSUMER_KEY, secret=TRADEKING_CONSUMER_SECRET) token = Token(key=TRADEKING_ACCESS_TOKEN, secret=TRADEKING_ACCESS_TOKEN_SECRET) client = Client(consumer, token) self.logs.debug("TradeKing request: %s %s %s %s" % (url, method, body, headers)) response, content = client.request(url, method=method, body=body, headers=headers) self.logs.debug("TradeKing response: %s %s" % (response, content)) try: return loads(content) except ValueError: self.logs.error("Failed to decode JSON response: %s" % content) return None
def verifySignature(self, secret): """See L{IOAuthCredentials#verifySignature}.""" consumer = Consumer(key=self.consumerKey, secret=secret) oauthRequest = Request.from_request( self.method, self.url, headers=self.headers, query_string=self.arguments) # verify the request has been oauth authorized, we only support # HMAC-SHA1, reject OAuth signatures if they use a different method if self.signatureMethod != 'HMAC-SHA1': raise NotImplementedError( 'Unknown signature method: %s' % self.signatureMethod) signatureMethod = SignatureMethod_HMAC_SHA1() result = signatureMethod.check(oauthRequest, consumer, None, self.signature) return result
def get_request_token(self): """ Function for generating initial request token for three-legged oauth Takes no input, returns a request_token dict with two keys, oauth_token and oauth_token_secret """ callback_url = self.request.build_absolute_uri(reverse('bot-authorize', args=(self.object.pk, ))) consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET) client = oauth.Client(consumer) resp, content = client.request(self.REQUEST_TOKEN_URL, "POST", body=urllib.parse.urlencode({'oauth_callback': callback_url})) if resp['status'] != '200': raise Exception("Invalid response %s." % resp['status']) request_token = dict(urllib.parse.parse_qsl(content)) # urllib returns bytes, which will need to be decoded using the string.decode() method before use request_token = {key.decode(): value.decode() for key, value in request_token.items()} # Return the token dict containing token and secret return request_token
def get_access_token(self, oauth_token, oauth_token_secret, oauth_verifier): """ Func for final leg of three-legged oauth, takes token and secret returned in oauth_callback GET dict and exchanges them for the permanent access token and secret returns an access_token dict with two keys, oauth_token and oauth_token_secret """ token = oauth.Token(oauth_token, oauth_token_secret) token.set_verifier(oauth_verifier) consumer = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET) client = oauth.Client(consumer, token) # Now we fire the request at access token url instead of request token resp, content = client.request(self.ACCESS_TOKEN_URL, "POST") if resp['status'] != '200': raise Exception("Invalid response %s." % resp['status']) access_token = dict(urllib.parse.parse_qsl(content)) # urllib returns bytes, which will need to be decoded using the string.decode() method before use access_token = {key.decode(): value.decode() for key, value in access_token.items()} # Return the token dict containing token and secret return access_token
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret): self.access_token_key = access_token_key self.access_token_secret = access_token_secret self.consumer_key = consumer_key self.consumer_secret = consumer_secret _debug = 0 self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret) self.oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret) self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() self.http_handler = urllib.HTTPHandler(debuglevel=_debug) self.https_handler = urllib.HTTPSHandler(debuglevel=_debug)
def __init__(self, consumer_key, consumer_secret="", oauth_token="", oauth_secret="", host="https://api.tumblr.com", proxy_url=None): self.host = host self.consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret) self.token = oauth.Token(key=oauth_token, secret=oauth_secret) self.proxy_url = proxy_url if proxy_url: print("Generating Proxy From proxy_url") self.proxy_info = httplib2.proxy_info_from_url("https://" + proxy_url, 'http') self.proxy_info.proxy_rdns = True # uri = urlparse(proxy_url) # self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP,uri.hostname,uri.port,proxy_rdns=True) else: print("Generating proxy from ENV") proxy_url = os.environ.get('HTTPS_PROXY', None) if proxy_url: uri = urlparse(proxy_url) self.proxy_info = ProxyInfo(socks.PROXY_TYPE_HTTP, uri.hostname, uri.port, proxy_rdns=True) else: self.proxy_info = None
def twitter_oauth2(oauth_token, oauth_verifier): auth_tokens = db.OAuthToken.query(db.OAuthToken.temp_oauth_token == oauth_token, db.OAuthToken.application == db.APP_TWITTER).fetch(1) if not auth_tokens: return None auth_token = auth_tokens[0] # Step 3: Once the consumer has redirected the user back to the oauth_callback # URL you can request the access token the user has approved. You use the # request token to sign this request. After this is done you throw away the # request token and use the access token returned. You should store this # access token somewhere safe, like a database, for future use. token = oauth.Token(oauth_token, auth_token.temp_oauth_token_secret) token.set_verifier(oauth_verifier) consumer = oauth.Consumer(auth.consumer_key, auth.consumer_secret) client = oauth.Client(consumer, token) resp, content = client.request(access_token_url, "POST") access_token = dict(urlparse.parse_qsl(content)) auth_token.oauth_token = access_token['oauth_token'] auth_token.oauth_token_secret = access_token['oauth_token_secret'] auth_token.valid_token = True auth_token.time_between_posts = 5 * 60 # every 5 minutes auth_token.put() return auth_token
def __init__(self,consumer_key,consumer_secret,access_token=None): """ Initializes the splitwise class. Sets consumer and access token Args: consumer_key (str) : Consumer Key provided by Spliwise consumer_secret (str): Consumer Secret provided by Splitwise access_token (:obj: `dict`) Access Token is a combination of oauth_token and oauth_token_secret Returns: A Splitwise Object """ self.consumer = oauth.Consumer(consumer_key, consumer_secret) #If access token is present then set the Access token if access_token: self.setAccessToken(access_token)
def get_tweets(twitter_handle): CONSUMER_KEY = 'YurZ2aWgVEYVhE5c60XyWRGG0' CONSUMER_SECRET = 'TIaSl3xnVTNUzHrwz84lOycTYkwe9l1NocwkB4hGbd2ngMufn6' ACCESS_KEY = '564268368-tRdDLN3O9EKzlaY28NzHCgNsYJX59YRngv3qjjJh' ACCESS_SECRET = 'VUjRU2ftlmnUdqDtppMiV6LLqT83ZbKDDUjcpWtrT1PG4' consumer = oauth.Consumer(key=CONSUMER_KEY, secret=CONSUMER_SECRET) access_token = oauth.Token(key=ACCESS_KEY, secret=ACCESS_SECRET) client = oauth.Client(consumer, access_token) endpoint = "https://api.twitter.com/1.1/search/tweets.json?q=from%3A" + twitter_handle + "&result_type=all&count=100" response, data = client.request(endpoint) tweet_data = json.loads(data.decode('utf-8')) tweets = [] for tweet in tweet_data['statuses']: tweets.append(re.sub(r'https:\/\/t\.co\/.{10}', '', tweet['text'] + ' \n')) return tweets #example # print(get_tweets("Cmdr_Hadfield"))
def __init__(self, api_key, shared_secret, httplib2_inst=None): """Override init to add consumer""" super(ThreeLegged, self).__init__( api_key, shared_secret, httplib2_inst) self.consumer = oauth.Consumer(self.api_key, self.secret)
def __init__(self, g, client_id, client_secret, auth_url, token_url, access_token_url, socket_timeout=60): self.globals = g self.client_id = client_id self.client_secret = client_secret self.code = None self.request = current.request self.session = current.session self.auth_url = auth_url self.token_url = token_url self.access_token_url = access_token_url self.socket_timeout = socket_timeout # consumer init self.consumer = oauth.Consumer(self.client_id, self.client_secret)
def _post_patched_request(lti_key, secret, body, url, method, content_type): # pylint: disable=too-many-arguments """ Authorization header needs to be capitalized for some LTI clients this function ensures that header is capitalized :param body: body of the call :param client: OAuth Client :param url: outcome url :return: response """ consumer = oauth2.Consumer(key=lti_key, secret=secret) client = oauth2.Client(consumer) import httplib2 http = httplib2.Http # pylint: disable=protected-access normalize = http._normalize_headers def my_normalize(self, headers): """ This function patches Authorization header """ ret = normalize(self, headers) if 'authorization' in ret: ret['Authorization'] = ret.pop('authorization') return ret http._normalize_headers = my_normalize monkey_patch_function = normalize response, content = client.request( url, method, body=body.encode("utf8"), headers={'Content-Type': content_type}) http = httplib2.Http # pylint: disable=protected-access http._normalize_headers = monkey_patch_function return response, content
def authenticate(self, url, consumer, token): if consumer is not None and not isinstance(consumer, oauth2.Consumer): raise ValueError("Invalid consumer.") if token is not None and not isinstance(token, oauth2.Token): raise ValueError("Invalid token.") self.docmd('AUTH', 'XOAUTH %s' % \ base64.b64encode(oauth2.build_xoauth_string(url, consumer, token)))
def authenticate(self, url, consumer, token): if consumer is not None and not isinstance(consumer, oauth2.Consumer): raise ValueError("Invalid consumer.") if token is not None and not isinstance(token, oauth2.Token): raise ValueError("Invalid token.") imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', lambda x: oauth2.build_xoauth_string(url, consumer, token))
def __init__(self, api_key, shared_secret, httplib2_inst=None): """Override init to add consumer""" super(ThreeLegged, self).__init__( api_key, shared_secret, httplib2_inst) self.scheme = HTTP_SCHEME self.endpoint = PRIVATE_ENDPOINT self.consumer = oauth.Consumer(self.api_key, self.secret)
def setUp(self): self.key = 'my-key' self.secret = 'my-secret' self.consumer = oauth.Consumer(key=self.key, secret=self.secret)
def test_basic(self): self.assertRaises(ValueError, lambda: oauth.Consumer(None, None)) self.assertRaises(ValueError, lambda: oauth.Consumer('asf', None)) self.assertRaises(ValueError, lambda: oauth.Consumer(None, 'dasf'))
def test_no_url_set(self): consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret') token = oauth.Token('my_key', 'my_secret') request = oauth.Request() self.assertRaises(ValueError, request.sign_request, oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
def test_signature_base_unicode_nonascii(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = u('http://api.simplegeo.com:80/1.0/places/address.json' '?q=monkeys&category=animal' '&address=41+Decatur+St,+San+Francisc') + _U2766 + u(',+CA') req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_string_bytes_nonascii_nonutf8(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = (b'http://api.simplegeo.com:80/1.0/places/address.json' b'?q=monkeys&category=animal' b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA' req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( #XXX req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = (b'http://api.simplegeo.com:80/1.0/places/address.json' b'?q=monkeys&category=animal' b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA') req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_unicode_nonascii_nonutf8_url_encoded(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = u('http://api.simplegeo.com:80/1.0/places/address.json' '?q=monkeys&category=animal' '&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA') req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_from_consumer_and_token(self): url = "http://sp.example.com/" tok = oauth.Token(key="tok-test-key", secret="tok-test-secret") tok.set_verifier('this_is_a_test_verifier') con = oauth.Consumer(key="con-test-key", secret="con-test-secret") req = oauth.Request.from_consumer_and_token(con, token=tok, http_method="GET", http_url=url) self.assertEqual(req['oauth_token'], tok.key) self.assertEqual(req['oauth_consumer_key'], con.key) self.assertEqual(tok.verifier, req['oauth_verifier'])
def test_no_version(self): url = "http://sp.example.com/" params = { 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['FOO','BAR'], 'foo': 59 } self.consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") self.token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = self.token.key params['oauth_consumer_key'] = self.consumer.key self.request = oauth.Request(method="GET", url=url, parameters=params) signature_method = oauth.SignatureMethod_HMAC_SHA1() self.request.sign_request(signature_method, self.consumer, self.token) server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) parameters = server.verify_request(self.request, self.consumer, self.token)
def test_invalid_version(self): url = "http://sp.example.com/" params = { 'oauth_version': '222.9922', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['foo','bar'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = oauth.SignatureMethod_HMAC_SHA1() request.sign_request(signature_method, consumer, token) server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_invalid_signature_method(self): url = "http://sp.example.com/" params = { 'oauth_version': '1.0', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['FOO','BAR'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = SignatureMethod_Bad() request.sign_request(signature_method, consumer, token) server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_missing_signature(self): url = "http://sp.example.com/" params = { 'oauth_version': '1.0', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['FOO','BAR'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = oauth.SignatureMethod_HMAC_SHA1() request.sign_request(signature_method, consumer, token) del request['oauth_signature'] server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.MissingSignature, server.verify_request, request, consumer, token) # Request Token: http://oauth-sandbox.sevengoslings.net/request_token # Auth: http://oauth-sandbox.sevengoslings.net/authorize # Access Token: http://oauth-sandbox.sevengoslings.net/access_token # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged # Key: bd37aed57e15df53 # Secret: 0e9e6413a9ef49510a4f68ed02cd
def setUp(self): self.consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret) self.body = { 'foo': 'bar', 'bar': 'foo', 'multi': ['FOO','BAR'], 'blah': 599999 }
def test_init_passes_kwargs_to_httplib2(self): class Blah(): pass consumer = oauth.Consumer('token', 'secret') # httplib2 options client = oauth.Client(consumer, None, cache='.cache', timeout=3, disable_ssl_certificate_validation=True) self.assertNotEqual(client.cache, None) self.assertEqual(client.timeout, 3)
def __init__( self, consumer_key, consumer_secret, token, token_secret ): self.consumer = oauth2.Consumer(consumer_key, consumer_secret) self.token = oauth2.Token(token, token_secret)
def post_replace_result( self, score: str, result_data: t.Mapping[str, str] = None ) -> 'OutcomeResponse': ''' POSTs the given score to the Tool Consumer with a replaceResult. OPTIONAL: result_data must be a dictionary Note: ONLY ONE of these values can be in the dict at a time, due to the Canvas specification. :param str text: text :param str url: url :param str launchUrl: The lti launch url ''' self.operation = REPLACE_REQUEST self.score = score self.result_data = result_data if result_data is not None: if len(result_data) > 1: error_msg = ( 'Dictionary result_data can only have one entry. ' '{0} entries were found.'.format(len(result_data)) ) raise ValueError(error_msg) elif any(a in result_data for a in ['url', 'text', 'launchUrl']): return self.post_outcome_request() else: error_msg = ( 'Dictionary result_data can only have the key ' '"text" or the key "url".' ) raise ValueError(error_msg) else: return self.post_outcome_request()
def post_delete_result(self) -> 'OutcomeResponse': ''' POSTs a deleteRequest to the Tool Consumer. ''' self.operation = DELETE_REQUEST return self.post_outcome_request()
def post_read_result(self) -> 'OutcomeResponse': ''' POSTS a readResult to the Tool Consumer. ''' self.operation = READ_REQUEST return self.post_outcome_request()
def __init__(self, key: str, secret: str) -> None: super(RequestValidatorMixin, self).__init__() self.consumer_key = key self.consumer_secret = secret self.oauth_server = oauth2.Server() signature_method = oauth2.SignatureMethod_HMAC_SHA1() self.oauth_server.add_signature_method(signature_method) self.oauth_consumer = oauth2.Consumer( self.consumer_key, self.consumer_secret )
def __init__(self, request_url, params, token_secret, token_cookie): self.request_url = request_url self.params = params self.token_secret = token_secret self.token_cookie = token_cookie self.status = False self.user_id = None self.user_name = None self.set_token_cookie = None self.oauth_verifier = params.get('oauth_verifier', None) self.oauth_token = params.get('oauth_token', None) self.oauth_token_secret = None self.consumer_key = os.environ.get('TWITTER_CONSUMER_KEY', None) if not self.consumer_key: raise Error('No TWITTER_CONSUMER_KEY environment value') self.consumer_secret = os.environ.get('TWITTER_CONSUMER_SECRET', None) if not self.consumer_secret: raise Error('No TWITTER_CONSUMER_SECRET environment value') self.consumer = oauth2.Consumer( key = self.consumer_key, secret = self.consumer_secret) if params.get('login') == 'start': self.start() elif self.oauth_verifier and self.oauth_token: self.finish() else: raise InvalidUsage('Invalid request')
def SetCredentials(self, consumer_key, consumer_secret, access_token_key=None, access_token_secret=None): '''Set the consumer_key and consumer_secret for this instance Args: consumer_key: The consumer_key of the twitter account. consumer_secret: The consumer_secret for the twitter account. access_token_key: The oAuth access token key value you retrieved from running get_access_token.py. access_token_secret: The oAuth access token's secret, also retrieved from the get_access_token.py run. ''' self._consumer_key = consumer_key self._consumer_secret = consumer_secret self._access_token_key = access_token_key self._access_token_secret = access_token_secret self._oauth_consumer = None if consumer_key is not None and consumer_secret is not None and \ access_token_key is not None and access_token_secret is not None: self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT() self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() self._oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret) self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)