我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用requests_oauthlib.OAuth1()。
def __init__( self, credFile=None, client_key=None, proxy_url=None): """Initialize with your MuseScore application credentials""" self.proxies = {'https': proxy_url} if proxy_url else None auth_type='oAuth1' if credFile and os.path.isfile(credFile): with open("credentials.json") as json_file: cred = json.load(json_file) self.auth = OAuth1(cred["client_key"], client_secret=cred["client_secret"], resource_owner_key=cred["resource_owner_key"], resource_owner_secret=cred["resource_owner_secret"]) elif client_key: self.auth = None self.client_key = client_key else: raise Exception('At least a client key is needed')
def __init__(self, api_key, api_secret, oauth_token=None, default_timeout=None): self.log = logging.getLogger('%s.%s' % (self.__class__.__module__, self.__class__.__name__)) assert isinstance(api_key, six.text_type), 'api_key must be unicode string' assert isinstance(api_secret, six.text_type), 'api_secret must be unicode string' token = None secret = None if oauth_token.token: token = oauth_token.token.token secret = oauth_token.token.token_secret self.oauth = OAuth1(api_key, api_secret, token, secret, signature_type='auth_header') self.oauth_token = oauth_token self.auth_http_server = None self.requested_permissions = None self.default_timeout = default_timeout
def verify_oauth(access_token,access_token_secret): # consumer key and secret. these keys are generated when creating # our twitter app on the twitter developer portal. They are needed # when verifing twitter credentials consumer_key='kbwf3Qbb6MfjXiPirco5j3UYe' consumer_secret='1Lr0HOTxQBLU22FsH2CH9RFBtnQCHwjAsidZrctvYoy9yIDX7n' # url of twitter api that verifys a users credentials url = 'https://api.twitter.com/1.1/account/verify_credentials.json' # oAuth1 method that creates an authorization object to hand to # twitters api auth = OAuth1(consumer_key, consumer_secret, access_token, access_token_secret) # gets request from twitter api # returns json result from request return requests.get(url, auth=auth) # main method
def post_outcome_request(self, **kwargs): ''' POST an OAuth signed request to the Tool Consumer. ''' if not self.has_required_attributes(): raise InvalidLTIConfigError( 'OutcomeRequest does not have all required attributes') header_oauth = OAuth1(self.consumer_key, self.consumer_secret, signature_type=SIGNATURE_TYPE_AUTH_HEADER, force_include_body=True, **kwargs) headers = {'Content-type': 'application/xml'} resp = requests.post(self.lis_outcome_service_url, auth=header_oauth, data=self.generate_request_xml(), headers=headers) outcome_resp = OutcomeResponse.from_post_response(resp, resp.content) self.outcome_response = outcome_resp return self.outcome_response
def generate_launch_request(self, **kwargs): """ returns a Oauth v1 "signed" requests.PreparedRequest instance """ if not self.has_required_params(): raise InvalidLTIConfigError( 'Consumer\'s launch params missing one of ' + str(LAUNCH_PARAMS_REQUIRED) ) params = self.to_params() r = Request('POST', self.launch_url, data=params).prepare() sign = OAuth1(self.consumer_key, self.consumer_secret, signature_type=SIGNATURE_TYPE_BODY, **kwargs) return sign(r)
def _get_request_token(self): """ Obtain a temporary request token to authorize an access token and to sign the request to obtain the access token """ if self.request_token is None: get_params = {} if self.parameters: get_params.update(self.parameters) get_params['oauth_callback'] \ = self.request.build_absolute_uri(self.callback_url) rt_url = self.request_token_url + '?' + urlencode(get_params) oauth = OAuth1(self.consumer_key, client_secret=self.consumer_secret) response = requests.post(url=rt_url, auth=oauth) if response.status_code not in [200, 201]: raise OAuthError( _('Invalid response while obtaining request token' ' from "%s".') % get_token_prefix( self.request_token_url)) self.request_token = dict(parse_qsl(response.text)) self.request.session['oauth_%s_request_token' % get_token_prefix( self.request_token_url)] = self.request_token return self.request_token
def query(self, url, method="GET", params=dict(), headers=dict()): """ Request a API endpoint at ``url`` with ``params`` being either the POST or GET data. """ access_token = self._get_at_from_session() oauth = OAuth1( self.consumer_key, client_secret=self.secret_key, resource_owner_key=access_token['oauth_token'], resource_owner_secret=access_token['oauth_token_secret']) response = getattr(requests, method.lower())(url, auth=oauth, headers=headers, params=params) if response.status_code != 200: raise OAuthError( _('No access to private resources at "%s".') % get_token_prefix(self.request_token_url)) return response.text
def make_request(self, method, url, data=None, params=None, headers=None, timeout=60): if headers is None: headers = {'x-li-format': 'json', 'Content-Type': 'application/json'} else: headers.update({'x-li-format': 'json', 'Content-Type': 'application/json'}) if params is None: params = {} kw = dict(data=data, params=params, headers=headers, timeout=timeout) if isinstance(self.authentication, LinkedInDeveloperAuthentication): # Let requests_oauthlib.OAuth1 do *all* of the work here auth = OAuth1(self.authentication.consumer_key, self.authentication.consumer_secret, self.authentication.user_token, self.authentication.user_secret) kw.update({'auth': auth}) else: params.update({'oauth2_access_token': self.authentication.token.access_token}) return requests.request(method.upper(), url, **kw)
def get_user_info(data) -> object: oauth_token = data.get('oauth_token') oauth_token_secret = data.get('oauth_token_secret') userinfo = {} userinfo['oauth_token'] = oauth_token userinfo['oauth_token_secret'] = oauth_token_secret url = "http://api.openstreetmap.org/api/0.6/user/details" auth = OAuth1(BaseConfig.OSM_CONSUMER_KEY, BaseConfig.OSM_CONSUMER_SECRET, oauth_token, oauth_token_secret) resp = requests.get(url, auth=auth) tree = ElementTree.fromstring(resp.content) for child in tree: if child.tag == 'user': userinfo['display_name'] = child.attrib.get('display_name') userinfo['id'] = child.attrib.get('id') for innerChild in child: if innerChild.tag == 'img': userinfo['img'] = innerChild.attrib.get('href') return userinfo
def clean_oauth_auth(self, access_token): """Override of oauth_auth since Xing doesn't like callback_uri and oauth_verifier on authenticated API calls""" key, secret = self.get_key_and_secret() resource_owner_key = access_token.get('oauth_token') resource_owner_secret = access_token.get('oauth_token_secret') if not resource_owner_key: raise AuthTokenError(self, 'Missing oauth_token') if not resource_owner_secret: raise AuthTokenError(self, 'Missing oauth_token_secret') # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' return OAuth1(key, secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, signature_type=SIGNATURE_TYPE_AUTH_HEADER, decoding=decoding)
def unauthorized_token(self): """Return request for unauthorized token (first stage)""" params = self.request_token_extra_arguments() params.update(self.get_scope_argument()) key, secret = self.get_key_and_secret() # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' state = self.get_or_create_state() response = self.request( self.REQUEST_TOKEN_URL, params=params, auth=OAuth1(key, secret, callback_uri=self.get_redirect_uri(state), decoding=decoding), method=self.REQUEST_TOKEN_METHOD ) content = response.content if response.encoding or response.apparent_encoding: content = content.decode(response.encoding or response.apparent_encoding) else: content = response.content.decode() return content
def oauth_auth(self, token=None, oauth_verifier=None, signature_type=SIGNATURE_TYPE_AUTH_HEADER): key, secret = self.get_key_and_secret() oauth_verifier = oauth_verifier or self.data.get('oauth_verifier') if token: resource_owner_key = token.get('oauth_token') resource_owner_secret = token.get('oauth_token_secret') if not resource_owner_key: raise AuthTokenError(self, 'Missing oauth_token') if not resource_owner_secret: raise AuthTokenError(self, 'Missing oauth_token_secret') else: resource_owner_key = None resource_owner_secret = None # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' state = self.get_or_create_state() return OAuth1(key, secret, resource_owner_key=resource_owner_key, resource_owner_secret=resource_owner_secret, callback_uri=self.get_redirect_uri(state), verifier=oauth_verifier, signature_type=signature_type, decoding=decoding)
def unauthorized_token_request(self): """Return request for unauthorized token (first stage)""" params = self.request_token_extra_arguments() params.update(self.get_scope_argument()) key, secret = self.get_key_and_secret() # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' state = self.get_or_create_state() auth = OAuth1( key, secret, callback_uri=self.get_redirect_uri(state), decoding=decoding, signature_method=SIGNATURE_HMAC, signature_type=SIGNATURE_TYPE_QUERY ) url = self.REQUEST_TOKEN_URL + '?' + urlencode(params) url, _, _ = auth.client.sign(url) return url
def oauth_auth(self, token=None, oauth_verifier=None, signature_type=SIGNATURE_TYPE_AUTH_HEADER): key, secret = self.get_key_and_secret() oauth_verifier = oauth_verifier or self.data.get('oauth_verifier') token = token or {} # decoding='utf-8' produces errors with python-requests on Python3 # since the final URL will be of type bytes decoding = None if six.PY3 else 'utf-8' state = self.get_or_create_state() return OAuth1(key, secret, resource_owner_key=None, resource_owner_secret=None, callback_uri=self.get_redirect_uri(state), verifier=oauth_verifier, signature_type=signature_type, decoding=decoding)
def _get_request_token(self): """ Obtain a temporary request token to authorize an access token and to sign the request to obtain the access token """ if self.request_token is None: get_params = {} if self.parameters: get_params.update(self.parameters) get_params['oauth_callback'] \ = self.request.build_absolute_uri(self.callback_url) rt_url = self.request_token_url + '?' + urlencode(get_params) oauth = OAuth1(self.consumer_key, client_secret=self.consumer_secret) response = requests.post(url=rt_url, auth=oauth) if response.status_code not in [200, 201]: raise OAuthError( _('Invalid response while obtaining request token from "%s".') % get_token_prefix(self.request_token_url)) self.request_token = dict(parse_qsl(response.text)) self.request.session['oauth_%s_request_token' % get_token_prefix(self.request_token_url)] = self.request_token return self.request_token
def lambda_handler(event, context): # If being invoked by a dynamodb trigger if 'Records' in event: tweeters = dynamo_triggered_new_users(event) status = new_user_status else: # If being invoked by the cron we scan the table tweeters = token_table.scan()['Items'] for tweeter in tweeters: auth = OAuth1( creds['CONSUMER_KEY'], creds['CONSUMER_SECRET'], tweeter['oauth_token'], tweeter['oauth_token_secret'] ) resp = requests.post( "https://api.twitter.com/1.1/statuses/update.json", data={'status': status}, auth=auth ) if status == 200: print("Tweeted from " + tweeter['screen_name']) else: print(resp.text)
def bind_auth(self, **kwargs): kwargs['auth'] = OAuth1( six.text_type(settings.BITBUCKET_CONSUMER_KEY), six.text_type(settings.BITBUCKET_CONSUMER_SECRET), self.auth.tokens['oauth_token'], self.auth.tokens['oauth_token_secret'], signature_type='auth_header' ) return kwargs
def __init__(self, longitude=-0.418, latitude=39.360, units=weatherservice.Units()): WeatherService.__init__(self, longitude, latitude, units) self.oauth = OAuth1(API_KEY, SHARED_SECRET) self.woeid = geocodeapi.get_woeid(latitude, longitude)
def do_search(term='Food', location='San Francisco'): base_url = 'https://api.yelp.com/v2/search' term = term.replace(' ', '+') location = location.replace(' ', '+') url = "{base_url}?term={term}&location={location}".format(base_url=base_url, term=term, location=location) auth = OAuth1(consumer_key, consumer_secret, token, token_secret) r = requests.get(url, auth=auth) return r.json(), r.text
def __init__(self, consumer_key=None, consumer_secret=None, oauth_token=None, oauth_secret=None): self.consumer_key = consumer_key or TWITTER_CONSUMER_KEY self.consumer_secret = consumer_secret or TWITTER_CONSUMER_SECRET self.oauth_token = oauth_token or OAUTH_TOKEN self.oauth_token_secret = oauth_secret or OAUTH_TOKEN_SECRET self.auth=OAuth1(self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.oauth_token, resource_owner_secret=self.oauth_token_secret)
def requestQBO(method, url, context, payload=None): if settings.oauth_flag == 2: headers = {'Accept': 'application/json', 'User-Agent': 'PythonSampleApp2.0', 'Authorization': 'Bearer '+settings.access_token_oauth2} req = requests.request(method,url, headers=headers, json=payload) else: headers = {'Accept': 'application/json', 'content-type': 'application/json; charset=utf-8', 'User-Agent': 'PythonSampleApp2.0'} auth=OAuth1(context.consumerToken, context.consumerSecret, context.accessToken, context.accessSecret) req = requests.request(method,url, auth=auth, headers=headers, json=payload) return req
def get_oauth(): client_key = get_client_key(request) client_secret = Config().oauth_secret if not client_key or not client_secret: return None return OAuth1(client_key=client_key, client_secret=client_secret)
def auth(): return OAuth1(settings.API_500PX_KEY, client_secret=settings.API_500PX_SECRET)
def register_proxy(self, tool_profile): register_url = self.find_registration_url() r = Request("POST", register_url, data=json.dumps(tool_profile, indent=4), headers={'Content-Type':'application/vnd.ims.lti.v2.toolproxy+json'}).prepare() sign = OAuth1(self.launch_params['reg_key'], self.launch_params['reg_password'], signature_type=SIGNATURE_TYPE_AUTH_HEADER, force_include_body=True) signed = sign(r) return signed
def edit_wiki_page(page_name, content, summary=None): access_token = flask.session.get('access_token', None) auth = OAuth1( app.config['CONSUMER_KEY'], app.config['CONSUMER_SECRET'], access_token['key'], access_token['secret']) # Get token r = requests.get('https://en.wikipedia.org/w/api.php', params={ 'action':'query', 'meta':'tokens', 'format': 'json', }, auth=auth) r.raise_for_status() token = r.json()['query']['tokens']['csrftoken'] r = requests.post('https://en.wikipedia.org/w/api.php', data={ 'action':'edit', 'title': page_name, 'text': content, 'summary': summary, 'format': 'json', 'token': token, 'watchlist': 'nochange', }, auth=auth) r.raise_for_status()
def create_oauth1(consumer_key, consumer_secret, private_key, passphrase): from Crypto.PublicKey import RSA from requests_oauthlib import OAuth1 with open(private_key, 'rb') as fd: rsa_key = RSA.importKey(fd.read(), passphrase) return OAuth1(client_key=consumer_key, client_secret=consumer_secret, signature_method='RSA-SHA1', rsa_key=rsa_key)
def get_oauth1session(consumer_key, consumer_secret, private_key, passphrase): from Crypto.PublicKey import RSA from requests_oauthlib import OAuth1 with open(private_key, 'r') as fd: rsa_key = RSA.importKey(fd.read(), passphrase) return OAuth1(client_key=consumer_key, client_secret=consumer_secret, signature_method='RSA-SHA1', rsa_key=rsa_key)
def apply_auth(self): return OAuth1(self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=self.access_token, resource_owner_secret=self.access_token_secret, decoding=None)
def __init__( self, consumer_key=None, consumer_secret=None, access_token_key=None, access_token_secret=None, auth_type='oAuth1', proxy_url=None): """Initialize with your Twitter application credentials""" self.proxies = {'https': proxy_url} if proxy_url else None if auth_type == 'oAuth1': if not all([consumer_key, consumer_secret, access_token_key, access_token_secret]): raise Exception('Missing authentication parameter') self.auth = OAuth1( consumer_key, consumer_secret, access_token_key, access_token_secret) elif auth_type == 'oAuth2': if not all([consumer_key, consumer_secret]): raise Exception('Missing authentication parameter') self.auth = OAuth2( consumer_key, consumer_secret, proxies=self.proxies) else: raise Exception('Unknown oAuth version')
def get_access_token(self): """ Obtain the access token to access private resources at the API endpoint. """ if self.access_token is None: request_token = self._get_rt_from_session() oauth = OAuth1( self.consumer_key, client_secret=self.consumer_secret, resource_owner_key=request_token['oauth_token'], resource_owner_secret=request_token['oauth_token_secret']) at_url = self.access_token_url # Passing along oauth_verifier is required according to: # http://groups.google.com/group/twitter-development-talk/browse_frm/thread/472500cfe9e7cdb9# # Though, the custom oauth_callback seems to work without it? oauth_verifier = get_request_param(self.request, 'oauth_verifier') if oauth_verifier: at_url = at_url + '?' + urlencode( {'oauth_verifier': oauth_verifier}) response = requests.post(url=at_url, auth=oauth) if response.status_code not in [200, 201]: raise OAuthError( _('Invalid response while obtaining access token' ' from "%s".') % get_token_prefix( self.request_token_url)) self.access_token = dict(parse_qsl(response.text)) self.request.session['oauth_%s_access_token' % get_token_prefix( self.request_token_url)] = self.access_token return self.access_token
def request(req_data, req_context, method): headers = {'Accept': 'application/json', 'content-type': 'application/json; charset=utf-8', 'User-Agent': 'PythonSampleApp1'} payload = req_data['payload'] url = req_data['url'] auth = OAuth1(req_context.consumer_key, req_context.consumer_secret, req_context.access_key, req_context.access_secret) if method == 'POST': response_data = do_post(url, headers, payload, auth) return response_data # else GET method
def request(method, path, retry=True, **params): response = session.request(method, 'https://api.twitter.com/1.1/' + path + '.json', params=params, auth=requests_oauthlib.OAuth1(CONSUMER_KEY, secret.CONSUMER_SECRET, ACCESS_TOKEN, secret.ACCESS_TOKEN_SECRET)) response.raise_for_status() return response.json()
def SetCredentials(self, consumer_key, consumer_secret, access_token_key=None, access_token_secret=None, application_only_auth=False): """Set the consumer_key and consumer_secret for this instance Args: consumer_key: The consumer_key of the twitter account. consumer_secret: The consumer_secret for the twitter account. access_token_key: The oAuth access token key value you retrieved from running get_access_token.py. access_token_secret: The oAuth access token's secret, also retrieved from the get_access_token.py run. application_only_auth: Whether to generate a bearer token and use Application-Only Auth """ self._consumer_key = consumer_key self._consumer_secret = consumer_secret self._access_token_key = access_token_key self._access_token_secret = access_token_secret if application_only_auth: self._bearer_token = self.GetAppOnlyAuthToken(consumer_key, consumer_secret) self.__auth = OAuth2(token=self._bearer_token) else: auth_list = [consumer_key, consumer_secret, access_token_key, access_token_secret] if all(auth_list): self.__auth = OAuth1(consumer_key, consumer_secret, access_token_key, access_token_secret) self._config = None
def make(): oauth = OAuth1(apiKey, client_secret = apiSecret ) response = requests.post(url=requestTokenUrl, auth=oauth) OAuthSecret = str(parse_qs(urlparse('?'+str(response.text)).query)['oauth_token_secret']) OAuthToken = str(parse_qs(urlparse('?'+str(response.text)).query)['oauth_token']) OAuthSecret = OAuthSecret[2:-2] OAuthToken = OAuthToken[2:-2] # MacOS try: chrome_path = 'open -a /Applications/Google\ Chrome.app %s' webbrowser.get(chrome_path).open("https://api.twitter.com/oauth/authenticate?oauth_token=" + str(OAuthToken)) except: print "You don't use mac!" # Windows try: chrome_path = 'C:\Program Files (x86)\Google\Chrome\Application\chrome.exe %s' webbrowser.get(chrome_path).open("https://api.twitter.com/oauth/authenticate?oauth_token=" + str(OAuthToken)) except: print "You don't use windows!" # Linux try: chrome_path = '/usr/bin/google-chrome %s' webbrowser.get(chrome_path).open("https://api.twitter.com/oauth/authenticate?oauth_token=" + str(OAuthToken)) except: print "You don't use linux!" return json.dumps({"response":"true","message":"Please Authorize the TwitterRefresh App to Proceed Further"})
def _load_saved_auth(self): if os.path.isfile(etrade_config.auth_file_path): with open(etrade_config.auth_file_path, 'r') as f: auth_props = json.load(f) self._resource_owner_key = auth_props['resourceOwnerKey'] self._resource_owner_secret = auth_props['resourceOwnerSecret'] self._last_used = datetime.strptime(auth_props['utcTime'], _time_format) else: self._write_auth_txt() self._authorization = OAuth1(etrade_config.oauth_consumer_key, client_secret=etrade_config.oath_consumer_secret, resource_owner_key=self._resource_owner_key, resource_owner_secret=self._resource_owner_secret)
def get_current(self): """ Get a current OAuth1, renew if necessary :return: the current authorization :rtype: OAuth1 """ self._load_saved_auth() utc_now = datetime.utcnow() # can reuse if not more than 2 hours since last use and in the same day if self._authorization and self._last_used + timedelta(hours=2) > utc_now and \ self._last_used.day == utc_now.day and not etrade_config.changed: # if self._authorization and not etrade_config.changed: pass print('can use existing authorization properties') else: self.reauthorize() # update last used time self._last_used = datetime.utcnow() self._write_auth_txt() # return the authorization, new or reused return self._authorization
def twitter(): request_token_url = 'https://api.twitter.com/oauth/request_token' access_token_url = 'https://api.twitter.com/oauth/access_token' if request.json.get('oauth_token') and request.json.get('oauth_verifier'): auth = OAuth1(app.config['TWITTER_CONSUMER_KEY'], client_secret=app.config['TWITTER_CONSUMER_SECRET'], resource_owner_key=request.json.get('oauth_token'), verifier=request.json.get('oauth_verifier')) r = requests.post(access_token_url, auth=auth) profile = dict(parse_qsl(r.text)) user = User.query.filter_by(twitter=profile['user_id']).first() if user: token = create_token(user) return jsonify(token=token) u = User(twitter=profile['user_id'], display_name=profile['screen_name']) db.session.add(u) db.session.commit() token = create_token(u) return jsonify(token=token) else: oauth = OAuth1(app.config['TWITTER_CONSUMER_KEY'], client_secret=app.config['TWITTER_CONSUMER_SECRET'], callback_uri=request.json['redirectUri']) r = requests.post(request_token_url, auth=oauth) oauth_token = dict(parse_qsl(r.text)) return jsonify(oauth_token)