我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用facebook.GraphAPI()。
def get_current_user(self): cookies = dict((n, self.cookies[n].value) for n in self.cookies.keys()) cookie = facebook.get_user_from_cookie( cookies, options.facebook_app_id, options.facebook_app_secret) if not cookie: return None user = self.db.get( "SELECT * FROM users WHERE id = %s", cookie["uid"]) if not user: # TODO: Make this fetch async rather than blocking graph = facebook.GraphAPI(cookie["access_token"]) profile = graph.get_object("me") self.db.execute( "REPLACE INTO users (id, name, profile_url, access_token) " "VALUES (%s,%s,%s,%s)", profile["id"], profile["name"], profile["link"], cookie["access_token"]) user = self.db.get( "SELECT * FROM users WHERE id = %s", profile["id"]) elif user.access_token != cookie["access_token"]: self.db.execute( "UPDATE users SET access_token = %s WHERE id = %s", cookie["access_token"], user.id) return user
def do_init(self): app_id = self.get_configuration('facebook_app_id') app_secret = self.get_configuration('facebook_app_secret') app_access_token = self.get_configuration('facebook_access_token') if app_id and app_secret and app_access_token: params = { 'client_id': app_id, 'client_secret': app_secret, 'grant_type': 'fb_exchange_token', 'fb_exchange_token': app_access_token } r = self.requests.get("https://graph.facebook.com/oauth/access_token", params=params) if r.ok: oauth_access_token = r.json()['access_token'] self.oauth_access_token = self.write_configuration('facebook_oauth_token', oauth_access_token) self.graph = facebook.GraphAPI(oauth_access_token) return True return False
def post_to_facebook(message, link=None): """Posts a message to the Facebook page using GraphAPI authenticated via `FACEBOOK_PAGE_ACCESS_TOKEN`. Args: - message: str. The content of the message to be posted on Facebook. - link: str. (Optional) Url of the attachment to be posted along with message. Returns: - None """ graph = GraphAPI(access_token=FACEBOOK_PAGE_ACCESS_TOKEN) attachment = { 'link': link, # link to visit on clicking on the attachment. 'picture': link # link of the attachment to be posted. } graph.put_wall_post(message=message, attachment=attachment)
def scrape(self, user_data): try: oauth = user_data.services['facebook']['access_token'] except KeyError: return False graph = GraphAPI(access_token=oauth) events = yield facebook_paginate( graph.get_connections('me', connection_name='events'), max_results=self.num_events_per_user) data = {"events": []} for item in events: event = {} event['description'] = item.get('description', None) event['name'] = item.get('name', None) event['status'] = item.get('rsvp_status', None) data['events'].append(event) return data
def __init__(self, tokenpath): self._token = open(tokenpath).read() self._graph = facebook.GraphAPI(access_token=self._token, version='2.2') self.group = self._graph.get_objects(ids=["728040737345863"]) print(self.group["728040737345863"])
def connect(self): self.__client = facebook.GraphAPI(self.__page_access_token) # Sends a start up message on Telegram
def test_get_app_access_token(self): token = facebook.GraphAPI().get_app_access_token( self.app_id, self.secret) # Since "unicode" does not exist in Python 3, we cannot check # the following line with flake8 (hence the noqa comment). assert(isinstance(token, str) or isinstance(token, unicode)) # noqa
def test_get_deleted_app_access_token(self): deleted_app_id = '174236045938435' deleted_secret = '0073dce2d95c4a5c2922d1827ea0cca6' deleted_error_message = ( "Error validating application. Application has been deleted.") self.assert_raises_multi_regex( facebook.GraphAPIError, deleted_error_message, facebook.GraphAPI().get_app_access_token, deleted_app_id, deleted_secret)
def test_no_version(self): graph = facebook.GraphAPI() self.assertNotEqual(graph.version, None, "Version should not be None.") self.assertNotEqual( graph.version, "", "Version should not be an empty string.")
def test_invalid_version(self): self.assertRaises(facebook.GraphAPIError, facebook.GraphAPI, version=1.2)
def test_invalid_format(self): self.assertRaises(facebook.GraphAPIError, facebook.GraphAPI, version="2.a") self.assertRaises(facebook.GraphAPIError, facebook.GraphAPI, version="a.1") self.assertRaises(facebook.GraphAPIError, facebook.GraphAPI, version=2.23) self.assertRaises(facebook.GraphAPIError, facebook.GraphAPI, version="2.23")
def test_fql(self): graph = facebook.GraphAPI(version=2.0) graph.access_token = graph.get_app_access_token( self.app_id, self.secret) # Ensure that version is below 2.1. Facebook has stated that FQL is # not present in this or future versions of the Graph API. if graph.get_version() < 2.1: # This is a tautology, but we are limited in what information # we can retrieve with a proper OAuth access token. fql_result = graph.fql( "SELECT app_id from application where app_id = %s" % self.app_id) self.assertEqual(fql_result["data"][0]["app_id"], str(self.app_id))
def test_extend_access_token(self): """ Test if extend_access_token requests the correct endpoint. Note that this only tests whether extend_access_token returns the correct error message when called without a proper user-access token. """ try: facebook.GraphAPI().extend_access_token(self.app_id, self.secret) except facebook.GraphAPIError as e: self.assertEqual( e.message, "fb_exchange_token parameter not specified")
def test_bogus_access_token(self): invalid_token_error_message = "Invalid OAuth access token." graph = facebook.GraphAPI(access_token='wrong_token') self.assert_raises_multi_regex( facebook.GraphAPIError, invalid_token_error_message, graph.get_object, "me")
def get_user(self): """ Override this method by sublcassing the class. """ if not current.session.token: return None return dict(first_name='Pinco', last_name='Pallino', username='pincopallino') raise NotImplementedError("Must override get_user()") # Following code is never executed. It can be used as example # for overriding in subclasses. if not self.accessToken(): return None if not self.graph: self.graph = GraphAPI((self.accessToken())) user = None try: user = self.graph.get_object("me") except GraphAPIError: current.session.token = None self.graph = None if user: return dict(first_name=user['first_name'], last_name=user['last_name'], username=user['id'])
def get_api(cfg): graph = facebook.GraphAPI(cfg['access_token']) return graph
def __init__(self, graph=facebook.GraphAPI(FACEBOOK_ACCESS_TOKEN)): print("Initializing Uploader") self.events = None #self.token = self.login() self.graph = graph print("Connected to Facebook")
def get_api(cfg): graph = facebook.GraphAPI(cfg['access_token']) # Get page token to post as the page. You can skip # the following if you want to post as yourself. resp = graph.get_object('me/accounts') page_access_token = None for page in resp['data']: if page['id'] == cfg['page_id']: page_access_token = page['access_token'] graph = facebook.GraphAPI(page_access_token) return graph # You can also skip the above if you get a page token: # http://stackoverflow.com/questions/8231877/facebook-access-token-for-pages # and make that long-lived token as in Step 3
def set_graph(self, oauth_access_token=None): if oauth_access_token: self.oauth_access_token = oauth_access_token self.graph = facebook.GraphAPI(self.oauth_access_token)
def save_profile(backend, user, response, details, *args, **kwargs): if backend.name == 'facebook': # Fetches a list of friends who also use the app graph = facebook.GraphAPI(access_token=response['access_token'], version='2.9') friends = [friend for friend in graph.get_all_connections(id='me', connection_name='friends')] # Update LucemUser with new list of friends using the app try: lucem_user = user.lucemuser except ObjectDoesNotExist: # Create LucemUser instance if it does not yet exist lucem_user = LucemUser(user=user) lucem_user.friends = friends lucem_user.fb_id = response['id'] lucem_user.save()
def post(message, url=None): """ ?????????? ? Facebook """ config = SocialConfig.get_solo() if not message: return import facebook facebook_api = facebook.GraphAPI(config.facebook_access_token) attachment = {} if url: attachment['link'] = url facebook_api.put_wall_post(message=message, attachment=attachment)
def __init__(self, debug): self.debug = debug self.settings = Settings() try: self.graph = facebook.GraphAPI(access_token=self._get_acces_token(), version='2.9') self.logger = logging.basicConfig(level=logging.DEBUG) if self.debug else logging.basicConfig( level=logging.INFO) except KeyError: exit("Check if configuration is set right") try: self.scoop = Reserve() except: pass
def __init__(self, token): """ :param token: Facebook Page token :param _api: Instance of the GraphAPI object """ self.token = token self._api = facebook.GraphAPI(self.token)
def handle(text, mic, profile): """ Responds to user-input, typically speech text, with a summary of the user's Facebook notifications, including a count and details related to each individual notification. Arguments: text -- user-input, typically transcribed speech mic -- used to interact with the user (for both input and output) profile -- contains information related to the user (e.g., phone number) """ oauth_access_token = profile['keys']['FB_TOKEN'] graph = facebook.GraphAPI(oauth_access_token) try: results = graph.request("me/notifications") except facebook.GraphAPIError: mic.say("I have not been authorized to query your Facebook. If you " + "would like to check your notifications in the future, " + "please visit the Jasper dashboard.") return except: mic.say( "I apologize, there's a problem with that service at the moment.") if not len(results['data']): mic.say("You have no Facebook notifications. ") return updates = [] for notification in results['data']: updates.append(notification['title']) count = len(results['data']) mic.say("You have " + str(count) + " Facebook notifications. " + " ".join(updates) + ". ") return
def query_profile_with_graph_api(profile_id, access_token): graph = facebook.GraphAPI(access_token) profile = graph.get_object(profile_id) return profile
def authenticate_api(): auth_token = facepy.utils.get_application_access_token(FACEBOOK['APP_ID'], FACEBOOK['APP_SECRET'], api_version='2.9') graph = facebook.GraphAPI(auth_token) return graph
def __init__(self, access_token): self.graph = facebook.GraphAPI(access_token = access_token, version = '2.3') self.pread = post_reader.PostReader()
def __init__(self, config): """ Class constructor :param config: """ self.__config = config self.__graph = facebook.GraphAPI(access_token=self.__config['KEYS']['facebook_api'], version=self.__config['KEYS']['facebook_version']) self.__profile = self.__graph.get_object(id=self.__config['FACEBOOK']['profile']) client_mongo = MongoClient(self.__config['MONGO']['host'],int(self.__config['MONGO']['port'])) self.__database = client_mongo[self.__config['MONGO']['database']]
def facebook_status(status, image_path=None): tags = [_generate_tag(ass['facebook_id']) for ass in settings.BLOOD_ASSOCIATIONS if 'facebook_id' in ass] try: graph = facebook.GraphAPI(settings.FACEBOOK_TOKEN) if image_path: graph.put_photo(image=open(image_path, 'rb'), message=status, tags=json.dumps(tags)) else: graph.put_wall_post(status) except Exception as ex: raise MeteoSangueException(ex)
def get_page_details(access_token, page): graph = GraphAPI(access_token, version='2.7') return graph.get_object(page, fields='about,events,feed,picture')
def __init__(self, access_token, db_path, id_list): """Connects to Facebook Graph API and creates an SQLite database with four tables for Posts, Comments, Post_likes and People if not exists. Takes three arguments: access_token: your own Facebook access token that you can get on https://developers.facebook.com/tools/explorer/ db_path: the path of the SQLite database where you want to store the data id_list: ID's of the Facebook pages you want to scrape """ self.access_token = access_token self.db_path = db_path self.id_list = id_list g = facebook.GraphAPI(self.access_token, version='2.3') self.g = g # connect to database con = lite.connect(self.db_path) self.con = con with con: # create cursor to the database cur = con.cursor() self.cur = cur # create tables for posts, comments, post likes and people if not exists cur.execute( "CREATE TABLE IF NOT EXISTS Posts(post_id TEXT PRIMARY KEY, status_id TEXT, content TEXT, " "person_hash_id TEXT, published_date TEXT, last_comment_date TEXT, post_type TEXT, status_type TEXT, " "post_link TEXT, link TEXT, video_link TEXT, picture_link TEXT, link_name TEXT, link_caption TEXT, " "link_description TEXT, comment_count INTEGER, share_count INTEGER, like_count INTEGER, " "love_count INTEGER, wow_count INTEGER, haha_count INTEGER, sad_count INTEGER, angry_count INTEGER, " "mentions_count INTEGER, mentions TEXT, location TEXT, date_inserted TEXT)") cur.execute( "CREATE TABLE IF NOT EXISTS Comments(comment_id TEXT PRIMARY KEY, person_hash_id TEXT, post_id TEXT, " "comment_content TEXT, comment_date TEXT, like_count INTEGER)") cur.execute( "CREATE TABLE IF NOT EXISTS Post_likes(like_id TEXT PRIMARY KEY, person_hash_id TEXT, post_id TEXT)") cur.execute( "CREATE TABLE IF NOT EXISTS People(person_hash_id TEXT PRIMARY KEY, person_id TEXT, person_name TEXT)")
def update_data(self): graph = facebook.GraphAPI(access_token=settings.FACEBOOK_ACCESS_TOKEN) graph = graph.get_object(id=self.page_name, fields='fan_count') self.likes = int(graph.get('fan_count')) self.save()
def init(self, token): self.rundata.set("fb_token", token) self.graph = facebook.GraphAPI(access_token=token, version="2.7") self.rundata.set("stop", 5) self.messagePost()
def post(self,request): token = request.data['token'] graph = facebook.GraphAPI(token) args = {'fields' : 'id,name,email', } info = graph.get_object('me',**args) Image = graph.get_connections('me','picture?width=600&height=600') user_check = User.objects.filter(email=info['email']) if user_check.exists() == True: payload = jwt_payload_handler(user_check[0]) data = { 'token': jwt_encode_handler(payload),'IsSuccess':True} serializer = UserProfileSerializer(data) return Response(serializer.data) else: user = User.objects.create(email=info['email'],password='123456789',first_name=info['name']) user.set_password('oehewohis72631ksda02137') user.save() userprofile = UserProfile.objects.create(user=user,IsUser=True,IsFacebook=True) userprofile.IsFacebook = True image = Image['data'] image_2 = image.split(b'keep-alive\r\n\r\n', 1)[-1] #open('/Users/emreyavuz/Desktop/Python/Django/emre.jpg','wb').write(image_2) file_name = user.email.split('@')[0] + '.jpg' userprofile.Photo = 'Images/' + file_name userprofile.save() open(os.path.join(settings.MEDIA_ROOT+'/Images', file_name ), 'wb').write(image_2) payload = jwt_payload_handler(user_check[0]) data = { 'token': jwt_encode_handler(payload),'IsSuccess':True} serializer = UserProfileSerializer(data) return Response(serializer.data)
def scrape(self, user_data): try: oauth = user_data.services['facebook']['access_token'] except KeyError: return False graph = GraphAPI(access_token=oauth) profile = graph.get_object( 'me', fields='bio, birthday,education,interested_in,hometown,' 'political,relationship_status, religion, work' ) data = profile return data
def scrape(self, user_data): try: oauth = user_data.services['facebook']['access_token'] except KeyError: return False graph = GraphAPI(access_token=oauth) likes = yield facebook_paginate( graph.get_connections('me', connection_name='likes'), max_results=self.num_likes_per_user) data = {'likes': []} for like in likes: data['likes'].append(like['name']) return data
def scrape(self, user_data): try: oauth = user_data.services['facebook']['access_token'] except KeyError: return False graph = GraphAPI(access_token=oauth) data = { "text": [], "links": [] } posts = [] posts_blob = yield facebook_paginate( graph.get_connections( 'me', 'posts', fields='message, link, created_time' ), max_results=self.num_posts_per_user ) posts, links = self.message_filter(posts_blob) # To do comments we'd have to go through the different posts and look # Scraping the person's feed is another option, but we get more garbage data['text'] = posts data['links'] = links return data
def __init__(self, user_token=None): config = get_config() self._app_id = config.get('facebook.consumer_key') self._app_secret = config.get('facebook.consumer_secret') self._app_access_token = config.get('facebook.app_access_token') token = self._app_access_token if not user_token else user_token version = config.get('facebook.api_version', None) or API_VERSION_USED self._api = facebook.GraphAPI(token, DEFAULT_TIMEOUT, version)
def get_current_user(): """Set g.user to the currently logged in user. Called before each request, get_current_user sets the global g.user variable to the currently logged in user. A currently logged in user is determined by seeing if it exists in Flask's session dictionary. If it is the first time the user is logging into this application it will create the user and insert it into the database. If the user is not logged in, None will be set to g.user. """ # Set the user in the session dictionary as a global g.user and bail out # of this function early. if session.get('user'): g.user = session.get('user') return # Attempt to get the short term access token for the current user. result = get_user_from_cookie(cookies=request.cookies, app_id=FB_APP_ID, app_secret=FB_APP_SECRET) # If there is no result, we assume the user is not logged in. if result: # Check to see if this user is already in our database. user = User.query.filter(User.id == result['uid']).first() if not user: # Not an existing user so get info graph = GraphAPI(result['access_token']) profile = graph.get_object('me') if 'link' not in profile: profile['link'] = "" # Create the user and insert it into the database user = User(id=str(profile['id']), name=profile['name'], profile_url=profile['link'], access_token=result['access_token']) db.session.add(user) elif user.access_token != result['access_token']: # If an existing user, update the access token user.access_token = result['access_token'] # Add the user to the current session session['user'] = dict(name=user.name, profile_url=user.profile_url, id=user.id, access_token=user.access_token) # Commit changes to the database and set the user as a global g.user db.session.commit() g.user = session.get('user', None)
def send_post_fb(self): api = facebook.GraphAPI(tokens.fb) if len(self.image_links) > 1: response = requests.get(self.image_links[0]) pic = Image.open(io.BytesIO(response.content)) pic_byte = io.BytesIO() pic.save(pic_byte, format="png") pic_byte.seek(0) status = api.put_photo(image=pic_byte, message=self.final_text_fb) for url in self.image_links: response = requests.get(url) pic = Image.open(io.BytesIO(response.content)) pic_byte = io.BytesIO() pic.save(pic_byte, format="png") pic_byte.seek(0) status = api.put_photo(image=pic_byte, album_path=config.mm_fb_album + '/photos') return elif len(self.image_links) == 1: response = requests.get(self.image_links[0]) pic = Image.open(io.BytesIO(response.content)) pic_byte = io.BytesIO() pic.save(pic_byte, format="png") pic_byte.seek(0) status = api.put_photo(image=pic_byte, message=self.final_text_fb) return if len(self.links_fb) > 0: status = api.put_object( parent_object="me", connection_name="feed", message=self.final_text_fb, link=self.links_fb[0]) elif len(self.gif_links) > 0 or len(self.audio_links) > 0 or len(self.video_links) > 0: my_media = first((self.gif_links, self.audio_links, self.video_links), key=lambda x: len(x) > 0) status = api.put_object( parent_object="me", connection_name="feed", message=self.final_text_fb, link=my_media) else: status = api.put_object( parent_object="me", connection_name="feed", message=self.final_text_fb, link="https://vk.com/wall{}_{}".format(self.owner_id, self.post['id'])) # ??????? ???????? ''' my_link = "https://vk.com/wall{}_{}".format(self.owner_id, self.post['id']) status = api.put_object( parent_object="me", connection_name="feed", message="", link=my_link) '''