我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用google.appengine.api.urlfetch.POST。
def pushtxn(raw_tx):
    """Broadcast a raw transaction through the Insight "send raw tx" API.

    The transaction is POSTed form-encoded (field ``rawtx``) to
    ``PUSH_TX_URL``.

    Returns:
        ``(txid, raw_tx)`` when the API answers with HTTP 200 (``txid`` is
        whatever the JSON body holds under ``'txid'``, possibly None);
        otherwise ``(None, msg)`` where ``msg`` describes the failure. The
        failure message is also recorded through ``ErrorNotification.new``.
    """
    form_body = urllib.urlencode({"rawtx": raw_tx})
    response = urlfetch.fetch(
        PUSH_TX_URL,
        method=urlfetch.POST,
        payload=form_body,
    )

    # Guard clause: anything but 200 is reported and returned as an error.
    if response.status_code != 200:
        msg = ('Error accessing insight API:' + str(response.status_code)
               + " " + str(response.content))
        ErrorNotification.new(msg)
        return None, msg

    decoded = json.loads(response.content)
    return decoded.get('txid'), raw_tx
def exportItems(module, target, importKey, startCursor, endCursor):
    """Push the entities of one kind to a remote import endpoint.

    Every entity returned by the cursor-bounded query is flattened with
    ``DbTransfer.genDict``, pickled, hex-encoded and POSTed to ``target``
    together with ``importKey``.

    :param module: Name of the kind/module whose skeleton is exported.
    :param target: URL of the receiving import endpoint.
    :param importKey: Key sent along as the ``key`` form field.
    :param startCursor: Start cursor for the query.
    :param endCursor: End cursor for the query.
    """
    import logging

    Skel = skeletonByKind(module)
    query = Skel().all().cursor(startCursor, endCursor)

    for item in query.run(250):
        flatItem = DbTransfer.genDict(item)
        formFields = {
            "e": pickle.dumps(flatItem).encode("HEX"),
            "key": importKey,
        }
        result = urlfetch.fetch(
            url=target,
            payload=urllib.urlencode(formFields),
            method=urlfetch.POST,
            headers={'Content-Type': 'application/x-www-form-urlencoded'})
        # The response used to be ignored entirely; surface failures in the
        # log so a partial export can be noticed.
        if result.status_code != 200:
            logging.warning("Export of kind %s to %s returned status %s",
                            module, target, result.status_code)

    if startCursor == endCursor:
        try:
            utils.sendEMailToAdmins(
                "Export of kind %s finished" % module,
                "ViUR finished to export kind %s to %s.\n" % (module, target))
        except Exception:  # OverQuota, whatever
            # Still best-effort, but no longer silent and no longer
            # swallowing non-Exception errors (the old bare ``except:`` did).
            logging.exception("Could not send export notification for kind %s",
                              module)

# --- import ---
def check_language(text):
    """Detect the language of ``text`` via the Google Translate v2 API.

    Text longer than ``MAX_LENGTH`` is trimmed first. The request is sent as
    a POST carrying ``X-HTTP-Method-Override: GET``, through urllib2 when
    ``urllib2_fallback`` is set and through urlfetch otherwise.

    Returns:
        The detected language code when the reported confidence is above
        0.10, otherwise None.

    Raises:
        Exception: if the urlfetch response status is not 200.
    """
    if len(text) > MAX_LENGTH:
        logging.info("trimming text from %s to %s", len(text), MAX_LENGTH)
        text = text[:MAX_LENGTH]

    detect_url = 'https://www.googleapis.com/language/translate/v2/detect'
    override_headers = {'X-HTTP-Method-Override': 'GET'}
    encoded_form = urls.urlencode({'key': API_KEY, 'q': text})

    if urllib2_fallback:
        fallback_request = urllib2.Request(detect_url, encoded_form,
                                           override_headers)
        raw_body = urllib2.urlopen(fallback_request).read()
    else:
        fetched = urlfetch.fetch(url=detect_url,
                                 payload=encoded_form,
                                 method=urlfetch.POST,
                                 headers=override_headers)
        if fetched.status_code != 200:
            error = "result status code is %s for content %s" % (
                fetched.status_code, fetched.content)
            logging.error(error)
            raise Exception("Error in translation: %s" % error)
        raw_body = fetched.content

    # Only the first detection of the first query is considered.
    best_guess = json.loads(raw_body)['data']['detections'][0][0]
    logging.info("text classification returned %s", best_guess)
    if best_guess['confidence'] > 0.10:
        return best_guess['language']
    return None
def auth_callback_provider(self):
    """Exchange the authorization code from the callback for a token.

    Builds the ``authorization_code`` grant request from the current
    request and the provider settings, POSTs it form-encoded to the
    provider's token endpoint and hands the result to
    ``process_token_result``.
    """
    callback_url = self.domain_name[:-1] + self.get_auth_callback()
    grant_fields = {
        'code': self.request.get('code'),
        'client_id': self.get_auth_request_client_id(),
        'client_secret': self.get_client_secret(),
        'redirect_uri': callback_url,
        'grant_type': 'authorization_code',
    }
    encoded_grant = enki.libutil.urlencode(grant_fields)
    token_response = self.urlfetch_safe(
        url=self.token_endpoint(),
        payload=encoded_grant,
        method=urlfetch.POST,
        headers={'Content-Type': 'application/x-www-form-urlencoded'},
    )
    self.process_token_result(token_response)
def get_access_token(force=False):
    """Return the Twitter application-only access token.

    The token is read from memcache (key ``access_token``). When it is
    missing, or when ``force`` is true, a new one is requested from Twitter
    and cached for 30 days. Deleting the memcache key therefore triggers a
    refresh on the next call.

    See https://dev.twitter.com/oauth/application-only.

    Returns:
        The token, or whatever memcache held (possibly None) when the
        refresh request does not answer with HTTP OK.
    """
    token = memcache.get('access_token')
    if not force and token is not None:
        return token

    logging.warning('Needed to fetch access_token')
    # Basic credentials are "<url-quoted key>:<url-quoted secret>", base64'd.
    credential_pair = "{}:{}".format(urllib.quote_plus(CUSTOMER_KEY),
                                     urllib.quote_plus(CUSTOMER_SECRET))
    auth_header = 'Basic ' + base64.b64encode(credential_pair)
    response = urlfetch.fetch(
        'https://api.twitter.com/oauth2/token',
        payload='grant_type=client_credentials',
        method=urlfetch.POST,
        headers={'Authorization': auth_header})

    if response.status_code == urlfetch.httplib.OK:
        token = json.loads(response.content)['access_token']
        memcache.set('access_token', token, 2592000)  # 30 days
    return token
def urlread(url, data=None, headers=None):
    """Fetch ``url`` through urlfetch and return the response body.

    A POST is issued when ``data`` is given, a GET otherwise. When no
    headers are supplied, a POST defaults to a form-encoded Content-type
    and a GET to no extra headers.

    Raises:
        urllib2.URLError: if the response status is not 200.
    """
    is_post = data is not None
    if headers is None:
        if is_post:
            headers = {"Content-type": "application/x-www-form-urlencoded"}
        else:
            headers = {}
    method = urlfetch.POST if is_post else urlfetch.GET

    result = urlfetch.fetch(url, method=method, payload=data, headers=headers)
    if result.status_code != 200:
        raise urllib2.URLError("fetch error url=%s, code=%d"
                               % (url, result.status_code))
    return result.content
def get_request_token(base):
    """Request a Pocket OAuth request token.

    ``base`` is prefixed to ``POCKET_FINISH_REDIRECT`` to form the redirect
    URI sent to Pocket.

    Returns:
        ``(code, redirect)``: the request token and the URL the user must
        be sent to for authorization. Both are None when the request does
        not answer 200 or the body carries no ``code=`` marker.
    """
    finish_url = base + POCKET_FINISH_REDIRECT
    data = urllib.urlencode({
        'consumer_key': POCKET_CONSUMER_KEY,
        'redirect_uri': finish_url,
    })
    logging.debug(data)

    res = urlfetch.fetch(url=POCKET_OAUTH_REQUEST,
                         method=urlfetch.POST,
                         payload=data,
                         validate_certificate=True)
    logging.debug(res.status_code)

    code = None
    redirect = None
    if res.status_code == 200:
        body = res.content
        if 'code=' in body:
            code = body.replace('code=', '')
            redirect = POCKET_AUTHORIZE_REDIR + (
                '?request_token=%s&redirect_uri=%s' % (code, finish_url))
    return (code, redirect)
def get_access_token(code):
    """Exchange a Pocket request token for an access token.

    Args:
        code: The request token obtained earlier in the OAuth flow.

    Returns:
        The access token string, or None when the request does not answer
        with HTTP 200 or the response carries no ``access_token`` field.
    """
    data = urllib.urlencode({
        'consumer_key': POCKET_CONSUMER_KEY,
        'code': code,
    })
    logging.debug(data)
    res = urlfetch.fetch(url=POCKET_OAUTH_AUTHORIZE,
                         method=urlfetch.POST,
                         payload=data,
                         validate_certificate=True)
    logging.debug(res.status_code)

    # Initialised up front: previously a non-200 response reached the
    # ``return`` with the name unbound and raised UnboundLocalError.
    access_token = None
    if res.status_code == 200:
        fields = urlparse.parse_qs(res.content)
        access_token = fields.get('access_token', [None])[0]
    return access_token
def fetch(url, data=None, headers=None, cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Fetch ``url`` and return the response body.

    Uses App Engine's urlfetch when it can be imported (following redirects
    manually so cookies from each hop are collected) and falls back to
    urllib2 otherwise.

    Args:
        url: The URL to request.
        data: Optional mapping/sequence accepted by ``urllib.urlencode``;
            when given, the first request is a POST.
        headers: Optional dict of extra request headers. It is copied, so
            the caller's dict is not modified.
        cookie: Cookie jar used to build the ``Cookie`` header and updated
            from ``set-cookie`` response headers on the urlfetch path.
            NOTE: the default jar is created once at definition time and is
            therefore shared by every call that does not pass its own.
        user_agent: Value for the ``User-agent`` header; skipped if falsy.

    Returns:
        The body of the final response.
    """
    # Copy so that adding User-agent/Cookie below never leaks into the
    # dict owned by the caller.
    headers = dict(headers) if headers else {}
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    # NOTE(review): the Cookie header is computed once here, so cookies set
    # by a redirect response are stored in the jar but not sent on the
    # follow-up request of the same call.
    headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        method = urlfetch.GET if data is None else urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            url = response.headers.get('location')
        html = response.content
    return html
def obtain_bearer_token(host, path):
    """Request an OAuth bearer token with a POST to the token endpoint.

    Args:
        host (str): The domain host of the API.
        path (str): The path of the token endpoint after the domain.

    Returns:
        str: OAuth bearer token, obtained using client_id and client_secret.

    Raises:
        ValueError: If the endpoint does not answer with HTTP 200, or the
            response is not JSON containing an ``access_token``.
    """
    import json

    url = '{0}{1}'.format(host, quote(path.encode('utf8')))
    data = urlencode({
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET,
        'grant_type': GRANT_TYPE,
    })
    headers = {
        'content-type': 'application/x-www-form-urlencoded',
    }
    result = urlfetch.fetch(
        url=url,
        payload=data,
        method=urlfetch.POST,
        headers=headers)

    # The previous version printed the client id and the raw response and
    # then returned a hard-coded token, ignoring the response entirely.
    if result.status_code != 200:
        raise ValueError('Bearer token request failed with HTTP status %s'
                         % result.status_code)
    try:
        return json.loads(result.content)['access_token']
    except KeyError:
        raise ValueError('Bearer token response did not contain an '
                         'access_token')
def fetch(url, data=None, headers=None, cookie=Cookie.SimpleCookie(),
          user_agent='Mozilla/5.0'):
    """Fetch ``url`` and return the response body.

    Uses App Engine's urlfetch when it can be imported (following redirects
    manually so cookies from each hop are collected) and falls back to
    urllib2 otherwise.

    Args:
        url: The URL to request.
        data: Optional mapping/sequence accepted by ``urllib.urlencode``;
            when given, the first request is a POST.
        headers: Optional dict of extra request headers. It is copied, so
            the caller's dict is not modified.
        cookie: Cookie jar used to build the ``Cookie`` header and updated
            from ``set-cookie`` response headers on the urlfetch path.
            NOTE: the default jar is created once at definition time and is
            therefore shared by every call that does not pass its own.
        user_agent: Value for the ``User-agent`` header; skipped if falsy.

    Returns:
        The body of the final response.
    """
    # Copy so that adding User-agent/Cookie below never leaks into the
    # dict owned by the caller.
    headers = dict(headers) if headers else {}
    if data is not None:
        data = urllib.urlencode(data)
    if user_agent:
        headers['User-agent'] = user_agent
    # NOTE(review): the Cookie header is computed once here, so cookies set
    # by a redirect response are stored in the jar but not sent on the
    # follow-up request of the same call.
    headers['Cookie'] = ' '.join(
        ['%s=%s;' % (c.key, c.value) for c in cookie.values()])
    try:
        from google.appengine.api import urlfetch
    except ImportError:
        req = urllib2.Request(url, data, headers)
        html = urllib2.urlopen(req).read()
    else:
        method = urlfetch.GET if data is None else urlfetch.POST
        while url is not None:
            response = urlfetch.fetch(url=url, payload=data,
                                      method=method, headers=headers,
                                      allow_truncated=False,
                                      follow_redirects=False,
                                      deadline=10)
            # next request will be a get, so no need to send the data again
            data = None
            method = urlfetch.GET
            # load cookies from the response
            cookie.load(response.headers.get('set-cookie', ''))
            url = response.headers.get('location')
        html = response.content
    return html
def sendData(form_field):
    """POST ``form_field`` as JSON to the LINE push-message endpoint.

    The request is authorised with ``const.ChannelAccessToken``. The
    response body is written to the debug log regardless of status.
    """
    push_headers = {
        'Content-type': 'application/json',
        'Authorization': 'Bearer ' + const.ChannelAccessToken,
    }
    body = json.dumps(form_field, ensure_ascii=False)
    result = urlfetch.fetch(
        url="https://api.line.me/v2/bot/message/push",
        payload=body,
        method=urlfetch.POST,
        headers=push_headers,
    )
    # Success and failure were logged identically, so one call covers both.
    logging.debug(result.content)
def sendData(ifttt_event, ifttt_key, senderID, from_id, event_type, msg_data):
    """Trigger an IFTTT Maker event carrying a message.

    Args:
        ifttt_event: Name of the Maker event to trigger.
        ifttt_key: The user's Maker key; when None the sender is told they
            are not registered and nothing is sent to IFTTT.
        senderID: ID used to answer the sender through ``send2Line``.
        from_id: Sent as ``value1``.
        event_type: Sent as ``value2``.
        msg_data: Sent as ``value3``; wrapped in ``<pre>`` when it contains
            a newline.
    """
    if ifttt_key is None:
        send2Line.sendText(str(senderID), const.MSG_NONREGISTRATION)
        return

    if '\n' in msg_data:
        msg_data = u"<pre>" + msg_data + u"</pre>"

    form_fields = {
        "value1": from_id,
        "value2": event_type,
        "value3": msg_data,
    }
    logging.debug(form_fields)
    logging.debug(ifttt_key)
    requestStr = json.dumps(form_fields, ensure_ascii=False)

    url = ("https://maker.ifttt.com/trigger/" + ifttt_event
           + "/with/key/" + ifttt_key)
    result = urlfetch.fetch(
        url=url,
        payload=requestStr,
        method=urlfetch.POST,
        # The header value previously carried a stray leading space.
        headers={'Content-Type': 'application/json'},
    )
    logging.debug(result.status_code)

    if result.status_code == 401:
        # The Maker key was rejected: let the sender know.
        send2Line.sendText(str(senderID), const.MSG_MAKERKEY_FAILED)
    logging.debug(result.content)
def telegram_post(data, deadline=3):
    """POST ``data`` to ``TELEGRAM_URL_SEND`` and return the urlfetch result.

    ``data`` is sent as-is with ``JSON_HEADER``; ``deadline`` is passed
    through to urlfetch.
    """
    request_options = dict(
        url=TELEGRAM_URL_SEND,
        payload=data,
        method=urlfetch.POST,
        headers=JSON_HEADER,
        deadline=deadline,
    )
    return urlfetch.fetch(**request_options)
def telegram_query(uid, deadline=3):
    """Send a 'typing' chat action for ``uid`` and return the urlfetch result.

    The JSON body ``{'chat_id': uid, 'action': 'typing'}`` is POSTed to
    ``TELEGRAM_URL_CHAT_ACTION``; ``deadline`` is passed through to urlfetch.
    """
    chat_action = {'chat_id': uid, 'action': 'typing'}
    response = urlfetch.fetch(
        url=TELEGRAM_URL_CHAT_ACTION,
        payload=json.dumps(chat_action),
        method=urlfetch.POST,
        headers=JSON_HEADER,
        deadline=deadline,
    )
    return response
def telegram_photo(data, deadline=3):
    """POST ``data`` to ``TELEGRAM_URL_SEND_PHOTO`` and return the result.

    ``data`` is sent as-is with ``JSON_HEADER``; ``deadline`` is passed
    through to urlfetch.
    """
    request_options = dict(
        url=TELEGRAM_URL_SEND_PHOTO,
        payload=data,
        method=urlfetch.POST,
        headers=JSON_HEADER,
        deadline=deadline,
    )
    return urlfetch.fetch(**request_options)
def send_typing(uid):
    """Start an asynchronous 'typing' chat action for ``uid``.

    The request is issued through a urlfetch RPC whose result is never
    awaited here. Failures while starting the call are tolerated: they are
    written to the debug log and the function returns None.
    """
    import logging

    data = json.dumps({'chat_id': uid, 'action': 'typing'})
    try:
        rpc = urlfetch.create_rpc()
        urlfetch.make_fetch_call(rpc, url=TELEGRAM_URL_CHAT_ACTION,
                                 payload=data, method=urlfetch.POST,
                                 headers=JSON_HEADER)
    except Exception:
        # Was a bare ``except:``, which also swallowed non-Exception errors
        # such as KeyboardInterrupt/SystemExit. Still best-effort.
        logging.debug('Could not send typing action to %s', uid,
                      exc_info=True)
        return
def get(self):
    """Enqueue a task that POSTs to the '/promo' URL."""
    task_options = {'url': '/promo', 'method': "POST"}
    taskqueue.add(**task_options)
def fromClient(self, valuesCache, name, data):
    """
    Reads a value from the client.
    If this value is valid for this bone,
    store this value and return None.
    Otherwise our previous value is
    left unchanged and an error-message
    is returned.

    The captcha is not enforced on the dev server nor for users whose
    access list contains "root".

    :param valuesCache: Not used by this bone.
    :param name: Our name in the skeleton
    :type name: String
    :param data: *User-supplied* request-data
    :type data: Dict
    :returns: None or String
    """
    if request.current.get().isDevServer:  # We dont enforce captchas on dev server
        return None
    user = utils.getCurrentUser()
    if user and "root" in user["access"]:
        # Don't bother trusted users with this (not supported by admin/vi anyways)
        return None
    if ("recaptcha_challenge_field" not in data
            or "recaptcha_response_field" not in data):
        return u"No Captcha given!"

    # Kept in its own name instead of rebinding the ``data`` parameter.
    verifyFields = {
        "privatekey": self.privateKey,
        "remoteip": request.current.get().request.remote_addr,
        "challenge": data["recaptcha_challenge_field"],
        "response": data["recaptcha_response_field"],
    }
    # HTTPS instead of plain HTTP: the payload contains the private key.
    response = urlfetch.fetch(
        url="https://www.google.com/recaptcha/api/verify",
        payload=urllib.urlencode(verifyFields),
        method=urlfetch.POST,
        headers={"Content-Type": "application/x-www-form-urlencoded"})
    if str(response.content).strip().lower().startswith("true"):
        return None
    return u"Invalid Captcha"
def get(self):
    """POST the handler's form fields to the local submit endpoint.

    The response body is echoed to the client; urlfetch errors are logged
    instead of propagated.
    """
    # [START urlfetch-post]
    try:
        encoded_fields = urllib.urlencode(UrlPostHandler.form_fields)
        response = urlfetch.fetch(
            url='http://localhost:8080/submit_form',
            payload=encoded_fields,
            method=urlfetch.POST,
            headers={'Content-Type': 'application/x-www-form-urlencoded'})
        self.response.write(response.content)
    except urlfetch.Error:
        logging.exception('Caught exception fetching url')
    # [END urlfetch-post]
def __init__(self, host, port=None, strict=False, timeout=None):
    """Set up a connection object backed by App Engine's urlfetch.

    ``strict`` is accepted but not used. ``timeout`` is kept only when it
    is a float, int or long; any other value is replaced by None.
    """
    # Imported lazily, inside the constructor.
    from google.appengine.api import urlfetch

    self._fetch = urlfetch.fetch
    # HTTP verb name -> urlfetch method constant.
    self._method_map = dict(
        GET=urlfetch.GET,
        POST=urlfetch.POST,
        HEAD=urlfetch.HEAD,
        PUT=urlfetch.PUT,
        DELETE=urlfetch.DELETE,
        PATCH=urlfetch.PATCH,
    )

    self.host = host
    self.port = port

    self._method = None
    self._url = None
    self._body = ''
    self.headers = []

    numeric_timeout = isinstance(timeout, (float, int, long))
    self.timeout = timeout if numeric_timeout else None
def _MakeRemoteSyncCall(self, service, call, request, response):
    """Send an RPC to a remote_api endpoint.

    The call is wrapped in a ``remote_api_pb.Request``, POSTed to
    ``self.remote_url`` and the reply is decoded into ``response``.

    Args:
        service: Name of the service to call.
        call: Name of the method to call on the service.
        request: Request message; its encoded form is embedded in the RPC.
        response: Message that is filled in place from the reply.

    Raises:
        FetchFailed: if the fetch itself raises or the status is not 200.
        apiproxy_errors.ApplicationError: if the reply carries an
            application error.
        UnknownJavaServerError: if the reply carries a Java exception.
        Whatever exception the remote side pickled into the reply.
    """
    request_pb = remote_api_pb.Request()
    request_pb.set_service_name(service)
    request_pb.set_method(call)
    request_pb.set_request(request.Encode())

    response_pb = remote_api_pb.Response()
    encoded_request = request_pb.Encode()
    try:
        urlfetch_response = urlfetch.fetch(self.remote_url, encoded_request,
                                           urlfetch.POST, self.extra_headers,
                                           follow_redirects=False,
                                           deadline=10)
    except Exception as e:  # was the Python-2-only ``except Exception, e``
        logging.exception('Fetch failed to %s', self.remote_url)
        raise FetchFailed(e)
    if urlfetch_response.status_code != 200:
        logging.error('Fetch failed to %s; Status %s; body %s',
                      self.remote_url,
                      urlfetch_response.status_code,
                      urlfetch_response.content)
        raise FetchFailed(urlfetch_response.status_code)
    response_pb.ParseFromString(urlfetch_response.content)

    if response_pb.has_application_error():
        error_pb = response_pb.application_error()
        raise apiproxy_errors.ApplicationError(error_pb.code(),
                                               error_pb.detail())
    elif response_pb.has_exception():
        # SECURITY NOTE(review): this unpickles bytes received from the
        # remote endpoint; unpickling can execute arbitrary code, so this is
        # only safe when ``self.remote_url`` is fully trusted.
        raise pickle.loads(response_pb.exception())
    elif response_pb.has_java_exception():
        raise UnknownJavaServerError('An unknown error has occured in the '
                                     'Java remote_api handler for this call.')
    else:
        response.ParseFromString(response_pb.response())
def __init__(self, host, port=None, strict=None,
             timeout=_GLOBAL_DEFAULT_TIMEOUT, source_address=None,
             context=None):
    """Set up a connection object backed by App Engine's urlfetch.

    ``strict``, ``source_address`` and ``context`` are accepted but not
    used. ``timeout`` is kept only when it is a float, int or long.
    """
    # net.proto.ProcotolBuffer relies on httplib so importing urlfetch at the
    # module level causes a failure on prod. That means the import needs to be
    # lazy.
    from google.appengine.api import urlfetch

    self._fetch = urlfetch.fetch
    # HTTP verb name -> urlfetch method constant.
    self._method_map = dict(
        GET=urlfetch.GET,
        POST=urlfetch.POST,
        HEAD=urlfetch.HEAD,
        PUT=urlfetch.PUT,
        DELETE=urlfetch.DELETE,
        PATCH=urlfetch.PATCH,
    )

    self.host = host
    self.port = port

    # With urllib2 in Python 2.6, an object can be passed here.
    # The default is set to socket.GLOBAL_DEFAULT_TIMEOUT which is an object.
    # We only accept float, int or long values, otherwise it can be
    # silently ignored.
    numeric_timeout = isinstance(timeout, (float, int, long))
    self.timeout = timeout if numeric_timeout else None

    # Both 'strict' and 'source_address' are ignored.
    self._method = None
    self._url = None
    self._body = ''
    self.headers = []
def auth_callback_provider(self):
    """Validate a Steam OpenID callback and log the user in.

    The OpenID fields of the current request are sent back to Steam with
    ``openid.mode=check_authentication``. The login information is passed
    to ``provider_authenticated_callback`` only when Steam's reply contains
    ``is_valid:true``.
    """
    openid_keys = (
        'openid.ns',
        'openid.op_endpoint',
        'openid.claimed_id',
        'openid.identity',
        'openid.return_to',
        'openid.response_nonce',
        'openid.assoc_handle',
        'openid.signed',
        'openid.sig',
    )
    params = {}
    for key in openid_keys:
        params[key] = self.request.get(key)
    params['openid.mode'] = 'check_authentication'

    # param {'openid.claimed_id': u'http://steamcommunity.com/openid/id/7****************'}
    claimed = str(params['openid.claimed_id'])
    http_prefix = 'http://steamcommunity.com/openid/id/'
    https_prefix = 'https://steamcommunity.com/openid/id/'
    if claimed.startswith(https_prefix):
        # The fixed http-length slice used before would leave a stray
        # leading '/' on the uid for an https claimed id.
        claimedId = claimed[len(https_prefix):]
    else:
        # Unchanged legacy behaviour for every other value.
        claimedId = claimed[len(http_prefix):]

    loginInfo = {
        'provider_name': self.get_provider_name(),
        'provider_uid': claimedId,
        'email': '',
        'email_verified': '',
    }
    urlParams = enki.libutil.urlencode(params)
    fullURL = 'https://steamcommunity.com/openid/login'
    result = self.urlfetch_safe(url=fullURL, payload=urlParams,
                                method=urlfetch.POST)
    if 'ns:http://specs.openid.net/auth/2.0\nis_valid:true\n' in result.content:
        # only if is_valid do we trust the loginInfo
        self.provider_authenticated_callback(loginInfo)

#===== TWITTER =========================================================================================================
def auth_sign(self, normalised_url, ordered_params, token_secret='',
              method_get=False):
    """Create the OAuth 1.0 HMAC-SHA1 signature for a Twitter request.

    See https://dev.twitter.com/oauth/overview/creating-signatures

    The signature base string is ``<VERB>&<encoded url>&<encoded params>``
    where the verb is GET when ``method_get`` is true and POST otherwise.
    The signing key is the percent-encoded Twitter client secret, '&' and
    ``token_secret``.

    Returns:
        The base64-encoded signature.
    """
    encoded_params = enki.libutil.urlencode(ordered_params)
    http_verb = 'GET' if method_get else 'POST'
    base_string = (http_verb + '&' + percent_encode(normalised_url)
                   + '&' + percent_encode(encoded_params))
    signing_key = (percent_encode(settings.secrets.CLIENT_SECRET_TWITTER)
                   + '&' + token_secret)
    digest = hmac.new(signing_key, base_string, hashlib.sha1).digest()
    return base64.b64encode(digest)
def auth_request(self):
    """Start the Twitter sign-in flow (steps 1 and 2).

    Step 1 obtains a request token with a signed POST. Step 2 stores the
    token and its secret in the session and redirects the user to
    Twitter's authenticate page. Aborts with 401 when Twitter does not
    confirm the callback.
    """
    # STEP 1
    # note: these parameters need to be sorted alphabetically by key. They
    # are therefore a list of tuples and not a dictionary.
    callback_url = self.domain_name[:-1] + self.get_auth_callback()
    nonce = webapp2_extras.security.generate_random_string(
        length=42, pool=webapp2_extras.security.ALPHANUMERIC).encode('utf-8')
    params = [
        ('oauth_callback', callback_url),
        ('oauth_consumer_key', settings.secrets.CLIENT_ID_TWITTER),
        ('oauth_nonce', nonce),
        ('oauth_signature_method', "HMAC-SHA1"),
        ('oauth_timestamp', str(int(time.time()))),
        ('oauth_version', "1.0"),
    ]
    normalised_url = 'https://api.twitter.com/oauth/request_token/'
    params.append(('oauth_signature',
                   self.auth_sign(normalised_url, params)))
    result = self.urlfetch_safe(url=normalised_url,
                                payload=enki.libutil.urlencode(params),
                                method=urlfetch.POST)
    response = self.process_result_as_query_string(result)

    # STEP 2
    if response.get('oauth_callback_confirmed') != 'true':
        self.abort(401)
        return

    oauth_token = response.get('oauth_token')
    self.session['twitter_oauth_token'] = oauth_token
    self.session['twitter_oauth_token_secret'] = response.get(
        'oauth_token_secret')
    redirect_query = enki.libutil.urlencode([('oauth_token', oauth_token)])
    self.redirect('https://api.twitter.com/oauth/authenticate?'
                  + redirect_query)
    return
def get_download_URL(self, enkiDL_URL, secret, item_to_download, ip_addr):
    """Request a download token and build the download URL.

    On success ``self.download_url`` is set. On failure ``self.error`` is
    set to 1 (the server answered with a non-200 status) or 2 (urlfetch
    raised ``DownloadError``).
    """
    form_data = enki.libutil.urlencode({
        'item': item_to_download,
        'secret': secret,
        'ip_addr': ip_addr,
    })
    try:
        result = urlfetch.fetch(url=enkiDL_URL, payload=form_data,
                                method=urlfetch.POST)
        if result.status_code != 200:
            self.error = 1
            return
        token = result.content
        self.download_url = (enkiDL_URL + 'download?token=' + str(token)
                             + '&item=' + str(item_to_download))
    except urlfetch.DownloadError:
        self.error = 2
        return
def send_attachment_message(sender, attachment_type, payload):
    """Send an attachment message to a Facebook Messenger user.

    ``sender['id']`` is the recipient; ``attachment_type`` and ``payload``
    describe the attachment. The JSON message is always logged, but it is
    only POSTed to the Graph API when ``config.PRODUCTION`` is set.
    """
    message = {
        'recipient': {'id': sender['id']},
        'message': {
            'attachment': {
                'type': attachment_type,
                'payload': payload,
            },
        },
    }
    json_headers = {'Content-Type': 'application/json'}
    body = json.dumps(message)
    logging.debug(body)
    url = ('https://graph.facebook.com/v2.6/me/messages?access_token='
           + config.FACEBOOK_PAGE_ACCESS_TOKEN)
    if not config.PRODUCTION:
        return
    req = urlfetch.fetch(url, body, urlfetch.POST, json_headers)
    logging.debug(req.content)
def send_fb_message(payload):
    """POST an already-serialised JSON message to the Messenger Graph API.

    Nothing is sent unless ``config.PRODUCTION`` is set. urlfetch errors
    are logged rather than raised.
    """
    if not config.PRODUCTION:
        return
    try:
        endpoint = ('https://graph.facebook.com/v2.6/me/messages?access_token='
                    + config.FACEBOOK_PAGE_ACCESS_TOKEN)
        json_headers = {'Content-Type': 'application/json'}
        req = urlfetch.fetch(endpoint, payload, urlfetch.POST, json_headers)
        logging.debug(req.content)
    except urlfetch.Error as e:
        logging.error(e.message)
def request(self, operation, url, data=None, headers=None):
    """Performs an HTTP call to the server, supports GET, POST, PUT, and
    DELETE.

    Usage example, perform and HTTP GET on http://www.google.com/:
      import atom.http
      client = atom.http.HttpClient()
      http_response = client.request('GET', 'http://www.google.com/')

    Args:
      operation: str The HTTP operation to be performed. This is usually one
          of 'GET', 'POST', 'PUT', or 'DELETE'. Any other value results in
          ``method=None`` being passed to urlfetch.
      data: filestream, list of parts, or other object which can be
          converted to a string. Should be set to None when performing a
          GET or DELETE. A list is converted part by part and joined; any
          other value is converted as a whole.
      url: The full URL to which the request should be sent. Can be a string
          or atom.url.Url.
      headers: dict of strings. HTTP headers which should be sent
          in the request. They are merged over ``self.headers``.

    Returns:
      An HttpResponse wrapping the urlfetch result.
    """
    all_headers = self.headers.copy()
    if headers:
        all_headers.update(headers)

    # Construct the full payload.
    # Assume that data is None or a string.
    # NOTE(review): if this method lives in a class body and
    # __ConvertDataPart is a module-level function, Python's private-name
    # mangling would rewrite the calls below to _<Class>__ConvertDataPart
    # and fail with NameError - confirm against the rest of the module.
    data_str = data
    if data:
        if isinstance(data, list):
            # If data is a list of different objects, convert them all to
            # strings and join them together.
            converted_parts = [__ConvertDataPart(x) for x in data]
            data_str = ''.join(converted_parts)
        else:
            data_str = __ConvertDataPart(data)

    # If the list of headers does not include a Content-Length, attempt to
    # calculate it based on the data object.
    if data and 'Content-Length' not in all_headers:
        # Header values are documented as strings; this used to be an int.
        all_headers['Content-Length'] = str(len(data_str))

    # Set the content type to the default value if none was set.
    if 'Content-Type' not in all_headers:
        all_headers['Content-Type'] = 'application/atom+xml'

    # Lookup the urlfetch operation which corresponds to the desired HTTP
    # verb; unknown verbs map to None, as before.
    method = {
        'GET': urlfetch.GET,
        'POST': urlfetch.POST,
        'PUT': urlfetch.PUT,
        'DELETE': urlfetch.DELETE,
    }.get(operation)
    return HttpResponse(urlfetch.Fetch(url=str(url), payload=data_str,
                                       method=method, headers=all_headers))
def impersonate(self, user_id=DEFAULT):
    """
    Switch the logged-in session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`

    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Returns a ``user_id`` input form when called without an explicit
    ``user_id`` and without POST variables, a read-only form of the
    impersonated user after a successful switch, and None otherwise.

    Raises HTTP 401 when nobody is logged in or the requested user does not
    exist, and HTTP 403 when the 'impersonate' permission is missing.
    """
    request = current.request
    session = current.session
    auth = session.auth
    table_user = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    current_id = auth.user.id
    # Remember what the caller passed before DEFAULT is replaced below.
    requested_id = user_id
    # Stays None unless the impersonation branch below loads a user.
    user = None
    if user_id is DEFAULT:
        user_id = current.request.post_vars.user_id
    if user_id and user_id != self.user.id and user_id != '0':
        if not self.has_permission('impersonate',
                                   self.table_user(),
                                   user_id):
            raise HTTP(403, "Forbidden")
        user = table_user(user_id)
        if not user:
            raise HTTP(401, "Not Authorized")
        # Keep a pickled copy of the current session so that it can be
        # restored when impersonation ends.
        auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
        auth.user.update(
            table_user._filter_fields(user, True))
        self.user = auth.user
        self.update_groups()
        log = self.messages['impersonate_log']
        self.log_event(log, dict(id=current_id, other_id=auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        # Restore the session saved when impersonation started.
        if self.is_impersonating():
            session.clear()
            session.update(pickle.loads(auth.impersonator))
            self.user = session.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None
    if requested_id is DEFAULT and not request.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    elif not user:
        return None
    else:
        return SQLFORM(table_user, user.id, readonly=True)
def impersonate(self, user_id=DEFAULT):
    """
    Switch the logged-in session to another user, or switch back.

    To use this make a POST to
    `http://..../impersonate request.post_vars.user_id=<id>`

    Set request.post_vars.user_id to 0 to restore original user.

    requires impersonator is logged in and::

        has_permission('impersonate', 'auth_user', user_id)

    Returns a ``user_id`` input form when called without an explicit
    ``user_id`` and without POST variables, a read-only form of the
    impersonated user after a successful switch, and None otherwise.

    Raises HTTP 401 when nobody is logged in or the requested user does not
    exist, and HTTP 403 when the 'impersonate' permission is missing.
    """
    request = current.request
    session = current.session
    auth = session.auth
    table_user = self.table_user()
    if not self.is_logged_in():
        raise HTTP(401, "Not Authorized")
    current_id = auth.user.id
    # Remember what the caller passed before DEFAULT is replaced below.
    requested_id = user_id
    # Must exist even when no impersonation happens: the final return used
    # to hit an unbound ``user`` (UnboundLocalError) in that case.
    user = None
    if user_id is DEFAULT:
        user_id = current.request.post_vars.user_id
    if user_id and user_id != self.user.id and user_id != '0':
        if not self.has_permission('impersonate',
                                   self.table_user(),
                                   user_id):
            raise HTTP(403, "Forbidden")
        user = table_user(user_id)
        if not user:
            raise HTTP(401, "Not Authorized")
        # Keep a pickled copy of the current session so that it can be
        # restored when impersonation ends.
        auth.impersonator = pickle.dumps(session, pickle.HIGHEST_PROTOCOL)
        auth.user.update(
            table_user._filter_fields(user, True))
        self.user = auth.user
        self.update_groups()
        log = self.messages['impersonate_log']
        self.log_event(log, dict(id=current_id, other_id=auth.user.id))
        self.run_login_onaccept()
    elif user_id in (0, '0'):
        # Restore the session saved when impersonation started.
        if self.is_impersonating():
            session.clear()
            session.update(pickle.loads(auth.impersonator))
            self.user = session.auth.user
            self.update_groups()
            self.run_login_onaccept()
        return None
    if requested_id is DEFAULT and not request.post_vars:
        return SQLFORM.factory(Field('user_id', 'integer'))
    if not user:
        # Nothing was impersonated (e.g. own id or empty id requested).
        return None
    return SQLFORM(table_user, user.id, readonly=True)
def auth_callback_provider(self):
    """Finish the Twitter sign-in flow (step 3).

    Exchanges the request token and verifier for an access token, then
    calls ``account/verify_credentials`` to read the user's email, and
    finally hands the login information to
    ``provider_authenticated_callback``. Aborts with 401 when Twitter
    returns no user id or no token.
    """
    # STEP 3
    oauth_verifier = self.request.get('oauth_verifier')
    nonce = webapp2_extras.security.generate_random_string(
        length=42, pool=webapp2_extras.security.ALPHANUMERIC).encode('utf-8')
    params = [
        ('oauth_consumer_key', settings.secrets.CLIENT_ID_TWITTER),
        ('oauth_nonce', nonce),
        ('oauth_signature_method', "HMAC-SHA1"),
        ('oauth_timestamp', str(int(time.time()))),
        ('oauth_token', self.session.get('twitter_oauth_token')),
        ('oauth_version', "1.0"),
    ]
    normalised_url = 'https://api.twitter.com/oauth/access_token/'
    oauth_signature = self.auth_sign(
        normalised_url, params,
        self.session.get('twitter_oauth_token_secret'))
    # The signature and verifier are appended after signing.
    params.extend([('oauth_signature', oauth_signature),
                   ('oauth_verifier', oauth_verifier)])
    result = self.urlfetch_safe(url=normalised_url,
                                payload=enki.libutil.urlencode(params),
                                method=urlfetch.POST)
    response = self.process_result_as_query_string(result)

    oauth_token = response.get('oauth_token')
    oauth_token_secret = response.get('oauth_token_secret')
    user_id = response.get('user_id')
    if not (user_id and oauth_token):
        self.abort(401)
        return

    # get email address if we can
    verify_url = 'https://api.twitter.com/1.1/account/verify_credentials.json'
    verify_nonce = webapp2_extras.security.generate_random_string(
        length=42, pool=webapp2_extras.security.ALPHANUMERIC).encode('utf-8')
    verify_params = [
        ('include_email', 'true'),
        ('include_entities', 'false'),
        ('oauth_consumer_key', settings.secrets.CLIENT_ID_TWITTER),
        ('oauth_nonce', verify_nonce),
        ('oauth_signature_method', "HMAC-SHA1"),
        ('oauth_timestamp', str(int(time.time()))),
        ('oauth_token', oauth_token),
        ('oauth_version', "1.0"),
        ('skip_status', 'true'),
    ]
    verify_params.append((
        'oauth_signature',
        self.auth_sign(verify_url, verify_params, oauth_token_secret,
                       method_get=True)))
    full_url = verify_url + '?' + enki.libutil.urlencode(verify_params)
    verify_credentials_result_json = self.urlfetch_safe(url=full_url,
                                                        method=urlfetch.GET)
    verify_credentials_result = self.process_result_as_JSON(
        verify_credentials_result_json)
    # NOTE(review): this lookup raises if the reply has no 'email' entry -
    # confirm that is acceptable for accounts without a shared email.
    response['email'] = verify_credentials_result['email']
    response['email_verified'] = True

    loginInfoSettings = {
        'provider_uid': 'user_id',
        'email': 'email',
        'email_verified': 'email_verified',
    }
    loginInfo = self.process_login_info(loginInfoSettings, response)
    self.provider_authenticated_callback(loginInfo)
    return
def set_welcome_message(fb_page_id):
    """Configure the Messenger welcome message for a Facebook page.

    POSTs a ``call_to_actions`` thread setting for new threads to the
    page's ``thread_settings`` endpoint. Nothing is sent unless
    ``config.PRODUCTION`` is set.
    """
    url = ('https://graph.facebook.com/v2.6/' + fb_page_id
           + '/thread_settings?access_token='
           + config.FACEBOOK_PAGE_ACCESS_TOKEN)
    if not config.PRODUCTION:
        return

    # Built inside-out: buttons -> card element -> template -> setting.
    buttons = [
        {
            'type': 'web_url',
            'title': 'View website',
            'url': 'https://gilacoolbot.appspot.com'
        },
        {
            'type': 'postback',
            'title': 'Start chatting',
            'payload': 'DEVELOPER_DEFINED_PAYLOAD',
        },
    ]
    welcome_card = {
        'title': 'Welcome to %s' % config.FACEBOOK_BOT_NAME,
        'item_url': 'https://gilacoolbot.appspot.com',
        'image_url': 'http://messengerdemo.parseapp.com/img/rift.png',
        'subtitle': 'This is a subtitle',
        'buttons': buttons,
    }
    attachment = {
        'type': 'template',
        'payload': {
            'template_type': 'generic',
            'elements': [welcome_card],
        },
    }
    content = {
        'setting_type': 'call_to_actions',
        'thread_state': 'new_thread',
        'call_to_actions': [
            {
                'message': {
                    'text': 'Hello there!',
                    'attachment': attachment,
                }
            }
        ],
    }

    json_headers = {'Content-Type': 'application/json'}
    req = urlfetch.fetch(url, json.dumps(content), urlfetch.POST,
                         json_headers)
    logging.debug(req.content)