我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用oauth2.SignatureMethod_HMAC_SHA1()。
def test_set_signature_method(self): consumer = oauth.Consumer('key', 'secret') client = oauth.Client(consumer) class Blah: pass try: client.set_signature_method(Blah()) self.fail("Client.set_signature_method() accepted invalid method.") except ValueError: pass m = oauth.SignatureMethod_HMAC_SHA1() client.set_signature_method(m) self.assertEqual(m, client.method)
def sign_request(self, url, url_params={}): oauth_request = oauth2.Request( method="GET", url=url, parameters=url_params ) oauth_request.update( { 'oauth_nonce': oauth2.generate_nonce(), 'oauth_timestamp': oauth2.generate_timestamp(), 'oauth_token': self.token.key, 'oauth_consumer_key': self.consumer.key } ) oauth_request.sign_request( oauth2.SignatureMethod_HMAC_SHA1(), self.consumer, self.token ) return oauth_request.to_url()
def verifySignature(self, secret): """See L{IOAuthCredentials#verifySignature}.""" consumer = Consumer(key=self.consumerKey, secret=secret) oauthRequest = Request.from_request( self.method, self.url, headers=self.headers, query_string=self.arguments) # verify the request has been oauth authorized, we only support # HMAC-SHA1, reject OAuth signatures if they use a different method if self.signatureMethod != 'HMAC-SHA1': raise NotImplementedError( 'Unknown signature method: %s' % self.signatureMethod) signatureMethod = SignatureMethod_HMAC_SHA1() result = signatureMethod.check(oauthRequest, consumer, None, self.signature) return result
def __init__(self, access_token_key, access_token_secret, consumer_key, consumer_secret): self.access_token_key = access_token_key self.access_token_secret = access_token_secret self.consumer_key = consumer_key self.consumer_secret = consumer_secret _debug = 0 self.oauth_token = oauth.Token(key=self.access_token_key, secret=self.access_token_secret) self.oauth_consumer = oauth.Consumer(key=self.consumer_key, secret=self.consumer_secret) self.signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() self.http_handler = urllib.HTTPHandler(debuglevel=_debug) self.https_handler = urllib.HTTPSHandler(debuglevel=_debug)
def signed_request(url, access_token=None): ''' return a signed request. Usually not used, save for specific functions like deriving a preview-clip URL. ''' consumer = _consumer() req = oauth.Request.from_consumer_and_token( consumer, http_url=url, is_form_encoded=True, parameters={'country':api_settings.country}) signing_method = oauth.SignatureMethod_HMAC_SHA1() req.sign_request(signing_method, consumer, access_token) return req
def __init__(self, api_key, shared_secret, httplib2_inst=None): """Override init to ensure required args""" super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst) self.endpoint = PRIVATE_ENDPOINT self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1() self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
def __init__(self, api_key, shared_secret, httplib2_inst=None): """Override init to ensure required args""" super(TwoLegged, self).__init__(api_key, shared_secret, httplib2_inst) self.endpoint = PRIVATE_ENDPOINT self.scheme = HTTPS_SCHEME self.hmac_sha1_signature = oauth.SignatureMethod_HMAC_SHA1() self.plaintext_signature = oauth.SignatureMethod_PLAINTEXT()
def test_unset_consumer_and_token(self): consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret') token = oauth.Token('my_key', 'my_secret') request = oauth.Request("GET", "http://example.com/fetch.php") request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token) self.assertEqual(consumer.key, request['oauth_consumer_key']) self.assertEqual(token.key, request['oauth_token'])
def test_no_url_set(self): consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret') token = oauth.Token('my_key', 'my_secret') request = oauth.Request() self.assertRaises(ValueError, request.sign_request, oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
def test_signature_base_unicode_nonascii(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = u('http://api.simplegeo.com:80/1.0/places/address.json' '?q=monkeys&category=animal' '&address=41+Decatur+St,+San+Francisc') + _U2766 + u(',+CA') req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_string_bytes_nonascii_nonutf8(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = (b'http://api.simplegeo.com:80/1.0/places/address.json' b'?q=monkeys&category=animal' b'&address=41+Decatur+St,+San+Francisc') + _B2766 + b',+CA' req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( #XXX req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self): consumer = oauth.Consumer('consumer_token', 'consumer_secret') url = (b'http://api.simplegeo.com:80/1.0/places/address.json' b'?q=monkeys&category=animal' b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA') req = oauth.Request("GET", url) self.assertReallyEqual( req.normalized_url, u('http://api.simplegeo.com/1.0/places/address.json')) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None) self.assertReallyEqual( req['oauth_signature'], b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signing_base(self): # example copied from # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js # which in turns says that it was copied from # http://oauth.net/core/1.0/#sig_base_example . url = ("http://photos.example.net/photos?file=vacation.jpg" "&oauth_consumer_key=dpf43f3p2l4k3l03" "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1" "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk" "&oauth_version=1.0&size=original") req = oauth.Request("GET", url) sm = oauth.SignatureMethod_HMAC_SHA1() consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo') key, raw = sm.signing_base(req, consumer, None) expected = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos' '&file%3Dvacation.jpg' '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03' '%26oauth_nonce%3Dkllo9940pd9333jh' '%26oauth_signature_method%3DHMAC-SHA1' '%26oauth_timestamp%3D1191242096' '%26oauth_token%3Dnnch734d00sl2jdk' '%26oauth_version%3D1.0%26size%3Doriginal') self.assertEqual(expected, raw)
def test_init(self): server = oauth.Server(signature_methods={'HMAC-SHA1' : oauth.SignatureMethod_HMAC_SHA1()}) self.assertTrue('HMAC-SHA1' in server.signature_methods) self.assertTrue(isinstance(server.signature_methods['HMAC-SHA1'], oauth.SignatureMethod_HMAC_SHA1)) server = oauth.Server() self.assertEqual(server.signature_methods, {})
def test_add_signature_method(self): server = oauth.Server() res = server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertTrue(len(res) == 1) self.assertTrue('HMAC-SHA1' in res) self.assertTrue(isinstance(res['HMAC-SHA1'], oauth.SignatureMethod_HMAC_SHA1)) res = server.add_signature_method(oauth.SignatureMethod_PLAINTEXT()) self.assertTrue(len(res) == 2) self.assertTrue('PLAINTEXT' in res) self.assertTrue(isinstance(res['PLAINTEXT'], oauth.SignatureMethod_PLAINTEXT))
def test_verify_request(self): server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) parameters = server.verify_request(self.request, self.consumer, self.token) self.assertTrue('bar' in parameters) self.assertTrue('foo' in parameters) self.assertTrue('multi' in parameters) self.assertEqual(parameters['bar'], 'blerg') self.assertEqual(parameters['foo'], 59) self.assertEqual(parameters['multi'], ['FOO','BAR'])
def test_verify_request_invalid_signature(self): server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.request['oauth_signature'] = 'BOGUS' self.assertRaises(oauth.Error, server.verify_request, self.request, self.consumer, self.token)
def test_verify_request_invalid_timestamp(self): server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.request['oauth_timestamp'] -= 86400 self.assertRaises(oauth.Error, server.verify_request, self.request, self.consumer, self.token)
def test_invalid_version(self): url = "http://sp.example.com/" params = { 'oauth_version': '222.9922', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['foo','bar'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = oauth.SignatureMethod_HMAC_SHA1() request.sign_request(signature_method, consumer, token) server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_invalid_signature_method(self): url = "http://sp.example.com/" params = { 'oauth_version': '1.0', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['FOO','BAR'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = SignatureMethod_Bad() request.sign_request(signature_method, consumer, token) server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.Error, server.verify_request, request, consumer, token)
def test_missing_signature(self): url = "http://sp.example.com/" params = { 'oauth_version': '1.0', 'oauth_nonce': "4572616e48616d6d65724c61686176", 'oauth_timestamp': int(time.time()), 'bar': 'blerg', 'multi': ['FOO','BAR'], 'foo': 59 } consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret") token = oauth.Token(key="token-key", secret="token-secret") params['oauth_token'] = token.key params['oauth_consumer_key'] = consumer.key request = oauth.Request(method="GET", url=url, parameters=params) signature_method = oauth.SignatureMethod_HMAC_SHA1() request.sign_request(signature_method, consumer, token) del request['oauth_signature'] server = oauth.Server() server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1()) self.assertRaises(oauth.MissingSignature, server.verify_request, request, consumer, token) # Request Token: http://oauth-sandbox.sevengoslings.net/request_token # Auth: http://oauth-sandbox.sevengoslings.net/authorize # Access Token: http://oauth-sandbox.sevengoslings.net/access_token # Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged # Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged # Key: bd37aed57e15df53 # Secret: 0e9e6413a9ef49510a4f68ed02cd
def test_url_with_query_string(self, mockHttpRequest): uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf' client = oauth.Client(self.consumer, None) random_result = random.randint(1,100) def mockrequest(cl, ur, **kw): self.assertTrue(cl is client) self.assertEqual(frozenset(kw.keys()), frozenset(['method', 'body', 'redirections', 'connection_type', 'headers'])) self.assertEqual(kw['body'], b'') self.assertEqual(kw['connection_type'], None) self.assertEqual(kw['method'], 'GET') self.assertEqual(kw['redirections'], httplib2.DEFAULT_MAX_REDIRECTS) self.assertTrue(isinstance(kw['headers'], dict)) req = oauth.Request.from_consumer_and_token(self.consumer, None, http_method='GET', http_url=uri, parameters={}) req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None) expected = parse_qsl( urlparse(req.to_url()).query) actual = parse_qsl(urlparse(ur).query) self.assertEqual(len(expected), len(actual)) actual = dict(actual) for key, value in expected: if key not in ('oauth_signature', 'oauth_nonce', 'oauth_timestamp'): self.assertEqual(actual[key], value) return random_result mockHttpRequest.side_effect = mockrequest client.request(uri, 'GET')
def __init__(self, key: str, secret: str) -> None: super(RequestValidatorMixin, self).__init__() self.consumer_key = key self.consumer_secret = secret self.oauth_server = oauth2.Server() signature_method = oauth2.SignatureMethod_HMAC_SHA1() self.oauth_server.add_signature_method(signature_method) self.oauth_consumer = oauth2.Consumer( self.consumer_key, self.consumer_secret )
def SetCredentials(self, consumer_key, consumer_secret, access_token_key=None, access_token_secret=None): '''Set the consumer_key and consumer_secret for this instance Args: consumer_key: The consumer_key of the twitter account. consumer_secret: The consumer_secret for the twitter account. access_token_key: The oAuth access token key value you retrieved from running get_access_token.py. access_token_secret: The oAuth access token's secret, also retrieved from the get_access_token.py run. ''' self._consumer_key = consumer_key self._consumer_secret = consumer_secret self._access_token_key = access_token_key self._access_token_secret = access_token_secret self._oauth_consumer = None if consumer_key is not None and consumer_secret is not None and \ access_token_key is not None and access_token_secret is not None: self._signature_method_plaintext = oauth.SignatureMethod_PLAINTEXT() self._signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() self._oauth_token = oauth.Token(key=access_token_key, secret=access_token_secret) self._oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
def post_multipart(self, url, params, files): """ Generates and issues a multipart request for data files :param url: a string, the url you are requesting :param params: a dict, a key-value of all the parameters :param files: a list, the list of tuples for your data :returns: a dict parsed from the JSON response """ # combine the parameters with the generated oauth params params = dict(params.items() + self.generate_oauth_params().items()) faux_req = oauth.Request(method="POST", url=url, parameters=params) faux_req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token) params = dict(parse_qsl(faux_req.to_postdata())) content_type, body = self.encode_multipart_formdata(params, files) headers = {'Content-Type': content_type, 'Content-Length': str(len(body))} # Do a bytearray of the body and everything seems ok r = urllib2.Request(url, body, headers) if self.proxy_url: proxy = urllib2.ProxyHandler({'http': self.proxy_url, 'https': self.proxy_url}) opener = urllib2.build_opener(proxy) urllib2.install_opener(opener) content = urllib2.urlopen(r).read() return self.json_parse(content)
def get_preview_url(api_key, api_secret, sdl_id): consumer = oauth.Consumer(api_key, api_secret) request_url = "http://previews.7digital.com/clip/%s" % sdl_id req = oauth.Request(method="GET", url=request_url, is_form_encoded=True) req['oauth_timestamp'] = oauth.Request.make_timestamp() req['oauth_nonce'] = oauth.Request.make_nonce() req['country'] = 'US' sig_method = oauth.SignatureMethod_HMAC_SHA1() req.sign_request(sig_method, consumer, token=None) url = req.to_url() return url, False
def superSecureRequest(self, host, path, url_params=None): """Prepares OAuth authentication and sends the request to the API. Args: host (str): The domain host of the API. path (str): The path of the API after the domain. url_params (dict): An optional set of query parameters in the request. Returns: dict: The JSON response from the request. Raises: urllib2.HTTPError: An error occurs from the HTTP request. """ url_params = url_params or {} url = 'https://{0}{1}?'.format(host, urllib.quote(path.encode('utf8'))) consumer = oauth2.Consumer(self.oath_consumer_key, self.oath_consumer_secret) oauth_request = oauth2.Request( method="GET", url=url, parameters=url_params) oauth_request.update( { 'oauth_nonce': oauth2.generate_nonce(), 'oauth_timestamp': oauth2.generate_timestamp(), 'oauth_token': self.oath_token, 'oauth_consumer_key': self.oath_consumer_key } ) token = oauth2.Token(self.oath_token, self.oath_token_secret) oauth_request.sign_request( oauth2.SignatureMethod_HMAC_SHA1(), consumer, token) signed_url = oauth_request.to_url() conn = urllib2.urlopen(signed_url, None) try: response = json.loads(conn.read().decode()) finally: conn.close() return response