我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 oauth2.Request()。
def __two_legged_request(self, parameters=None, method=None):
    """Build and sign an OAuth request for two-legged authentication.

    The base parameters from ``get_base_params()`` are merged with any
    caller-supplied ``parameters``; ``method`` falls back to ``"GET"``
    when it is not given. Returns the signed ``oauth.Request``.
    """
    merged_params = self.get_base_params()
    if parameters:
        merged_params.update(parameters)

    endpoint_url = self.endpoint
    yql_logger.debug("params: %s", merged_params)
    yql_logger.debug("endpoint_url: %s", endpoint_url)

    http_method = method or "GET"

    consumer = oauth.Consumer(self.api_key, self.secret)
    signed = oauth.Request(
        method=http_method, url=endpoint_url, parameters=merged_params)

    signature = self.get_signature(endpoint_url)
    yql_logger.debug("signature: %s", signature)
    # The token slot is None: only the consumer credentials sign the request.
    signed.sign_request(signature, consumer, None)
    return signed
def get_token_and_auth_url(self, callback_url=None):
    """Fetch a request token together with its authorisation URL.

    POSTs to ``REQUEST_TOKEN_URL`` with ``oauth_callback`` set to
    ``callback_url`` (``'oob'`` when none is supplied).

    Returns a ``(token, auth_url)`` tuple. Raises ``YQLError`` when the
    response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    callback_params = {'oauth_callback': callback_url or 'oob'}
    token_request = oauth.Request(parameters=callback_params)

    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", token_request.to_postdata())

    if resp.get('status') != '200':
        raise YQLError(resp, content, url)

    token = oauth.Token.from_string(content)
    yql_logger.debug("token: %s", token)
    data = dict(parse_qsl(content))
    yql_logger.debug("data: %s", data)
    return token, data['xoauth_request_auth_url']
def __two_legged_request(self, resource_url, parameters=None, method=None):
    """Build and sign a two-legged OAuth request for ``resource_url``.

    Starts from ``get_base_params()``, overlays ``parameters`` when given,
    and uses ``"GET"`` when ``method`` is not supplied. Returns the signed
    ``oauth.Request``.
    """
    merged_params = self.get_base_params()
    if parameters:
        merged_params.update(parameters)

    yql_logger.debug("params: %s", merged_params)
    yql_logger.debug("resource_url: %s", resource_url)

    http_method = method or "GET"

    consumer = oauth.Consumer(self.api_key, self.secret)
    signed = oauth.Request(
        method=http_method, url=resource_url, parameters=merged_params)
    # The token slot is None: only the consumer credentials sign the request.
    signed.sign_request(self.hmac_sha1_signature, consumer, None)
    return signed
def get_token_and_auth_url(self, callback_url=None):
    """Fetch a request token and the URL where the user authorises it.

    First step of the flow: POST to ``REQUEST_TOKEN_URL`` with
    ``oauth_callback`` set to ``callback_url`` (``'oob'`` when omitted).

    Returns:
        A ``(token, auth_url)`` tuple, where ``auth_url`` is the
        ``xoauth_request_auth_url`` value from the response body.

    Raises:
        YQLError: if the response status is not ``'200'``.
    """
    client = oauth.Client(self.consumer)
    params = {'oauth_callback': callback_url or 'oob'}
    request = oauth.Request(parameters=params)
    url = REQUEST_TOKEN_URL
    resp, content = client.request(url, "POST", request.to_postdata())

    if resp.get('status') == '200':
        token = oauth.Token.from_string(content)
        yql_logger.debug("token: %s", token)
        data = dict(parse_qsl(content))
        yql_logger.debug("data: %s", data)
        return token, data['xoauth_request_auth_url']

    # Call form instead of the Python 2-only ``raise Cls, (args)`` statement;
    # it builds the same exception and is valid on Python 2 and 3.
    raise YQLError(resp, content, url)
def test_get_nonoauth_parameters(self):
    # get_nonoauth_parameters() must return exactly the entries that were
    # supplied in ``other_params``.
    other_params = {
        u('foo'): u('baz'),
        u('bar'): u('foo'),
        u('multi'): [u('FOO'), u('BAR')],
        u('uni_utf8'): u(b'\xae', 'latin1'),
        u('uni_unicode'): _UGLYPH,
        # 'åÅøØ'
        u('uni_unicode_2'): u(b'\xc3\xa5\xc3\x85\xc3\xb8\xc3\x98', 'latin1'),
    }

    # The combined dict is the oauth dict itself, updated in place.
    combined = {'oauth_consumer': 'asdfasdfasdf'}
    combined.update(other_params)

    req = oauth.Request("GET", "http://example.com", combined)
    self.assertEqual(other_params, req.get_nonoauth_parameters())
def test_to_postdata(self):
    # to_postdata() should emit one key/value pair per parameter value,
    # including one pair for each element of a list-valued parameter.
    realm = "http://sp.example.com/"
    params = {
        'multi': ['FOO', 'BAR'],
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", realm, params)

    # Flatten the expectation: the list-valued entry becomes repeated pairs.
    multi_values = params.pop('multi')
    expected_pairs = [('multi', value) for value in multi_values]
    expected_pairs.extend(params.items())

    def by_name(pair):
        return pair[0]

    self.assertEqual(
        sorted(expected_pairs, key=by_name),
        sorted(parse_qsl(req.to_postdata()), key=by_name))
def test_to_url(self):
    # to_url() should yield the base URL with all parameters as its query.
    url = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)

    expected = urlparse("%s?%s" % (url, urlencode(params)))
    actual = urlparse(req.to_url())

    for component in ('scheme', 'netloc', 'path'):
        self.assertEqual(
            getattr(expected, component), getattr(actual, component))

    # Compare the queries as parsed mappings so ordering does not matter.
    expected_query = parse_qs(expected.query)
    actual_query = parse_qs(actual.query)
    self.assertEqual(expected_query, actual_query)
def test_get_normalized_parameters_from_url(self):
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turns says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    request_url = ("http://photos.example.net/photos?file=vacation.jpg"
                   "&oauth_consumer_key=dpf43f3p2l4k3l03"
                   "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
                   "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
                   "&oauth_version=1.0&size=original")

    expected = ('file=vacation.jpg&oauth_consumer_key=dpf43f3p2l4k3l03'
                '&oauth_nonce=kllo9940pd9333jh'
                '&oauth_signature_method=HMAC-SHA1'
                '&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk'
                '&oauth_version=1.0&size=original')

    # All parameters come from the URL's query string here.
    normalized = oauth.Request("GET", request_url).get_normalized_parameters()
    self.assertEqual(expected, normalized)
def test_get_normalized_parameters_ignores_auth_signature(self):
    # The normalized parameter string must leave out ``oauth_signature``.
    url = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_signature': "some-random-signature-%d" % random.randint(1000, 2000),
        'oauth_token': "ad180jjd733klru7",
    }
    req = oauth.Request("GET", url, params)
    normalized = req.get_normalized_parameters()

    # With the signature included the encodings must differ ...
    self.assertNotEqual(urlencode(sorted(params.items())), normalized)

    # ... and with it removed they must match.
    unsigned = dict(
        (name, value) for name, value in params.items()
        if name != "oauth_signature")
    self.assertEqual(urlencode(sorted(unsigned.items())), normalized)
def test_from_token_and_callback(self):
    # from_token_and_callback() should always carry the token key and
    # should only set ``oauth_callback`` when a callback is passed.
    # (The original built a ``params`` dict here that was never used; it
    # has been removed.)
    url = "http://sp.example.com/"
    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")

    # Without a callback.
    req = oauth.Request.from_token_and_callback(tok)
    self.assertFalse('oauth_callback' in req)
    self.assertEqual(req['oauth_token'], tok.key)

    # With a callback.
    req = oauth.Request.from_token_and_callback(tok, callback=url)
    self.assertTrue('oauth_callback' in req)
    self.assertEqual(req['oauth_callback'], url)
def setUp(self):
    # Fixture: a consumer, a token, and a GET request signed with HMAC-SHA1
    # using both of them.
    self.consumer = oauth.Consumer(key="consumer-key",
                                   secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }
    request_params['oauth_token'] = self.token.key
    request_params['oauth_consumer_key'] = self.consumer.key

    self.request = oauth.Request(
        method="GET", url="http://sp.example.com/",
        parameters=request_params)
    self.request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)
def sign_request(self, url, url_params=None):
    """Return ``url`` as an HMAC-SHA1 signed OAuth GET URL.

    Args:
        url: The URL to sign.
        url_params: Optional mapping of extra request parameters. Defaults
            to no extra parameters.

    Returns:
        The signed URL produced by ``oauth2.Request.to_url()``.
    """
    # A fresh dict per call replaces the former shared mutable default.
    if url_params is None:
        url_params = {}

    oauth_request = oauth2.Request(
        method="GET",
        url=url,
        parameters=url_params,
    )
    oauth_request.update(
        {
            'oauth_nonce': oauth2.generate_nonce(),
            'oauth_timestamp': oauth2.generate_timestamp(),
            'oauth_token': self.token.key,
            'oauth_consumer_key': self.consumer.key,
        }
    )
    oauth_request.sign_request(
        oauth2.SignatureMethod_HMAC_SHA1(),
        self.consumer,
        self.token,
    )
    return oauth_request.to_url()
def refresh_token(self, token):
    """Exchange an expired access token for a fresh one.

    Access tokens only last for one hour from the point of being issued;
    once expired they have to be refreshed, which is what this method does.

    ``token`` may be a token object or a token string; anything without a
    ``key`` attribute is parsed with ``YahooToken.from_string``.

    Returns the new access token, with ``timestamp`` set to the
    ``oauth_timestamp`` of the refresh request. Raises ``YQLError`` when
    the response status is not ``'200'``.
    """
    if not hasattr(token, "key"):
        token = YahooToken.from_string(token)

    refresh_params = self.get_base_params()
    refresh_params['oauth_token'] = token.key
    refresh_params['oauth_token_secret'] = token.secret
    refresh_params['oauth_session_handle'] = token.session_handle

    oauth_request = oauth.Request.from_consumer_and_token(
        self.consumer,
        token=token,
        http_url=ACCESS_TOKEN_URL,
        http_method="POST",
        parameters=refresh_params)
    yql_logger.debug("oauth_request: %s", oauth_request)

    oauth_request.sign_request(
        self.hmac_sha1_signature, self.consumer, token)

    url = oauth_request.to_url()
    yql_logger.debug("oauth_url: %s", url)
    postdata = oauth_request.to_postdata()
    yql_logger.debug("oauth_postdata: %s", postdata)

    resp, content = self.http.request(url, "POST", postdata)
    if resp.get('status') != '200':
        raise YQLError(resp, content, url)

    access_token = YahooToken.from_string(content)
    yql_logger.debug("oauth_access_token: %s", access_token)
    access_token.timestamp = oauth_request['oauth_timestamp']
    return access_token
def get_uri(self, query, params=None, **kwargs):
    """Return the signed request URL for ``query``.

    A string ``query`` is wrapped in ``YQLQuery`` first. A ``token``
    keyword argument is required; ``ValueError`` is raised without one.
    """
    if isinstance(query, basestring):
        query = YQLQuery(query)

    query_params = self.get_query_params(query, params, **kwargs)

    token = kwargs.get("token")
    if hasattr(token, "yahoo_guid"):
        query_params["oauth_yahoo_guid"] = token.yahoo_guid

    if not token:
        raise ValueError("Without a token three-legged-auth cannot be"
                         " carried out")

    yql_logger.debug("query_params: %s", query_params)

    http_method = query.get_http_method()
    endpoint_url = self.endpoint
    oauth_request = oauth.Request.from_consumer_and_token(
        self.consumer,
        http_url=endpoint_url,
        token=token,
        parameters=query_params,
        http_method=http_method)
    yql_logger.debug("oauth_request: %s", oauth_request)

    # Sign request
    signature = self.get_signature(endpoint_url)
    oauth_request.sign_request(signature, self.consumer, token)
    yql_logger.debug("oauth_signed_request: %s", oauth_request)

    signed_url = clean_url(oauth_request.to_url())
    # Spaces go out as %20 rather than '+', and '~' is left unescaped.
    signed_url = signed_url.replace('+', '%20')
    return signed_url.replace('%7E', '~')
def refresh_token(self, token):
    """Refresh an expired access token.

    Access tokens only last for one hour from the point of being issued.
    When a token has expired it needs to be refreshed; this method takes
    an expired token and refreshes it.

    Args:
        token: Either a token object or a token string. A value without a
            ``key`` attribute is parsed with ``YahooToken.from_string``.

    Returns:
        The new access token, with ``timestamp`` set to the
        ``oauth_timestamp`` of the refresh request.

    Raises:
        YQLError: if the response status is not ``'200'``.
    """
    if not hasattr(token, "key"):
        token = YahooToken.from_string(token)

    params = self.get_base_params()
    params['oauth_token'] = token.key
    params['oauth_token_secret'] = token.secret
    params['oauth_session_handle'] = token.session_handle

    oauth_request = oauth.Request.from_consumer_and_token(
        self.consumer, token=token,
        http_url=ACCESS_TOKEN_URL,
        http_method="POST",
        parameters=params)
    yql_logger.debug("oauth_request: %s", oauth_request)

    oauth_request.sign_request(
        self.hmac_sha1_signature, self.consumer, token)

    url = oauth_request.to_url()
    yql_logger.debug("oauth_url: %s", url)
    postdata = oauth_request.to_postdata()
    yql_logger.debug("oauth_postdata: %s", postdata)

    resp, content = self.http.request(url, "POST", postdata)
    if resp.get('status') == '200':
        access_token = YahooToken.from_string(content)
        yql_logger.debug("oauth_access_token: %s", access_token)
        access_token.timestamp = oauth_request['oauth_timestamp']
        return access_token

    # Call form instead of the Python 2-only ``raise Cls, (args)`` statement;
    # it builds the same exception and is valid on Python 2 and 3.
    raise YQLError(resp, content, url)
def get_uri(self, query, params=None, **kwargs):
    """Return the signed request URI for ``query``.

    Args:
        query: The query to build the request for.
        params: Optional parameters forwarded to ``get_query_params``.
        **kwargs: Must include ``token``; also forwarded to
            ``get_query_params``.

    Returns:
        ``self.uri`` followed by the signed parameters as a query string.

    Raises:
        ValueError: if no token is supplied.
    """
    query_params = self.get_query_params(query, params, **kwargs)

    token = kwargs.get("token")
    if hasattr(token, "yahoo_guid"):
        query_params["oauth_yahoo_guid"] = getattr(token, "yahoo_guid")

    if not token:
        # Call form instead of the Python 2-only ``raise Cls, "msg"``
        # statement, which is a SyntaxError on Python 3.
        raise ValueError("Without a token three-legged-auth cannot be"
                         " carried out")

    yql_logger.debug("query_params: %s", query_params)
    http_method = get_http_method(query)
    oauth_request = oauth.Request.from_consumer_and_token(
        self.consumer, http_url=self.uri,
        token=token,
        parameters=query_params,
        http_method=http_method)
    yql_logger.debug("oauth_request: %s", oauth_request)

    # Sign request
    oauth_request.sign_request(
        self.hmac_sha1_signature, self.consumer, token)
    yql_logger.debug("oauth_signed_request: %s", oauth_request)

    uri = "%s?%s" % (self.uri, oauth_request.to_postdata())
    # Spaces go out as %20 rather than '+', and '~' is left unescaped.
    return uri.replace('+', '%20').replace('%7E', '~')
def test__init__(self):
    # A request built without a URL must expose neither ``url`` nor
    # ``normalized_url``.
    req = oauth.Request("GET")
    url_attributes = ('url', 'normalized_url')

    for attribute in url_attributes:
        self.assertFalse(attribute in req.__dict__)

    for attribute in url_attributes:
        self.assertRaises(AttributeError, getattr, req, attribute)
def test_setter(self):
    # Assigning ``url`` must keep ``normalized_url`` in step with it.
    base_url = "http://example.com"
    req = oauth.Request("GET", base_url)
    self.assertEqual(req.url, base_url)
    self.assertEqual(req.normalized_url, base_url)

    # The query string is kept on ``url`` but not on ``normalized_url``.
    url_with_query = base_url + '/?foo=bar'
    req.url = url_with_query
    self.assertEqual(req.url, url_with_query)
    self.assertEqual(req.normalized_url, base_url + '/')

    # Clearing the URL clears both views.
    req.url = None
    self.assertEqual(req.url, None)
    self.assertEqual(req.normalized_url, None)
def test_deleter(self):
    # After ``del req.url`` the attribute must no longer be readable.
    req = oauth.Request("GET", "http://example.com")
    del req.url
    self.assertRaises(AttributeError, getattr, req, 'url')
def test_url(self):
    # The scheme's default port is dropped from ``normalized_url`` while
    # ``url`` keeps the value exactly as given.
    method = "GET"
    cases = (
        ("http://example.com:80/foo.php", "http://example.com/foo.php"),
        ("https://example.com:443/foo.php", "https://example.com/foo.php"),
    )
    for given_url, expected_normalized in cases:
        req = oauth.Request(method, given_url)
        self.assertEqual(req.normalized_url, expected_normalized)
        self.assertEqual(req.url, given_url)
def test_bad_url(self):
    # Assigning a URL with an unsupported scheme must raise ValueError.
    request = oauth.Request()
    try:
        request.url = "ftp://example.com"
    except ValueError:
        pass
    else:
        self.fail("Invalid URL scheme was accepted.")
def test_no_url_set(self):
    # Signing a request that has no URL must raise ValueError.
    consumer = oauth.Consumer('my_consumer_key', 'my_consumer_secret')
    token = oauth.Token('my_key', 'my_secret')
    request = oauth.Request()

    signature_method = oauth.SignatureMethod_HMAC_SHA1()
    self.assertRaises(
        ValueError,
        request.sign_request, signature_method, consumer, token)
def test_url_query(self):
    # ``url`` keeps the query string; ``normalized_url`` is the same URL
    # reduced to scheme, netloc and path.
    url = ("https://www.google.com/m8/feeds/contacts/default/full/?alt=json&max-contacts=10")
    scheme_netloc_path = urlparse(url)[:3]
    normalized_url = urlunparse(scheme_netloc_path + (None, None, None))

    req = oauth.Request("GET", url)
    self.assertEqual(req.url, url)
    self.assertEqual(req.normalized_url, normalized_url)
def test_get_parameter(self):
    # get_parameter() returns a known value and raises oauth.Error for an
    # unknown name.
    req = oauth.Request(
        "GET", "http://example.com", parameters={'oauth_consumer': 'asdf'})

    self.assertEqual(req.get_parameter('oauth_consumer'), 'asdf')
    self.assertRaises(oauth.Error, req.get_parameter, 'blah')
def test_to_url_nonascii(self):
    # to_url() must encode a non-ASCII parameter value the same way as
    # urlencode() does for its UTF-8 bytes.
    url = "http://sp.example.com/"
    params = {
        'nonasciithing': u'q\xbfu\xe9 ,aasp u?..a.s',
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    res = urlparse(req.to_url())

    # Build the expectation from the UTF-8 encoded value.
    params['nonasciithing'] = params['nonasciithing'].encode('utf-8')
    exp = urlparse("%s?%s" % (url, urlencode(params)))

    # assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.
    self.assertEqual(exp.netloc, res.netloc)
    self.assertEqual(exp.path, res.path)

    a = parse_qs(exp.query)
    b = parse_qs(res.query)
    self.assertEqual(a, b)
def test_to_header(self):
    # to_header(realm) should produce an "OAuth ..." header value carrying
    # the realm plus every parameter, each as name="escaped value".
    realm = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", realm, params)
    value = list(req.to_header(realm).values())[0]

    parts = value.split('OAuth ')
    fields = parts[1].split(', ')
    # One field per parameter plus the realm. The original used
    # assertTrue(len(...), n), which treats ``n`` as the failure message and
    # therefore never compared the two numbers.
    self.assertEqual(len(fields), len(params) + 1)

    res = {}
    for field in fields:
        # Split on the first '=' only, so a value containing '=' survives.
        name, quoted = field.split('=', 1)
        res[name] = unquote(quoted.strip('"'))

    self.assertEqual(realm, res['realm'])
    del res['realm']

    self.assertEqual(len(res), len(params))

    for key, val in res.items():
        self.assertEqual(val, params.get(key))
def test_to_postdata_nonascii(self):
    # to_postdata() must percent-encode a non-ASCII value and emit the
    # parameters in the order spelled out below.
    realm = "http://sp.example.com/"
    params = {
        'nonasciithing': u('q\xbfu\xe9 ,aasp u?..a.s', 'latin1'),
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", realm, params)

    expected_postdata = (
        'nonasciithing=q%C2%BFu%C3%A9%20%2Caasp%20u%3F..a.s'
        '&oauth_consumer_key=0685bd9184jfhq22'
        '&oauth_nonce=4572616e48616d6d65724c61686176'
        '&oauth_signature=wOJIO9A2W5mFwDgiDvZbTSMK%252FPY%253D'
        '&oauth_signature_method=HMAC-SHA1'
        '&oauth_timestamp=137131200'
        '&oauth_token=ad180jjd733klru7'
        '&oauth_version=1.0'
    )
    self.assertReallyEqual(req.to_postdata(), expected_postdata)
def test_to_url_with_query(self):
    # to_url() must keep the query parameters already on the URL and add
    # the request parameters to them.
    url = ("https://www.google.com/m8/feeds/contacts/default/full/"
           "?alt=json&max-contacts=10")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)

    # Note: the url above already has query parameters, so append new
    # ones with &
    expected = urlparse("%s&%s" % (url, urlencode(params)))
    actual = urlparse(req.to_url())

    for component in ('scheme', 'netloc', 'path'):
        self.assertEqual(
            getattr(expected, component), getattr(actual, component))

    expected_query = parse_qs(expected.query)
    actual_query = parse_qs(actual.query)

    for original_key in ('alt', 'max-contacts'):
        self.assertTrue(original_key in actual_query)
    self.assertEqual(actual_query['alt'], ['json'])
    self.assertEqual(actual_query['max-contacts'], ['10'])
    self.assertEqual(expected_query, actual_query)
def test_signature_base_string_bytes_nonascii_nonutf8(self):
    # A bytes URL with the raw ``_B2766`` bytes embedded in its query must
    # normalize and sign to the fixed values asserted below.
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')

    url_prefix = (b'http://api.simplegeo.com:80/1.0/places/address.json'
                  b'?q=monkeys&category=animal'
                  b'&address=41+Decatur+St,+San+Francisc')
    url = url_prefix + _B2766 + b',+CA'

    req = oauth.Request("GET", url)
    self.assertReallyEqual(
        req.normalized_url,
        u('http://api.simplegeo.com/1.0/places/address.json'))

    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(  # XXX
        req['oauth_signature'],
        b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_bytes_nonascii_nonutf8_urlencoded(self):
    # A bytes URL whose query already carries the percent-encoded sequence
    # must normalize and sign to the fixed values asserted below.
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    url = (b'http://api.simplegeo.com:80/1.0/places/address.json'
           b'?q=monkeys&category=animal'
           b'&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')

    req = oauth.Request("GET", url)
    expected_normalized = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(req.normalized_url, expected_normalized)

    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(
        req['oauth_signature'],
        b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_unicode_nonascii_nonutf8_url_encoded(self):
    # Same check as the bytes variant, but the URL is passed through ``u()``
    # before the request is built.
    consumer = oauth.Consumer('consumer_token', 'consumer_secret')
    url = u('http://api.simplegeo.com:80/1.0/places/address.json'
            '?q=monkeys&category=animal'
            '&address=41+Decatur+St,+San+Francisc%E2%9D%A6,+CA')

    req = oauth.Request("GET", url)
    expected_normalized = u('http://api.simplegeo.com/1.0/places/address.json')
    self.assertReallyEqual(req.normalized_url, expected_normalized)

    req.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, None)
    self.assertReallyEqual(
        req['oauth_signature'],
        b'WhufgeZKyYpKsI70GZaiDaYwl6g=')
def test_signature_base_string_with_query(self):
    # The normalized parameters must combine the request parameters (minus
    # ``oauth_signature``) with the query parameters already on the URL.
    url = ("https://www.google.com/m8/feeds/contacts/default/full/"
           "?alt=json&max-contacts=10")
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    req = oauth.Request("GET", url, params)
    self.assertEqual(
        req.normalized_url,
        'https://www.google.com/m8/feeds/contacts/default/full/')
    self.assertEqual(req.url, url)

    normalized_params = parse_qsl(req.get_normalized_parameters())
    # The original used assertTrue(len(...), n), which treats ``n`` as the
    # failure message and never compared anything. The real count is every
    # request parameter except ``oauth_signature`` (left out of the
    # normalized string) plus the two query parameters from the URL.
    expected_count = (len(params) - 1) + 2
    self.assertEqual(len(normalized_params), expected_count)

    normalized_params = dict(normalized_params)
    for key, value in params.items():
        if key == 'oauth_signature':
            continue
        self.assertEqual(value, normalized_params[key])

    self.assertEqual(normalized_params['alt'], 'json')
    self.assertEqual(normalized_params['max-contacts'], '10')
def test_get_normalized_parameters_empty(self):
    # A query parameter with an empty value must be kept as ``name=``.
    req = oauth.Request("GET", "http://sp.example.com/?empty=")
    normalized = req.get_normalized_parameters()
    self.assertEqual('empty=', normalized)
def test_get_normalized_parameters_multiple(self):
    # A key repeated in the query (``tag``) must appear once per value, and
    # ``oauth_signature`` must be absent from the normalized string.
    url = ("http://example.com/v2/search/videos"
           "?oauth_nonce=79815175"
           "&oauth_timestamp=1295397962"
           "&oauth_consumer_key=mykey"
           "&oauth_signature_method=HMAC-SHA1"
           "&oauth_version=1.0"
           "&offset=10"
           "&oauth_signature=spWLI%2FGQjid7sQVd5%2FarahRxzJg%3D"
           "&tag=one"
           "&tag=two")

    expected = ('oauth_consumer_key=mykey'
                '&oauth_nonce=79815175'
                '&oauth_signature_method=HMAC-SHA1'
                '&oauth_timestamp=1295397962'
                '&oauth_version=1.0'
                '&offset=10'
                '&tag=one'
                '&tag=two')

    normalized = oauth.Request("GET", url).get_normalized_parameters()
    self.assertEqual(expected, normalized)
def test_signing_base(self):
    # example copied from
    # https://github.com/ciaranj/node-oauth/blob/master/tests/oauth.js
    # which in turns says that it was copied from
    # http://oauth.net/core/1.0/#sig_base_example .
    url = ("http://photos.example.net/photos?file=vacation.jpg"
           "&oauth_consumer_key=dpf43f3p2l4k3l03"
           "&oauth_nonce=kllo9940pd9333jh&oauth_signature_method=HMAC-SHA1"
           "&oauth_timestamp=1191242096&oauth_token=nnch734d00sl2jdk"
           "&oauth_version=1.0&size=original")

    expected_base = b('GET&http%3A%2F%2Fphotos.example.net%2Fphotos'
                      '&file%3Dvacation.jpg'
                      '%26oauth_consumer_key%3Ddpf43f3p2l4k3l03'
                      '%26oauth_nonce%3Dkllo9940pd9333jh'
                      '%26oauth_signature_method%3DHMAC-SHA1'
                      '%26oauth_timestamp%3D1191242096'
                      '%26oauth_token%3Dnnch734d00sl2jdk'
                      '%26oauth_version%3D1.0%26size%3Doriginal')

    req = oauth.Request("GET", url)
    signature_method = oauth.SignatureMethod_HMAC_SHA1()
    consumer = oauth.Consumer('dpf43f3p2l4k3l03', 'foo')

    # signing_base() returns a (key, raw) pair; only the raw part is checked.
    _signing_key, raw_base = signature_method.signing_base(req, consumer, None)
    self.assertEqual(expected_base, raw_base)
def test_get_normalized_parameters(self):
    # Covers list-valued parameters, repeated values, and both the bytes
    # and ``_UGLYPH`` forms of the same character in one normalized string.
    url = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'multi': ['FOO', 'BAR', _UGLYPH, b'\xc2\xae'],
        'multi_same': ['FOO', 'FOO'],
        'uni_utf8_bytes': b'\xc2\xae',
        'uni_unicode_object': _UGLYPH,
    }

    expected = ('multi=BAR&multi=FOO&multi=%C2%AE&multi=%C2%AE'
                '&multi_same=FOO&multi_same=FOO'
                '&oauth_consumer_key=0685bd9184jfhq22'
                '&oauth_nonce=4572616e48616d6d65724c61686176'
                '&oauth_signature_method=HMAC-SHA1'
                '&oauth_timestamp=137131200'
                '&oauth_token=ad180jjd733klru7'
                '&oauth_version=1.0'
                '&uni_unicode_object=%C2%AE&uni_utf8_bytes=%C2%AE')

    normalized = oauth.Request("GET", url, params).get_normalized_parameters()
    self.assertEqual(expected, normalized)
def test_get_normalized_string_escapes_spaces_properly(self):
    # Spaces in the normalized string must be written as %20, not '+'.
    url = "http://sp.example.com/"
    params = {
        "some_random_data": random.randint(100, 1000),
        "data": "This data with a random number (%d) has spaces!" % random.randint(1000, 2000),
    }
    req = oauth.Request("GET", url, params)
    normalized = req.get_normalized_parameters()

    # urlencode() writes spaces as '+', so convert before comparing.
    plus_encoded = urlencode(sorted(params.items()))
    expected = plus_encoded.replace('+', '%20')
    self.assertEqual(expected, normalized)
def test_from_request_works_with_wsgi(self):
    """Make sure WSGI header HTTP_AUTHORIZATION is detected correctly."""
    url = "http://sp.example.com/"
    params = {
        'oauth_version': "1.0",
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': "137131200",
        'oauth_consumer_key': "0685bd9184jfhq22",
        'oauth_signature_method': "HMAC-SHA1",
        'oauth_token': "ad180jjd733klru7",
        'oauth_signature': "wOJIO9A2W5mFwDgiDvZbTSMK%2FPY%3D",
    }
    original = oauth.Request("GET", url, params)
    headers = original.to_header()

    # Munge the headers: move the value under the WSGI-style key.
    headers['HTTP_AUTHORIZATION'] = headers['Authorization']
    del headers['Authorization']

    # Test from the headers
    rebuilt = oauth.Request.from_request("GET", url, headers)
    self.assertEqual(rebuilt.method, "GET")
    self.assertEqual(rebuilt.url, url)
    self.assertEqual(params, rebuilt.copy())
def test_from_consumer_and_token(self):
    # from_consumer_and_token() must copy the token key, the consumer key
    # and the token's verifier into the request.
    url = "http://sp.example.com/"

    tok = oauth.Token(key="tok-test-key", secret="tok-test-secret")
    tok.set_verifier('this_is_a_test_verifier')
    con = oauth.Consumer(key="con-test-key", secret="con-test-secret")

    req = oauth.Request.from_consumer_and_token(
        con, token=tok, http_method="GET", http_url=url)

    self.assertEqual(req['oauth_token'], tok.key)
    self.assertEqual(req['oauth_consumer_key'], con.key)
    self.assertEqual(tok.verifier, req['oauth_verifier'])
def test_no_version(self):
    # A signed request carrying no ``oauth_version`` must still be accepted
    # by Server.verify_request(); the test passes if nothing is raised.
    url = "http://sp.example.com/"

    self.consumer = oauth.Consumer(key="consumer-key",
                                   secret="consumer-secret")
    self.token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }
    request_params['oauth_token'] = self.token.key
    request_params['oauth_consumer_key'] = self.consumer.key

    self.request = oauth.Request(
        method="GET", url=url, parameters=request_params)
    self.request.sign_request(
        oauth.SignatureMethod_HMAC_SHA1(), self.consumer, self.token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    verified_parameters = server.verify_request(
        self.request, self.consumer, self.token)
def test_invalid_signature_method(self):
    # A request signed with ``SignatureMethod_Bad`` must be rejected by a
    # server that only has HMAC-SHA1 registered.
    url = "http://sp.example.com/"

    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }
    request_params['oauth_token'] = token.key
    request_params['oauth_consumer_key'] = consumer.key

    request = oauth.Request(method="GET", url=url, parameters=request_params)
    request.sign_request(SignatureMethod_Bad(), consumer, token)

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    self.assertRaises(
        oauth.Error,
        server.verify_request, request, consumer, token)
def test_missing_signature(self):
    # A request whose ``oauth_signature`` was removed after signing must
    # make verify_request() raise MissingSignature.
    url = "http://sp.example.com/"

    consumer = oauth.Consumer(key="consumer-key", secret="consumer-secret")
    token = oauth.Token(key="token-key", secret="token-secret")

    request_params = {
        'oauth_version': '1.0',
        'oauth_nonce': "4572616e48616d6d65724c61686176",
        'oauth_timestamp': int(time.time()),
        'bar': 'blerg',
        'multi': ['FOO', 'BAR'],
        'foo': 59,
    }
    request_params['oauth_token'] = token.key
    request_params['oauth_consumer_key'] = consumer.key

    request = oauth.Request(method="GET", url=url, parameters=request_params)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), consumer, token)
    del request['oauth_signature']

    server = oauth.Server()
    server.add_signature_method(oauth.SignatureMethod_HMAC_SHA1())

    self.assertRaises(
        oauth.MissingSignature,
        server.verify_request, request, consumer, token)

# Request Token: http://oauth-sandbox.sevengoslings.net/request_token
# Auth: http://oauth-sandbox.sevengoslings.net/authorize
# Access Token: http://oauth-sandbox.sevengoslings.net/access_token
# Two-legged: http://oauth-sandbox.sevengoslings.net/two_legged
# Three-legged: http://oauth-sandbox.sevengoslings.net/three_legged
# Key: bd37aed57e15df53
# Secret: 0e9e6413a9ef49510a4f68ed02cd
def test_url_with_query_string(self, mockHttpRequest):
    # client.request() on a URI that already has a query string must pass
    # the expected keyword arguments and a matching signed URL to the
    # mocked HTTP request.
    uri = 'http://example.com/foo/bar/?show=thundercats&character=snarf'
    client = oauth.Client(self.consumer, None)
    random_result = random.randint(1, 100)

    # These values change on every signing, so they are not compared.
    volatile_keys = ('oauth_signature', 'oauth_nonce', 'oauth_timestamp')

    def mockrequest(cl, ur, **kw):
        self.assertTrue(cl is client)
        self.assertEqual(
            frozenset(kw.keys()),
            frozenset(['method', 'body', 'redirections',
                       'connection_type', 'headers']))
        self.assertEqual(kw['body'], b'')
        self.assertEqual(kw['connection_type'], None)
        self.assertEqual(kw['method'], 'GET')
        self.assertEqual(kw['redirections'],
                         httplib2.DEFAULT_MAX_REDIRECTS)
        self.assertTrue(isinstance(kw['headers'], dict))

        # Sign an equivalent request independently and compare queries.
        reference = oauth.Request.from_consumer_and_token(
            self.consumer, None,
            http_method='GET', http_url=uri, parameters={})
        reference.sign_request(
            oauth.SignatureMethod_HMAC_SHA1(), self.consumer, None)

        expected_pairs = parse_qsl(urlparse(reference.to_url()).query)
        actual_pairs = parse_qsl(urlparse(ur).query)
        self.assertEqual(len(expected_pairs), len(actual_pairs))

        actual_by_key = dict(actual_pairs)
        for key, value in expected_pairs:
            if key in volatile_keys:
                continue
            self.assertEqual(actual_by_key[key], value)

        return random_result

    mockHttpRequest.side_effect = mockrequest

    client.request(uri, 'GET')