我们从Python开源项目中,提取了以下46个代码示例,用于说明如何使用requests_oauthlib.OAuth1Session()。
def get_authorization_url(self, callback_uri): session = OAuth1Session( settings.OAUTH_CONSUMER_KEY, client_secret=settings.OAUTH_CONSUMER_SECRET, callback_uri=callback_uri, ) try: url = settings.API_HOST + settings.OAUTH_TOKEN_PATH response = session.fetch_request_token(url) except (ValueError, TokenRequestDenied, ConnectionError) as err: raise AuthenticatorError(err) else: self.token = response.get('oauth_token') self.secret = response.get('oauth_token_secret') url = settings.API_HOST + settings.OAUTH_AUTHORIZATION_PATH authorization_url = session.authorization_url(url) LOGGER.log(logging.INFO, 'Initial token {}, secret {}'.format( self.token, self.secret)) return authorization_url
def set_access_token(self, authorization_url): session = OAuth1Session( settings.OAUTH_CONSUMER_KEY, settings.OAUTH_CONSUMER_SECRET, resource_owner_key=self.token, resource_owner_secret=self.secret, ) session.parse_authorization_response(authorization_url) url = settings.API_HOST + settings.OAUTH_ACCESS_TOKEN_PATH try: response = session.fetch_access_token(url) except (TokenRequestDenied, ConnectionError) as err: raise AuthenticatorError(err) else: self.token = response.get('oauth_token') self.secret = response.get('oauth_token_secret') LOGGER.log(logging.INFO, 'Updated token {}, secret {}'.format( self.token, self.secret))
def __init__(self, consumer_key, consumer_secret, callback=None): if type(consumer_key) == six.text_type: consumer_key = consumer_key.encode('ascii') if type(consumer_secret) == six.text_type: consumer_secret = consumer_secret.encode('ascii') self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.access_token = None self.access_token_secret = None self.callback = callback self.username = None self.oauth = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri=self.callback)
def get_access_token(self, verifier=None): """ After user has authorized the request token, get access token with user supplied verifier. """ try: url = self._get_oauth_url('access_token') self.oauth = OAuth1Session(self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.request_token['oauth_token'], resource_owner_secret=self.request_token['oauth_token_secret'], verifier=verifier, callback_uri=self.callback) resp = self.oauth.fetch_access_token(url) self.access_token = resp['oauth_token'] self.access_token_secret = resp['oauth_token_secret'] return self.access_token, self.access_token_secret except Exception as e: raise TweepError(e)
def get_authentication_url(self): oauth = OAuth1Session( self.client_id, client_secret=self.client_secret, ) token = oauth.fetch_request_token(self.request_token_url) cache.set( 'oa-token-%s' % token['oauth_token'], token, timeout=3600) self._request.session['oa_token'] = token['oauth_token'] authorization_url = oauth.authorization_url( self.authorization_base_url, ) return authorization_url
def __init__(self, consumer_key, consumer_secret, domain='https://www.schoology.com', three_legged=False, request_token=None, request_token_secret=None, access_token=None, access_token_secret=None): self.API_ROOT = 'https://api.schoology.com/v1' self.DOMAIN_ROOT = domain self.consumer_key = consumer_key self.consumer_secret = consumer_secret self.request_token = request_token self.request_token_secret = request_token_secret self.access_token = access_token self.access_token_secret = access_token_secret self.oauth = requests_oauthlib.OAuth1Session(self.consumer_key, self.consumer_secret) self.three_legged = three_legged
def get_auth_token(self): """ Retrieves an auth_session_token using DEP server token data prepared as an OAuth1Session() instance earlier on. """ # Retrieve session auth token get_session = self.dep_prep('session', 'get', authsession=self.oauth) response = self.oauth.send(get_session) # Extract the auth session token from the JSON reply token = response.json()['auth_session_token'] # The token happens to contain the UNIX timestamp of when it was generated # so we save it for later reference. timestamp = token[:10] # Roll a human-readable timestamp as well. ts_readable = datetime.datetime.fromtimestamp( int(timestamp)).strftime( '%Y-%m-%d %H:%M:%S') print "Token generated at %s" % ts_readable return token, timestamp
def get_trends(self, local): """ Method to get the trending hashtags. :type local: str :rtype: list of str """ session_string = "https://api.twitter.com/1.1/trends/place.json?id=" local_id = self.get_local_identifier()[local] session_string += local_id session = OAuth1Session(ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret) response = session.get(session_string) if response.__dict__['status_code'] == 200: local_trends = json.loads(response.text)[0]["trends"] hashtags = [trend["name"] for trend in local_trends if trend["name"][0] == '#'] else: hashtags = [] return hashtags
def authorize(filepath=None): """ Create an authorization object for use with the requests library. Takes the path to a yaml-encoded file containing twitter API keys. :param filepath: :type filepath: str :returns: OAuth1Session """ # Try to find the file if no path was given # find_keyfile returns a generator and .next() gives the first match try: filepath = filepath or next(find_keyfile()) except StopIteration: raise Exception("No Keyfile found - please place keys.yaml with your tokens in the project directory or pass a custom filepath to the authorize() function") # Load credentials from keyfile with open(filepath, 'r') as f: keys = yaml.load(f) # Create authentication object auth_object = OAuth1Session(client_key=keys["client_key"], client_secret=keys["client_secret"], resource_owner_key=keys["resource_owner_key"], resource_owner_secret=keys["resource_owner_secret"]) return auth_object
def doTweet(name, title, sturl): CK = "" # Consumer Key CS = "" # Consumer Secret AT = "" # Access Token AS = "" # Accesss Token Secert # ????????UR url = "https://api.twitter.com/1.1/statuses/update.json" # ?????? params = {"status": datetime.now().strftime("?%Y/%m/%d %H:%M?\n") + name + "?????????????\n" + title + "\n\n" + sturl} # OAuth??? POST method ??? twitter = OAuth1Session(CK, CS, AT, AS) req = twitter.post(url, params=params) # ???????? if req.status_code == 200: print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[SUCCESS] Tweet success") else: print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[FAIL] Tweet fail Error: %d" % req.status_code) # ???????tweet??????
def doTweetTime(name, title, sturl, time): CK = "" # Consumer Key CS = "" # Consumer Secret AT = "" # Access Token AS = "" # Accesss Token Secert # ????????UR url = "https://api.twitter.com/1.1/statuses/update.json" # ?????? params = {"status": datetime.now().strftime("?%Y/%m/%d %H:%M") + " ????" + str(time) + "????\n" + name + "?????\n" + title + "\n\n" + sturl} # OAuth??? POST method ??? twitter = OAuth1Session(CK, CS, AT, AS) req = twitter.post(url, params=params) # ???????? if req.status_code == 200: print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[SUCCESS] Tweet success") else: print(datetime.now().strftime("[%Y/%m/%d %H:%M:%S]") + "[FAIL] Tweet fail Error: %d" % req.status_code)
def fetch_access_token(self, verifier, token=None): """Step 3: Given the verifier from fitbit, and optionally a token from step 1 (not necessary if using the same FitbitOAuthClient object) calls fitbit again and returns an access token object. Extract the needed information from that and save it to use in future API calls. """ if token: self.resource_owner_key = token.get('oauth_token') self.resource_owner_secret = token.get('oauth_token_secret') self.oauth = OAuth1Session( self.client_key, client_secret=self.client_secret, resource_owner_key=self.resource_owner_key, resource_owner_secret=self.resource_owner_secret, verifier=verifier) response = self.oauth.fetch_access_token(self.access_token_url) self.user_id = response.get('encoded_user_id') self.resource_owner_key = response.get('oauth_token') self.resource_owner_secret = response.get('oauth_token_secret') return response
def test_fetch_request_token(self): # fetch_request_token needs to make a request and then build a token from the response fb = Fitbit(**self.client_kwargs) with mock.patch.object(OAuth1Session, 'fetch_request_token') as frt: frt.return_value = { 'oauth_callback_confirmed': 'true', 'oauth_token': 'FAKE_OAUTH_TOKEN', 'oauth_token_secret': 'FAKE_OAUTH_TOKEN_SECRET'} retval = fb.client.fetch_request_token() self.assertEqual(1, frt.call_count) # Got the right return value self.assertEqual('true', retval.get('oauth_callback_confirmed')) self.assertEqual('FAKE_OAUTH_TOKEN', retval.get('oauth_token')) self.assertEqual('FAKE_OAUTH_TOKEN_SECRET', retval.get('oauth_token_secret'))
def test_fetch_access_token(self): kwargs = self.client_kwargs kwargs['resource_owner_key'] = '' kwargs['resource_owner_secret'] = '' fb = Fitbit(**kwargs) fake_verifier = "FAKEVERIFIER" with mock.patch.object(OAuth1Session, 'fetch_access_token') as fat: fat.return_value = { 'encoded_user_id': 'FAKE_USER_ID', 'oauth_token': 'FAKE_RETURNED_KEY', 'oauth_token_secret': 'FAKE_RETURNED_SECRET' } retval = fb.client.fetch_access_token(fake_verifier) self.assertEqual("FAKE_RETURNED_KEY", retval['oauth_token']) self.assertEqual("FAKE_RETURNED_SECRET", retval['oauth_token_secret']) self.assertEqual('FAKE_USER_ID', fb.client.user_id)
def yelp_request(host, path, url_params=None): """Format a proper OAuth request to the API""" url_params = url_params or {} url = host + path yelp = OAuth1Session( CONSUMER_KEY, client_secret=CONSUMER_SECRET, resource_owner_key=TOKEN, resource_owner_secret=TOKEN_SECRET) print u'Querying Yelp API...' response = yelp.get(url, params=url_params) return response.json()
def get_session(self): session = OAuth1Session( settings.OAUTH_CONSUMER_KEY, client_secret=settings.OAUTH_CONSUMER_SECRET, resource_owner_key=self.token, resource_owner_secret=self.secret, ) return session
def __init__(self, client_key, client_secret=None, token=None, token_secret=None, callback_uri=None, verifier=None, proxies=None, https=True): super(Fanfou, self).__init__(OAuth1Session(client_key, client_secret=client_secret, resource_owner_key=token, resource_owner_secret=token_secret, callback_uri=callback_uri, verifier=verifier), "https://api.fanfou.com" if https else "http://api.fanfou.com", objlist=['search', 'blocks', 'users', 'account', 'saved_searches', 'photos', 'trends', 'followers', 'favorites', 'friendships', 'friends', 'statuses', 'direct_messages'], postfix="json", proxies=proxies) self.auth._client = HttpsAuth(self.auth.auth) self.auth.auth = self.auth._client
def get(self, request, *args, **kwargs): from requests_oauthlib import OAuth1Session config = SocialConfig.get_solo() oauth_token = request.GET.get('oauth_token') oauth_verifier = request.GET.get('oauth_verifier') oauth_client = OAuth1Session( client_key=config.twitter_client_id, client_secret=config.twitter_client_secret, resource_owner_key=oauth_token, resource_owner_secret=config.twitter_access_token_secret, verifier=oauth_verifier ) try: answer = oauth_client.fetch_access_token('https://api.twitter.com/oauth/access_token') except ValueError: raise Http404 if answer and 'oauth_token' in answer: SocialConfig.objects.update(twitter_access_token=answer['oauth_token']) add_message(request, SUCCESS, _('Twitter access_token updated successfully!')) return redirect('admin:social_networks_socialconfig_change') else: return HttpResponse(answer)
def get_user_data(self): oauth = OAuth1Session( self.client_id, client_secret=self.client_secret, ) oauth_response = oauth.parse_authorization_response( self._request.build_absolute_uri(self._request.get_full_path())) verifier = oauth_response.get('oauth_verifier') resource_owner = cache.get( 'oa-token-%s' % self._request.session.pop('oa_token') ) oauth = OAuth1Session( self.client_id, client_secret=self.client_secret, resource_owner_key=resource_owner.get('oauth_token'), resource_owner_secret=resource_owner.get('oauth_token_secret'), verifier=verifier, ) oauth_tokens = oauth.fetch_access_token(self.access_token_url) resource_owner_key = oauth_tokens.get('oauth_token') resource_owner_secret = oauth_tokens.get('oauth_token_secret') oauth = OAuth1Session( self.client_id, client_secret=self.client_secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, ) data = oauth.get( 'https://api.twitter.com/1.1/account/verify_credentials.json' '?include_email=true', ).json() return { 'email': data.get('email'), 'full_name': data.get('name'), }
def _do_oauth(self): request_token_url = self.host + "/oauth/request_token" authorization_url = self.host + "/oauth/authorize" access_token_url = self.host + "/oauth/access_token" oauth_session = OAuth1Session(self.api_key[self.cur_api_key], client_secret=self.api_secret[self.cur_api_key], callback_uri="paste_this") # TODO: use 'oob' oauth_session.fetch_request_token(request_token_url) redirect_url = oauth_session.authorization_url(authorization_url) print "Flickr needs user authentication" print "--------------------------------" print "Visit this site:" # Flickr permissions: # read - permission to read private information # write - permission to add, edit and delete photo metadata (includes 'read') # delete - permission to delete photos (includes 'write' and 'read') print redirect_url+"&perms=write" redirect_response = raw_input('Paste the FULL URL here:') oauth_session.parse_authorization_response(redirect_response) oauth_session.fetch_access_token(access_token_url) with open(self.oauth_file, 'w') as f: pickle.dump(oauth_session, f) return oauth_session
def __init__(self, client_key, client_secret, resource_owner_key, resource_owner_secret): '''__init_() ''' self.client_key = client_key self.client_secret = client_secret self.resource_owner_key = resource_owner_key self.resource_owner_secret = resource_owner_secret self.base_url_prod = r'https://etws.etrade.com' self.base_url_dev = r'https://etwssandbox.etrade.com' self.session = OAuth1Session(self.client_key, self.client_secret, self.resource_owner_key, self.resource_owner_secret, signature_type='AUTH_HEADER')
def __init__(self, client_key, client_secret, resource_owner_key, resource_owner_secret): '''__init__(client_key, client_secret) param: client_key type: str description: etrade client key param: client_secret type: str description: etrade client secret param: resource_owner_key type: str description: OAuth authentication token key param: resource_owner_secret type: str description: OAuth authentication token secret''' self.client_key = client_key self.client_secret = client_secret self.resource_owner_key = resource_owner_key self.resource_owner_secret = resource_owner_secret self.base_url_prod = r'https://etws.etrade.com' self.base_url_dev = r'https://etwssandbox.etrade.com' self.session = OAuth1Session(self.client_key, self.client_secret, self.resource_owner_key, self.resource_owner_secret, signature_type='AUTH_HEADER')
def __init__(self, client_key, client_secret, resource_owner_key, resource_owner_secret): '''__init__(client_key, client_secret) param: client_key type: str description: etrade client key param: client_secret type: str description: etrade client secret param: resource_owner_key type: str description: OAuth authentication token key param: resource_owner_secret type: str description: OAuth authentication token secret''' self.client_key = client_key self.client_secret = client_secret self.resource_owner_key = resource_owner_key self.resource_owner_secret = resource_owner_secret self.renew_access_token_url = r'https://etws.etrade.com/oauth/renew_access_token' self.revoke_access_token_url = r'https://etws.etrade.com/oauth/revoke_access_token' self.session = OAuth1Session(self.client_key, self.client_secret, self.resource_owner_key, self.resource_owner_secret, signature_type='AUTH_HEADER')
def authorize(self): if self.authorized or not self.three_legged: return True access_token_url = self.API_ROOT + '/oauth/access_token' self.oauth = requests_oauthlib.OAuth1Session(self.consumer_key, self.consumer_secret, resource_owner_key=self.request_token, resource_owner_secret=self.request_token_secret) try: oauth_tokens = self._fetch_token(access_token_url, self.oauth) except TokenRequestDenied: return False self.access_token = oauth_tokens.get('oauth_token') self.access_token_secret = oauth_tokens.get('oauth_token_secret') return self.access_token is not None
def init_stoken(self, stoken): """ Loads the required stoken.json file from disk. """ with open(stoken) as data_file: self.mytoken = json.load(data_file) # Verify that the stoken data is somewhat sane, i.e. has the required # keys and their values start with expected prepends. try: for k, prefix in (('consumer_secret', 'CS_'), ('access_token', 'AT_'), ('consumer_key', 'CK_'), ('access_secret', 'AS_')): if not self.mytoken.get(k).startswith(prefix): print 'Error parsing stoken file: bad value for:\n%s = %s\n' % (k, self.mytoken[k]) sys.exit(-1) except AttributeError: print 'Error parsing stoken file: missing key for:\n%s\n' % (k) sys.exit(-1) # Set the required OAuth1 keys from the source stoken self.oauth = OAuth1Session(client_key=self.mytoken['consumer_key'], client_secret=self.mytoken['consumer_secret'], resource_owner_key=self.mytoken['access_token'], resource_owner_secret=self.mytoken['access_secret'], realm='ADM')
def dep_prep(self, query, method, authsession=None, token=None, params=False): """ Sets up common headers for DEP commands using the 'requests' Request class to combine our auth token, required headers and other data to generate a correct HTTP request to be sent to the DEP API. Required parameters: - query (The API request to use) - method (The HTTP method to use: GET/PUT/POST) - token (The auth session token retrieved by get_auth_token()) Optional parameters: - authsession (expects an OAuth1Session instance) - params (query string to send instead of JSON data) """ req = Request(method, self.dep_api_url + query) prep = None # Check whether we're requesting an auth session token or doing a regular # API call with DEP. if authsession: prep = authsession.prepare_request(req) # Regular API calls require the X-ADM-Auth-Session header to be set elif token: prep = req.prepare() prep.headers['X-ADM-Auth-Session'] = token # If we received no token or token is None we have a problem, halt. else: print "No token found, we must exit now..." sys.exit(-1) # Common required headers for DEP API calls, we use v2 as v1 is deprecated prep.headers['X-Server-Protocol-Version'] = '2' # A few (or just one) calls use a query string instead of JSON so we skip # setting the Content-Type header for those. if not params: prep.headers['Content-Type'] = 'application/json;charset=UTF8' return prep
def _connect(self): logging.info("creating http session") self.client = OAuth1Session( client_key=self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.access_token, resource_owner_secret=self.access_token_secret )
def Main(args): # get access keys from a config file config = ConfigParser() config.read(args.config) ConsumerKey = config.get('AccessKeys','ConsumerKey') ConsumerSecret = config.get('AccessKeys','ConsumerSecret') AccessToken = config.get('AccessKeys','AccessToken') AccessTokenSecret = config.get('AccessKeys','AccessTokenSecret') # open a session session = OAuth1Session(ConsumerKey, ConsumerSecret, AccessToken, AccessTokenSecret) # collect users from the queries user_search = GETUsersSearch(session) user_search.setParams(' '.join(args.queries), target_count=args.count) user_search.waitReady() result = user_search.call() logger.info('obtained %d users' % len(result)) if args.dump: logger.info('writing raw data to file %s' % args.dump) json.dump(result, open(args.dump,'w'), indent=2) if args.output: logger.info('writing screen names to file %s' % args.output) with open(args.output,'w') as f: for user in result: f.write(user['screen_name'] + '\n') else: for user in result: sys.stdout.write(user['screen_name'] + '\n')
def __init__(self, client_key, client_secret, resource_owner_key=None, resource_owner_secret=None, user_id=None, callback_uri=None, *args, **kwargs): """ Create a FitbitOauthClient object. Specify the first 5 parameters if you have them to access user data. Specify just the first 2 parameters to access anonymous data and start the set up for user authorization. Set callback_uri to a URL and when the user has granted us access at the fitbit site, fitbit will redirect them to the URL you passed. This is how we get back the magic verifier string from fitbit if we're a web app. If we don't pass it, then fitbit will just display the verifier string for the user to copy and we'll have to ask them to paste it for us and read it that way. """ self.session = requests.Session() self.client_key = client_key self.client_secret = client_secret self.resource_owner_key = resource_owner_key self.resource_owner_secret = resource_owner_secret if user_id: self.user_id = user_id params = {'client_secret': client_secret} if callback_uri: params['callback_uri'] = callback_uri if self.resource_owner_key and self.resource_owner_secret: params['resource_owner_key'] = self.resource_owner_key params['resource_owner_secret'] = self.resource_owner_secret self.oauth = OAuth1Session(client_key, **params)
def test_authorize_token_url(self): # authorize_token_url calls oauth and returns a URL fb = Fitbit(**self.client_kwargs) with mock.patch.object(OAuth1Session, 'authorization_url') as au: au.return_value = 'FAKEURL' retval = fb.client.authorize_token_url() self.assertEqual(1, au.call_count) self.assertEqual("FAKEURL", retval)
def oauth_request(): try: o = OAuth1Session(const.CONSUMER_KEY, const.CONSUMER_SECRET) req = o.fetch_request_token("http://fanfou.com/oauth/request_token") ov = request.url_root[:-1] + url_for(".oauth_verify") session['req'] = req auth = o.authorization_url("http://fanfou.com/oauth/authorize", oauth_callback=ov) except ValueError: session['error_msg'] = "???????????" return redirect(url_for('.xauth')) return redirect(auth)
def oauth_verify(): try: req = session['req'] o = OAuth1Session(const.CONSUMER_KEY, const.CONSUMER_SECRET, req['oauth_token'], req['oauth_token_secret'], verifier=req['oauth_token']) ac = o.fetch_access_token("http://fanfou.com/oauth/access_token") session['req'] = ac user = o.get("http://api.fanfou.com/account/verify_credentials.json?mode=lite").json() except: session['error_msg'] = "?????????" return redirect(url_for('.xauth')) try: try: ff_auth = FFAuth.query.equal_to('uniqueID', user['unique_id']).first() except LeanCloudError as err: if err.code == 101: ff_auth = FFAuth() ff_auth.set('username', user['id']) ff_auth.set('nickname', user['name']) ff_auth.set('uniqueID', user['unique_id']) ff_auth.set('token', ac['oauth_token']) ff_auth.set('secret', ac['oauth_token_secret']) ff_auth.save() except LeanCloudError: session['error_msg'] = "????????" return redirect(url_for('.xauth')) login_user(ff_auth, True) return redirect(url_for('main.index'))
def get_access_token(consumer_key, consumer_secret): oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri='oob') print('Requesting temp token from Twitter') try: resp = oauth_client.fetch_request_token(REQUEST_TOKEN_URL) except ValueError: print('Invalid respond from Twitter requesting temp token: %s' % e) return url = oauth_client.authorization_url(AUTHORIZATION_URL) print('') print('I will try to start a browser to visit the following Twitter page') print('if a browser will not start, copy the URL to your browser') print('and retrieve the pincode to be used') print('in the next step to obtaining an Authentication Token:') print('') print(url) print('') webbrowser.open(url) pincode = input('Pincode? ') print('') print('Generating and signing request for an access token') print('') oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret, resource_owner_key=resp.get('oauth_token'), resource_owner_secret=resp.get('oauth_token_secret'), verifier=pincode ) try: resp = oauth_client.fetch_access_token(ACCESS_TOKEN_URL) except ValueError: print('Invalid respond from Twitter requesting access token: %s' % e) return print('Your Twitter Access Token key: %s' % resp.get('oauth_token')) print(' Access Token secret: %s' % resp.get('oauth_token_secret')) print('')
def get_request_token(self): '''get_request_token() -> auth url some params handled by requests_oauthlib but put in doc string for clarity into the API. param: oauth_consumer_key type: str description: the value used by the consumer to identify itself to the service provider. param: oauth_timestamp type: int description: the date and time of the request, in epoch time. must be accurate within five minutes. param: oauth_nonce type: str description: a nonce, as discribed in the authorization guide roughly, an arbitrary or random value that cannot be used again with the same timestamp. param: oauth_signature_method type: str description: the signature method used by the consumer to sign the request. the only supported value is 'HMAC-SHA1'. param: oauth_signature type: str description: signature generated with the shared secret and token secret using the specified oauth_signature_method as described in OAuth documentation. param: oauth_callback type: str description: callback information, as described elsewhere. must always be set to 'oob' whether using a callback or not rtype: str description: Etrade autherization url''' # Set up session self.session = OAuth1Session(self.consumer_key, self.consumer_secret, callback_uri=self.callback_url, signature_type='AUTH_HEADER') # get request token self.session.fetch_request_token(self.req_token_url) # get authorization url #etrade format: url?key&token authorization_url = self.session.authorization_url(self.auth_token_url) akey = self.session.parse_authorization_response(authorization_url) # store oauth_token self.resource_owner_key = akey['oauth_token'] formated_auth_url = '%s?key=%s&token=%s' % (self.auth_token_url, self.consumer_key, akey['oauth_token']) self.verifier_url = formated_auth_url logger.debug(formated_auth_url) return formated_auth_url
def get_access_token(consumer_key, consumer_secret): REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token' ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token' AUTHORIZATION_URL = 'https://api.twitter.com/oauth/authorize' oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret, callback_uri='oob') print('\nRequesting temp token from Twitter...\n') try: resp = oauth_client.fetch_request_token(REQUEST_TOKEN_URL) except ValueError as e: raise ValueError( 'Invalid response from Twitter requesting temp token: {0}'.format( e)) url = oauth_client.authorization_url(AUTHORIZATION_URL) print('I will try to start a browser to visit the following Twitter page ' 'if a browser will not start, copy the URL to your browser ' 'and retrieve the pincode to be used ' 'in the next step to obtaining an Authentication Token: \n' '\n\t{0}'.format(url)) webbrowser.open(url) pincode = raw_input('\nEnter your pincode? ') print('\nGenerating and signing request for an access token...\n') oauth_client = OAuth1Session(consumer_key, client_secret=consumer_secret, resource_owner_key=resp.get('oauth_token'), resource_owner_secret=resp.get( 'oauth_token_secret'), verifier=pincode) try: resp = oauth_client.fetch_access_token(ACCESS_TOKEN_URL) except ValueError as e: msg = ('Invalid response from Twitter requesting ' 'temp token: {0}').format(e) raise ValueError(msg) # # print('''Your tokens/keys are as follows: # consumer_key = {ck} # consumer_secret = {cs} # access_token_key = {atk} # access_token_secret = {ats}'''.format( # ck=consumer_key, # cs=consumer_secret, # atk=resp.get('oauth_token'), # ats=resp.get('oauth_token_secret'))) return resp.get('oauth_token'), resp.get('oauth_token_secret')