The following 50 code examples, extracted from open-source Python projects, illustrate how to use feedparser.parse().
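As a quick orientation before the project examples, here is a minimal sketch of the basic call. The feed URL is a placeholder, and the snippet assumes only that the feedparser package is installed:

import feedparser

# parse() accepts a URL, a local file path, or a raw XML string.
feed = feedparser.parse("https://example.com/feed.xml")  # placeholder URL

# feed.feed holds channel-level metadata; feed.entries holds the items.
print(feed.feed.get("title", "no title"))
for entry in feed.entries[:5]:
    print(entry.get("title"), entry.get("link"))

# bozo is set when the feed was malformed but still partially parsed.
if feed.bozo:
    print("feed had parsing problems:", feed.bozo_exception)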
def crawl_feed(self, feed_url: str=None) -> List[str]:
    urls = []
    if not feed_url:
        feed_url = constants.rss_url
    feed = feedparser.parse(
        feed_url,
        handlers=ProxyHandler,
        request_headers=self.settings.requests_headers
    )
    for item in feed['items']:
        if any([item['title'].startswith(category) for category in self.own_settings.accepted_rss_categories]):
            urls.append(item['link'])
    return urls
def getwordcounts(url):
    d = feedparser.parse(url)
    wc = {}
    # Loop over all the entries in d.entries
    for e in d.entries:
        if 'summary' in e:
            summary = e.summary
        else:
            summary = e.description
        # Extract a list of words
        words = getwords(e.title + ' ' + summary)
        for word in words:
            wc.setdefault(word, 0)
            wc[word] += 1
    print d.feed.title
    return d.feed.title, wc
def news(): """Get news from different ATOM RSS feeds.""" import feedparser from pybossa.core import sentinel from pybossa.news import get_news, notify_news_admins, FEED_KEY try: import cPickle as pickle except ImportError: # pragma: no cover import pickle urls = ['https://github.com/pybossa/pybossa/releases.atom', 'http://scifabric.com/blog/all.atom.xml'] score = 0 notify = False if current_app.config.get('NEWS_URL'): urls += current_app.config.get('NEWS_URL') for url in urls: d = feedparser.parse(url) tmp = get_news(score) if (len(tmp) == 0) or (tmp[0]['updated'] != d.entries[0]['updated']): sentinel.master.zadd(FEED_KEY, float(score), pickle.dumps(d.entries[0])) notify = True score += 1 if notify: notify_news_admins()
def newscaster(p, l):
    """
    Dictate the latest news (which are essentially entries in the RSS feed)
    """
    respond("fetching news", prepend_positive_response=True)
    feeds = [feedparser.parse(url) for url in preferences.get_news_feed_urls()]
    counter = 1
    for feed in feeds:
        for entry in feed.entries:
            data = []
            parser = NewsFeedParser(data)
            try:
                description = entry.description
            except AttributeError:
                description = "None given"
            parser.feed(description)
            news = "News #" + str(counter) + ": title: " + entry.title + ". description: " + " ".join(data)
            respond(news, override_subtitle=True)
            counter += 1
def parse_job_list_page(self, response):
    self.get_connector().log(self.name, self.ACTION_CRAWL_LIST, response.url)
    feed_parser = feedparser.parse(response.body)
    for job_entry in feed_parser.entries:
        job_url = job_entry.link
        job_publication_date = datetime.fromtimestamp(mktime(job_entry.published_parsed))

        job_publication_time = mktime(job_publication_date.timetuple())
        last_job_publication_time = mktime(self._last_job_date.timetuple())
        if job_publication_time <= last_job_publication_time:
            self.get_connector().log(
                self.name,
                self.ACTION_MARKER_FOUND,
                "%s <= %s" % (job_publication_time, last_job_publication_time)
            )
            return

        prepared_job = JobItem()
        request = Request(job_url, self.parse_job_page)
        request.meta['item'] = prepared_job
        prepared_job['title'] = job_entry.title
        prepared_job['description'] = job_entry.description
        prepared_job['publication_datetime'] = job_publication_date
        yield request
def fetch(self):
    fetch_time = int(time.time())
    feed = feedparser.parse(self.url, etag=self.status.last_result)
    last_updated = self.status.updated
    self.status = ChoreStatus(fetch_time, feed.get('etag'))
    for e in feed.entries:
        evt_time = int(calendar.timegm(e.updated_parsed))
        if last_updated and evt_time > last_updated:
            evturl = e.link
            match = RE_BADURL.match(evturl)
            if match:
                evturl = urllib.parse.urljoin(self.url, match.group(1))
            else:
                evturl = urllib.parse.urljoin(self.url, evturl)
            if not self.title_regex or self.title_regex.search(e.title):
                yield Event(self.name, self.category, evt_time,
                            e.title, e.summary, evturl)
def fetch(self):
    if self.category == 'release':
        url = 'https://github.com/%s/releases.atom' % self.repo
    elif self.category == 'tag':
        url = 'https://github.com/%s/tags.atom' % self.repo
    elif self.category == 'commit':
        url = 'https://github.com/%s/commits/%s.atom' % \
              (self.repo, self.branch or 'master')
    else:
        raise ValueError('unknown category: %s' % self.category)
    fetch_time = int(time.time())
    feed = feedparser.parse(url, etag=self.status.last_result)
    last_updated = self.status.updated
    self.status = ChoreStatus(fetch_time, feed.get('etag'))
    for e in feed.entries:
        evt_time = calendar.timegm(e.updated_parsed)
        if last_updated and evt_time > last_updated:
            yield Event(self.name, self.category, evt_time,
                        e.title, e.summary, e.link)
def detect(cls, name, url, **kwargs):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc != 'github.com':
        return
    pathseg = urlp.path.lstrip('/').split('/')
    if pathseg[0] == 'downloads':
        pathseg.pop(0)
    repo = '/'.join(pathseg[:2])
    if repo.endswith('.git'):
        repo = repo[:-4]
    if len(pathseg) > 2:
        if pathseg[2] == 'releases':
            return cls(name, repo, 'release')
        elif pathseg[2] == 'tags':
            return cls(name, repo, 'tag')
        elif pathseg[2] == 'commits':
            return cls(name, repo, 'commit', pathseg[3])
    for category, url in (
            ('release', 'https://github.com/%s/releases.atom' % repo),
            ('tag', 'https://github.com/%s/tags.atom' % repo),
            ('commit', 'https://github.com/%s/commits/master.atom' % repo)):
        feed = feedparser.parse(url)
        if feed.entries:
            return cls(name, repo, category)
def detect(cls, name, url, **kwargs):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc != 'bitbucket.org':
        return
    pathseg = urlp.path.lstrip('/').split('/')
    repo = '/'.join(pathseg[:2])
    if repo.endswith('.git'):
        repo = repo[:-4]
    if len(pathseg) > 2:
        if pathseg[2] == 'downloads':
            return cls(name, repo, 'release')
        elif pathseg[2] == 'get':
            return cls(name, repo, 'tag')
    for category, url in (
            ('release', 'https://api.bitbucket.org/2.0/repositories/%s/downloads' % repo),
            ('tag', 'https://api.bitbucket.org/2.0/repositories/%s/refs/tags' % repo)):
        req = HSESSION.get(url, timeout=30)
        if req.status_code == 200:
            d = req.json()
            if d.get('values'):
                return cls(name, repo, category)
def detect_name(url, title):
    urlp = urllib.parse.urlparse(url)
    if urlp.netloc == 'github.com':
        return urlp.path.strip('/').split('/')[1].lower()
    else:
        urlpath = os.path.splitext(urlp.path.strip('/'))[0].lower().split('/')
        urlkwd = [x for x in urlpath
                  if x not in URL_FILTERED and not RE_IGN.match(x)]
        titlel = title.lower()
        candidates = []
        for k in urlkwd:
            if k in titlel:
                candidates.append(k)
        if candidates:
            return candidates[-1]
        else:
            host = urlp.hostname.split('.')
            cand2 = [x for x in urlp.hostname.split('.') if x not in URL_FILTERED]
            if cand2:
                return cand2[0]
            else:
                return host[-2]
def getNewsFeed(self):
    # parse the feed and get the result in res
    res = feedparser.parse(self.rssFeedUrl)

    # get the total number of entries returned
    resCount = len(res.entries)

    # exit out if empty
    if resCount == 0:
        return ""

    # if the resCount is less than the feedCount specified, cap the feedCount to the resCount
    if resCount < self.feedCount:
        self.feedCount = resCount

    # create empty array
    resultList = []

    # loop from 0 to feedCount so we append the right number of entries to the return list
    for x in range(0, self.feedCount):
        resultList.append(res.entries[x])

    return resultList
def get_arxiv_mail(title_words, abstract_words, author_words, feed_url, my_mail):
    feed = feedparser.parse(feed_url)
    filtered_entries = [entry for entry in feed.entries if filter(entry)]
    msg = ["<h1>arXiv results for {}</h1>".format(date_str)]
    for entry in filtered_entries:
        msg.append('<h2>{}</h2>'.format(entry.title))
        msg.append('<h3>{}</h3>'.format(strip_html(entry.author)))
        msg.append('<p>{}</p>'.format(strip_html(entry.description)))
        num = 'arXiv:' + entry['id'].split('/')[-1]
        link = '<a href="{}">{}</a>'.format(entry['id'], num)
        pdf_link = '[<a href="{}">pdf</a>]'.format(entry.id.replace('abs', 'pdf'))
        msg.append(link + " " + pdf_link)
    keywords = ', '.join(title_words + abstract_words)
    authors = ', '.join(author_words)
    footer = ("<p><em>Selected keywords: {}. Selected authors: {}. " +
              "From feed: {}</em></p>")
    msg.append(footer.format(keywords, authors, feed_url))
    msg = "".join(msg)
    return msg
def test_latest_feeds(self):
    packages = Project.objects.all().order_by('-created')[:15]
    for feed_type in ('rss', 'atom'):
        url = reverse('feeds_latest_packages_%s' % feed_type)
        response = self.client.get(url)
        self.assertEqual(response.status_code, 200)
        feed = feedparser.parse(response.content)

        expect_titles = [p.title for p in packages]
        actual_titles = [e['title'] for e in feed.entries]
        for expected_title, actual_title in zip(expect_titles, actual_titles):
            self.assertEqual(expected_title, actual_title)

        expect_summaries = [p.repo_description for p in packages]
        actual_summaries = [e['summary'] for e in feed.entries]
        for expected_summary, actual_summary in zip(expect_summaries, actual_summaries):
            self.assertEqual(expected_summary, actual_summary)
def _parse_episodes_from_feed(self):
    feed = feedparser.parse(settings.RSS_FEED)
    if not feed.entries:
        logging.error('No episodes found in RSS feed, please check URL')
    episodes = []
    for feed_item in feed.entries:
        show = self._get_matching_show(feed_item)
        if show:
            episode = self._get_episode_data_from_item(feed_item, show)
            quality_check = episode.quality is not None and \
                episode.quality >= show.minimum_quality
            follow_check = episode.season > show.follow_from_season or \
                (episode.season == show.follow_from_season and
                 episode.episode >= show.follow_from_episode)
            is_downloaded = self._is_episode_downloaded(episode)
            if quality_check and follow_check and not is_downloaded:
                episodes.append(episode)
    return episodes
def read_RSS_feed(assistant, player_vlc, instance_vlc, rss_dic, number_records_to_read):
    assistant.speak("Tell me the name of the rss feed")
    msg = assistant.active_listen()
    if msg in rss_dic.keys():
        rss = rss_dic[msg]
    else:
        rss = DEFAULT_RSS
    assistant.speak("ok! I am calling my assistant, she will read the RSS feed.")
    res = feedparser.parse(rss)
    number_records_in_feed = len(res.entries)
    if number_records_in_feed < number_records_to_read:
        number_records_to_read = number_records_in_feed
    entries_to_read = [entry.title_detail.value for entry in res.entries[0:number_records_to_read]]
    txt = ". ".join(entries_to_read)
    read_nicely_text(txt, instance_vlc, player_vlc)
    '''
    for entry in entries_to_read:
        assistant.speak(entry.title_detail.value)
        time.sleep(1)
    '''
def play_podcast(assistant, player_vlc, instance_vlc, podcast_dic, podcast_index=None):
    assistant.speak("Tell me the name of the podcast")
    msg = assistant.active_listen()
    if msg in podcast_dic.keys():
        rss = podcast_dic[msg]
    else:
        rss = DEFAULT_PODCAST
    assistant.speak("There you go!")
    res = feedparser.parse(rss)
    number_records_in_feed = len(res.entries)
    if podcast_index is None:
        podcast_index = random.randint(0, len(res.entries) - 1)
    if number_records_in_feed < podcast_index:
        podcast_index = number_records_in_feed
    href = ""
    for link in res.entries[podcast_index].links:
        if ".mp3" in link.href:
            href = link.href
            break
    if href != "":
        media = instance_vlc.media_new(href)
        player_vlc.set_media(media)
        player_vlc.play()
    else:
        assistant.speak("I am sorry, but the podcast requested is not available!")
def handle(msg):
    """ This function handles all messages incoming from users """
    content_type, chat_type, chat_id = telepot.glance(msg)
    command_input = msg['text']
    if command_input == '/start':
        # Check if already registred
        if register_user(chat_id):
            bot.sendMessage(chat_id, start_msg)
            feed = feedparser.parse(feed_url)
            # Send all news from older to newest
            for entry in reversed(feed.entries):
                msg = entry.title + '\n' + entry.link
                bot.sendMessage(chat_id, msg)
    if command_input == '/stop':
        bot.sendMessage(chat_id, stop_msg)
        remove_user(chat_id)
def get_data_from_feed(feed, posts, loop):
    try:
        data = parse(feed)
        if data.bozo == 0:
            category = data['feed']['title']
            if len(category) > 0:
                gather(*[parse_item(posts=posts, data=data, feed=feed,
                                    category=category, i=i, loop=loop)
                         for i in range(0, len(data.entries))],
                       return_exceptions=True)
        else:
            err = data.bozo_exception
            print(colored.red("Feed {0} is malformed: {1}".format(feed, err)))
            source_obj = Sources.objects.get(feed=feed)
            if source_obj.failures < 5:
                source_obj.failures = source_obj.failures + 1
            else:
                source_obj.failures = source_obj.failures + 1
                source_obj.active = False
            source_obj.save()
    except Exception as err:
        print(colored.red("At get_data_from_feed {}".format(err)))
def get(query='', lang='en'):
    d = feedparser.parse('https://news.google.it/news?cf=all&hl={l}&query={q}&pz=1&ned={l}&output=rss'
                         .format(l=lang, q=query))
    text = d.feed.title
    for e in d.entries:
        soup = bs4.BeautifulSoup(e.description, 'html.parser')
        news = soup.find_all('font', size="-1")[1].get_text()
        title = e.title.rsplit('-')[0]
        author = e.title.rsplit('-')[1]
        title, author = title.rstrip().lstrip(), author.rstrip().lstrip()
        link = e.link
        text += (
            '\n?? <b>{title}</b> • <a href="{link}">{author}</a>'
            '\n{news}\n'.format(title=title, news=news, link=link, author=author)
        )
    return text
def update(feed):
    last_etag = feed.etag
    last_modified = feed.modified
    feed_update = feedparser.parse(url, etag=last_etag, modified=last_modified)
    o = feed['entries']
    o = o[0]
    if feed_update.status == 304:
        return "304"
    else:
        return "200"
async def alog(self, *, username):
    """Gets a user's recent adventure log"""
    username = username.replace(" ", "_")
    if feedparser is None:
        await self.bot.say("You'll need to run `pip3 install feedparser` "
                           "before you can get a user's adventure log.")
        return
    url = self.alog_url + username
    try:
        page = await aiohttp.get(url)
        text = await page.text()
        text = text.replace("\r", "")
    except:
        await self.bot.say("No user found.")
    feed = feedparser.parse(text)
    titles = [post.title for post in feed.entries]
    await self.bot.say(self._fmt_alog(username, titles))
def get_entries(feed):
    NEW_POST = u"""New post, author {author}, title {title}
{content}"""
    for entry in feed.entries:
        if "http" in entry.id:
            nid = hashlib.md5(str(entry.id))
            entry.id = nid.hexdigest()
        entry_content = entry.content[0].value
        soup = BeautifulSoup(entry_content, 'html.parser')
        chunks = split_content_by_dot(soup, REQUEST_LIMIT - len(NEW_POST))
        chunks = list(chunks)
        published = dateutil.parser.parse(entry.published)
        for i, chunk in enumerate(chunks):
            if i == 0:
                chunk = NEW_POST.format(
                    author=entry.author,
                    title=entry.title,
                    content=chunk)
            yield dict(
                content=chunk,
                id="%s_%d" % (entry.id, i),
                title=entry.title,
                published=published - datetime.timedelta(0, i),
            )
            remaining = chunk
def make_rss_dictionary():
    """ Grabs the RSS data and makes a dictionary out of the wanted information """
    print('*** Al Jazeera ***')
    print('\nFetching Al Jazeera feed...')
    feed = feedparser.parse(url)
    rss_dict = []
    for article in feed['entries']:
        rss_dict.append({
            "title": article.title,
            "description": article.summary,
            "url": article.link,
        })
    print('Done\n')
    return rss_dict
def topics_id_rss(self):
    logging.debug('fetch rss feeds')
    topic_ids = list()
    for v2ex_rss_url in self.v2ex_rss_url_list:
        feed = feedparser.parse(v2ex_rss_url)
        logging.debug('fetch rss feed: %s' % v2ex_rss_url)
        items = feed["items"]
        for item in items:
            author = item["author"]
            title = item["title"]
            link = item["link"]
            published = item["date"]
            summary = item["summary"]
            topic_id = int(re.findall(r't\/(\d+)#?', link)[0])
            topic_ids.append(topic_id)
    topic_ids = set(topic_ids)
    return topic_ids
def fetch_feed_if_updated(url, date):
    """
    Fetches an RSS feed if it has been updated since a given date.

    Args:
        url: URL to the RSS feed
        date: Date as time_struct.

    Returns:
        FeedParser object representing the feed if the feed has been updated,
        None otherwise.
    """
    feed = feedparser.parse(url)
    if feed_updated(feed, date):
        return feed
    else:
        return None
def news():
    url = 'https://www.bunq.com/en/news/feed.rss'
    feed = feedparser.parse(url)

    data = []
    for item in feed['items']:
        s = MLStripper()
        s.feed(item['summary'])

        obj = {
            'title': item['title'],
            'date': item['published'],
            'summary': s.get_data(),
            'link': item['link'],
            'author': item['author']
        }
        data.append(obj)

    with open('bunq_bot/responses/commands/news.md', 'r') as f:
        return TRender(f.read()).render({'data': data[:5]})
def crawl(url, username, full_articles=True):
    articles = list()
    d = feedparser.parse(url)
    for entry in d["entries"]:
        if 'published_parsed' in entry:
            pubdate = pytz.utc.localize(datetime.fromtimestamp(mktime(entry['published_parsed'])))
        else:
            pubdate = pytz.utc.localize(datetime.fromtimestamp(mktime(entry['updated_parsed'])))
        articles.append(Article(
            title=entry['title'],
            url=entry['link'],
            body=entry["content"][0]["value"] if 'content' in entry else entry["summary"],
            username=username,
            pubdate=pubdate,
        ))
    return articles
def get_feed_entries(self, url):
    parse = feedparser.parse(url)
    num = len(parse.entries)
    if num > 0:
        for entry in parse.entries:
            title = getattr(entry, 'title', None)
            url = getattr(entry, 'link', None)
            desc = getattr(entry, 'description', None)
            image = parse.get('image', '')
            if not desc:
                desc = getattr(entry, 'summary', None)
            description = BeautifulSoup(desc).get_text()
            item, created = Article.objects.get_or_create(
                title=title, url=url, desc=desc)
            pubdate = getattr(entry, 'published', None)
            if pubdate:
                item.created = tparser.parse(pubdate, ignoretz=True)
            udate = getattr(entry, 'updated', None)
            if udate:
                item.updated = tparser.parse(udate, ignoretz=True)
            item.save()
            print item.title
def get_feed_entries_from_url(url):
    """
    Gets feed entries from an url that should be an RSS or Atom feed.

    >>> get_feed_entries_from_url("http://delhomme.org/notfound.html")
    Error 404 while fetching "http://delhomme.org/notfound.html".
    >>> feed = get_feed_entries_from_url("http://blog.delhomme.org/index.php?feed/atom")
    >>> feed.status
    200
    """
    feed = feedparser.parse(url)
    if 'status' in feed:
        feed = manage_http_status(feed, url)
    else:
        # An error happened such that the feed does not contain an HTTP response
        manage_non_http_errors(feed, url)
        feed = None
    return feed
# End of get_feed_entries_from_url() function
def get(self, url):
    """
    Wrapper for API requests. Take a URL, return a json array.

    >>> url = 'http://rss.denverpost.com/mngi/rss/CustomRssServlet/36/213601.xml'
    >>> parser = build_parser()
    >>> args = parser.parse_args([url])
    >>> rf = RecentFeed(args)
    >>> rf.get(url)
    True
    >>> rf.parse()
    #>>> articles = rf.recently()
    """
    h = httplib2.Http('.tmp')
    (response, xml) = h.request(url, "GET")
    if response['status'] != '200':
        if 'verbose' in self.args and self.args.verbose:
            print "URL: %s" % url
        raise ValueError("URL %s response: %s" % (url, response.status))
    self.xml = xml
    return True
def main(args):
    rf = RecentFeed(args)
    if args:
        articles = []
        for arg in args.urls[0]:
            if args.verbose:
                print arg
            rf.get(arg)
            rf.parse()
            articles.append(rf.recently())

        for article in articles[0]:
            if args.output == 'html':
                if type(article['title']) is types.UnicodeType:
                    article['title'] = article['title'].encode('utf-8', 'replace')
                print '<li><a href="{0}">{1}</a></li>'.format(article['id'], article['title'])
            elif args.output == 'json':
                json.dumps({'title': article['title'], 'url': article['id']})
def getRss(self, url):
    d = feedparser.parse(url)
    os.system("rm -r /tmp/rss.html")
    with open('/tmp/rss.html', 'a') as the_file:
        the_file.write('<!DOCTYPE html><html><head><meta')
        the_file.write('charset="utf-8"><meta')
        the_file.write('name="viewport" content="width=device-width, initial-scale=1"><title>' + d['feed']['title'] + '</')
        the_file.write('title><style type="text/css">body{margin:40px auto;')
        the_file.write('max-width:650px;line-height:1.6;font-size:18px;color:#444;padding:0')
        the_file.write('10px}h1,h2,h3{line-height:1.2}a{text-decoration: none; color:black;};</style></head><body><!-- RSS Feed --><header><h1>')
        the_file.write(d['feed']['title'] + '</h1>')
        #the_file.write('<aside>' + '-' + '</aside>')
        the_file.write('</header><hr noshade>')
        the_file.write('<p>')
        for post in d.entries:
            the_file.write('<a href="' + post.link.encode('ascii', 'ignore') + '">' + post.title.encode('ascii', 'ignore') + "</a><br><br>")
        the_file.write('</p>')
        the_file.write('</body>')
    url = QUrl('file:///' + 'tmp' + '/rss.html')
    self.webView.load(url)
def _get_channel_data_from_cache(self, key, config):
    """Fetch channel feed from cache."""
    channel_path = self._get_channel_cache_path(key)
    if os.path.exists(channel_path):
        if "ttl" in config and isinstance(config["ttl"], int):
            ttl = config["ttl"]
        else:
            ttl = self._settings.get_int(["ttl"])
        ttl *= 60
        now = time.time()
        if os.stat(channel_path).st_mtime + ttl > now:
            d = feedparser.parse(channel_path)
            self._logger.debug(u"Loaded channel {} from cache at {}".format(key, channel_path))
            return d
    return None
def _get_channel_data_from_network(self, key, config):
    """Fetch channel feed from network."""
    import requests

    url = config["url"]
    try:
        start = time.time()
        r = requests.get(url)
        self._logger.info(u"Loaded channel {} from {} in {:.2}s".format(key, config["url"], time.time() - start))
    except Exception as e:
        self._logger.exception(
            u"Could not fetch channel {} from {}: {}".format(key, config["url"], str(e)))
        return None

    response = r.text
    channel_path = self._get_channel_cache_path(key)
    with codecs.open(channel_path, mode="w", encoding="utf-8") as f:
        f.write(response)

    return feedparser.parse(response)
def parse(self, content):
    """Parses feed content of http response body into multiple
    :class:`news.models.abstract.Readable`s.

    Internally uses :mod:`~feedparser` library to extract entries from the
    response body.

    :param content: Http response body
    :type content: :class:`str`
    :returns: An iterator of parsed readables
    :rtype: An iterator of :class:`news.models.abstract.Readable`

    """
    f = feedparser.parse(content)
    return (Readable(
        author=e.author, title=e.title, content=e.content,
        url=e.link, summary=e.summary, image=f.image)
        for e in f.entries)
def update(self, mark_read=False):
    # Brad Frost's feed starts with a newline,
    # throwing off feedparser.
    try:
        content = requests.get(self.url).content.strip()
    except requests.exceptions.ConnectionError:
        logger.error('Could not sync %s' % self.url)
        return
    data = feedparser.parse(content)
    for entry in data["entries"][:25]:
        obj, created = Entry.objects.get_or_create(
            source=self,
            url=entry["link"],
            defaults={
                "title": entry["title"],
                "author": (entry.get("author") or
                           data["feed"].get("author") or
                           self.name),
                "summary": entry["summary"],
                "sent": mark_read,
            })
    self.last_updated = datetime.datetime.now(pytz.utc)
    self.save()
def parse_non_wp_blogs(blog):
    from wsgi import non_wp_blogs
    feed = feedparser.parse(blog)
    post_table = []
    for item in feed.entries:
        title = item.title
        url = item.link
        post_date = DateTime(item.published).ISO()[:-9]
        try:
            author = item.author
        except:
            author = "N/A"
        tags = get_tags(url)
        curr_content = ""  # get_content(non_wp_url = url)
        post_table.append({'title': title, 'author': author, 'post_date': post_date,
                           'tags': tags, 'url': url, 'views': 0, 'content': curr_content})
    return post_table
def remove_feed(chat_id, feed_url):
    '''Function to remove (unsubscribe) a feed from the chat feeds file'''
    # Create TSjson object for feeds of chat file and read the content
    fjson_chat_feeds = TSjson.TSjson('{}/{}.json'.format(CONST['CHATS_DIR'], chat_id))
    subs_feeds = fjson_chat_feeds.read_content()
    subs_feeds = subs_feeds[0]
    # Get the feed and set json data
    feed = {}
    feedpars = parse(feed_url)
    feed['Title'] = feedpars['feed']['title']
    feed['URL'] = feed_url
    feed['SEARCH_TERMS'] = []
    for sub_feed in subs_feeds['Feeds']:
        if sub_feed['URL'] == feed['URL']:
            feed['SEARCH_TERMS'] = sub_feed['SEARCH_TERMS']
            break
    # Remove the specific feed and update json file
    subs_feeds['Feeds'].remove(feed)
    fjson_chat_feeds.update(subs_feeds, 'Chat_id')
def get_context_data(self, **kwargs):
    context = super(FeedReaderNavlet, self).get_context_data(**kwargs)
    blogurl = None
    feed = None
    maxposts = 5

    navlet = AccountNavlet.objects.get(pk=self.navlet_id)
    if navlet.preferences:
        blogurl = navlet.preferences.get('blogurl')
        maxposts = int(navlet.preferences.get('maxposts', maxposts))

    if self.mode == NAVLET_MODE_VIEW and blogurl:
        feed = feedparser.parse(blogurl)
        feed['maxentries'] = feed['entries'][:maxposts]

    context.update({
        'feed': feed,
        'blogurl': blogurl,
        'maxposts': maxposts
    })
    return context
def handle(text, mic, profile):
    if 'INDIA' in text:
        url = 'http://news.google.com/news?pz=1&cf=all&ned=in&hl=en&output=rss'
    elif 'CRICKET' in text:
        url = 'http://www.espncricinfo.com/rss/content/story/feeds/6.xml'
    elif 'TECH' in text:
        url = 'http://www.theregister.co.uk/headlines.atom'
    else:
        url = 'http://news.google.com/news?pz=1&cf=all&ned=us&hl=en&output=rss'
    feed = feedparser.parse(url)
    if not feed:
        mic.say("I'm sorry. I could not get the news for you")
        return
    mic.say("Here is the headline news")
    for post in feed.entries:
        mic.say(post.title)
def get_headlines(self):
    try:
        # remove all children
        for widget in self.headlinesContainer.winfo_children():
            widget.destroy()

        if news_country_code == None:
            headlines_url = "https://news.google.com/news?ned=us&output=rss"
        else:
            headlines_url = "https://news.google.com/news?ned=%s&output=rss" % news_country_code

        feed = feedparser.parse(headlines_url)

        for post in feed.entries[0:5]:
            headline = NewsHeadline(self.headlinesContainer, post.title)
            headline.pack(side=TOP, anchor=W)
    except Exception as e:
        traceback.print_exc()
        print "Error: %s. Cannot get news." % e

    self.after(600000, self.get_headlines)
def getwordcounts(url):
    # Parse the feed
    d = feedparser.parse(url)
    wc = {}

    # Loop over all the entries
    for e in d.entries:
        if 'summary' in e:
            summary = e.summary
        else:
            summary = e.description

        # Extract a list of words
        words = getwords(e.title + ' ' + summary)
        for word in words:
            wc.setdefault(word, 0)
            wc[word] += 1
    return d.feed.title, wc
def __init__(self, user, passwd, codec='iso-8859-1',
             api_request=dlcs_api_request, xml_parser=dlcs_parse_xml):
    """Initialize access to the API with ``user`` and ``passwd``.

    ``codec`` sets the encoding of the arguments.

    The ``api_request`` and ``xml_parser`` parameters by default point to
    functions within this package with standard implementations to
    request and parse a resource. See ``dlcs_api_request()`` and
    ``dlcs_parse_xml()``. Note that ``api_request`` should return a
    file-like instance with an HTTPMessage instance under ``info()``,
    see ``urllib2.openurl`` for more info.
    """
    assert user != ""
    self.user = user
    self.passwd = passwd
    self.codec = codec

    # Implement communication to server and parsing of response messages:
    assert callable(api_request)
    self._api_request = api_request
    assert callable(xml_parser)
    self._parse_response = xml_parser
def read(feed, classifier):
    # Get feed entries and loop over them
    f = feedparser.parse(feed)
    for entry in f['entries']:
        print
        print '-----'
        # Print the contents of the entry
        print 'Title: ' + entry['title'].encode('utf-8')
        print 'Publisher: ' + entry['publisher'].encode('utf-8')
        print
        print entry['summary'].encode('utf-8')

        # Combine all the text to create one item for the classifier
        fulltext = '%s\n%s\n%s' % (entry['title'], entry['publisher'], entry['summary'])

        # Print the best guess at the current category
        print 'Guess: ' + str(classifier.classify(entry))

        # Ask the user to specify the correct category and train on that
        cl = raw_input('Enter category: ')
        classifier.train(entry, cl)
def get_wet():
    # Get the weather data
    print("Updating weather for", postcode)
    d = feedparser.parse(url)
    entries = int(len(d['entries']))
    val = " " + d['entries'][0]['title']
    val += " " + d['entries'][1]['title']
    val += " " + d['entries'][2]['title']
    # Tidy & shorten the message for the scroll display
    val = val.replace("Maximum", "Max")
    val = val.replace("Minimum", "Min")
    val = val.replace("Temperature: ", "")
    val = val.replace(u"\u00B0", "")
    val = val.replace(",", "")
    val = val.replace("(", "")
    val = val.replace(")", "")
    return val
def tor_search(self, keyword):
    self.mode = ''
    self.sender.sendMessage('Searching torrent..')
    self.navi = feedparser.parse(self.rssUrl + parse.quote(keyword))
    outList = []
    if not self.navi.entries:
        self.sender.sendMessage('Sorry, No results')
        self.mode = self.MENU1_1
        return
    for (i, entry) in enumerate(self.navi.entries):
        if i == 10:
            break
        title = str(i + 1) + ". " + entry.title
        templist = []
        templist.append(title)
        outList.append(templist)
    show_keyboard = {'keyboard': self.put_menu_button(outList)}
    self.sender.sendMessage('Choose one from below', reply_markup=show_keyboard)
    self.mode = self.MENU1_2
def handle_headlines(self, message):
    """Speak the latest headlines from the selected feed."""
    title = message.data['TitleKeyword']
    feed = feedparser.parse(self.feeds[title])
    items = feed.get('items', [])

    # Only read three items
    if len(items) > 3:
        items = items[:3]
    self.cache(title, items)

    self._is_reading_headlines = True
    self.speak('Here\'s the latest headlines from ' + message.data['TitleKeyword'])
    for i in items:
        if not self._is_reading_headlines:
            break
        logger.info('Headline: ' + i['title'])
        self.speak(i['title'])
        time.sleep(5)
    self._is_reading_headlines = False
def get_items(self, name):
    """
    Get items from the named feed, if cache exists use cache otherwise
    fetch the feed and update.
    """
    cache_timeout = 10 * 60
    cached_time = float(self.cache_time.get(name, 0))
    if name in self.cached_items \
            and (time.time() - cached_time) < cache_timeout:
        logger.debug('Using cached feed...')
        return self.cached_items[name]
    else:
        logger.debug('Fetching feed and updating cache')
        feed = feedparser.parse(self.feeds[name])
        feed_items = feed.get('items', [])
        self.cache(name, feed_items)
        if len(feed_items) > 5:
            return feed_items[:5]
        else:
            return feed_items
def get_status_fm(service):
    response = feedparser.parse(service["url"])
    for item in response.entries:
        status = item.title.split(" - ")[-1]
        date = datetime(*item.published_parsed[:6])

        icon = ICON_STATUS_GOOD if status == "Up" else None
        icon = ICON_STATUS_MINOR if status == "Warning" else icon
        icon = ICON_STATUS_MAJOR if status == "Down" else icon

        wf.add_item(
            title=status.capitalize(),
            subtitle=date.strftime('%d %B %Y - ') + item.description,
            icon=icon,
            icontype="file"
        )