The following 42 code examples, extracted from open-source Python projects, illustrate how to use scrapy.linkextractors.LinkExtractor().
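Before the project examples, here is a minimal, self-contained sketch of the pattern they all share: build a LinkExtractor (optionally restricted with allow/deny patterns) and call extract_links() on an HtmlResponse. The URL, HTML body, and allow pattern below are illustrative placeholders, not code taken from any of the projects listed.

from scrapy.http import HtmlResponse
from scrapy.linkextractors import LinkExtractor

# Hypothetical page used only for illustration.
response = HtmlResponse(
    url='http://example.com/',
    body=b'<html><body><a href="/page/1">Page 1</a> <a href="/about">About</a></body></html>',
    encoding='utf-8')

# Only URLs matching the allow pattern are returned; each result is a Link
# object exposing .url and .text (relative hrefs are resolved against response.url).
link_extractor = LinkExtractor(allow=r'/page/\d+')
for link in link_extractor.extract_links(response):
    print(link.url, link.text)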
def parse(self, response):
    """Parse the recipe list."""
    recipes = LinkExtractor(allow=r"/recipe/\d+/.*").extract_links(response)
    if len(recipes) > 0:
        for recipe_link in recipes:
            yield scrapy.Request(recipe_link.url, callback=self.parse_item)
def main():
    start = timer()
    url = 'http://scrapinghub.com/'
    link_extractor = LinkExtractor()
    total = 0
    for files in glob.glob('sites/*'):
        f = io.open(files, "r", encoding="utf-8")
        html = f.read()
        r3 = HtmlResponse(url=url, body=html, encoding='utf8')
        links = link_extractor.extract_links(r3)
        total = total + len(links)
    end = timer()
    print("\nTotal number of links extracted = {0}".format(total))
    print("Time taken = {0}".format(end - start))
    click.secho("Rate of link extraction : {0} links/second\n".format(
        float(total / (end - start))), bold=True)
    with open("Benchmark.txt", 'w') as g:
        g.write(" {0}".format(float(total / (end - start))))
def __init__(self, conf=None, conn=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Make urls
    self.start_urls = [
        'http://www.takedaclinicaltrials.com/browse/?protocol_id=',
    ]

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'browse/summary/',
        ), callback=parse_record),
        Rule(LinkExtractor(
            allow=r'browse',
        )),
    ]

    # Inherit parent
    super(Spider, self).__init__()
def __init__(self, rule):
    dispatcher.connect(self.spider_opened, signals.spider_opened)
    dispatcher.connect(self.spider_closed, signals.spider_closed)
    self.rule = rule
    self.name = rule.name
    self.allowed_domains = rule.allowed_domains.split(',')
    self.start_urls = rule.start_urls.split(',')
    rule_list = []

    # Add a "next page" rule when the configured rule defines a pagination XPath
    if len(rule.next_page):
        rule_list.append(Rule(LinkExtractor(restrict_xpaths=rule.next_page), follow=True))

    rule_list.append(Rule(LinkExtractor(
        allow=rule.allow_url.split(','),
        unique=True), follow=True, callback='parse_item'))

    self.rules = tuple(rule_list)
    super(ProxySpiderSpider, self).__init__()
def __init__(self, **kw):
    super(FollowAllSpider, self).__init__(**kw)
    url = 'http://localhost/books.toscrape.com/index.html'
    if not url.startswith('http://') and not url.startswith('https://'):
        url = 'http://%s/' % url
    self.url = url
    self.allowed_domains = [re.sub(r'^www\.', '', urlparse(url).hostname)]
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()
    self.previtem = 0
    self.items = 0
    self.timesec = datetime.datetime.utcnow()
def __init__(self, url, search_terms=None, *args, **kwargs):
    if url.startswith('.') or url.startswith('/'):
        with Path(url).open('rt', encoding='utf8') as f:
            urls = [line.strip() for line in f]
    else:
        urls = [u for u in url.split() if u]
    self.start_urls = [add_http_if_no_scheme(_url) for _url in urls]
    self.search_terms = search_terms
    self._extra_search_terms = None  # lazy-loaded via extra_search_terms
    self._reset_link_extractors()
    self.images_link_extractor = LinkExtractor(
        tags=['img'], attrs=['src'], deny_extensions=[], canonicalize=False)
    self.state = {}
    self.use_splash = None  # set up in start_requests
    self._screenshot_dest = None  # type: Path
    # Load headless horseman scripts
    self.lua_source = load_directive('headless_horseman.lua')
    self.js_source = load_directive('headless_horseman.js')
    super().__init__(*args, **kwargs)
def parse_tag(self, response):
    res = LinkExtractor(allow=('.*/user/.*'), allow_domains='www.reddit.com').extract_links(response)
    for one in res:
        if one.text != 'Click here!':
            path = one.url.replace('https://www.reddit.com', '')
            yield Request(url=one.url, callback=self.parse_user,
                          meta={'cookies': True, 'path': path})

    res = LinkExtractor(allow=('.*/comments/.*'), allow_domains='www.reddit.com').extract_links(response)
    for one in res:
        path = one.url.replace('https://www.reddit.com', '')
        yield Request(url=one.url, callback=self.parse_comment,
                      meta={'cookies': True, 'path': path})

    next_page = response.css(
        '#siteTable > div.nav-buttons > span > span.next-button > a::attr(href)').extract_first()
    if next_page:
        path = next_page.replace('https://www.reddit.com', '')
        yield Request(url=next_page, callback=self.parse_tag,
                      meta={'cookies': True, 'path': path})
    else:
        self.logger.info('No next page in parse_tag')
def __init__(self, domains, directory, allow=(), deny=(), unix=False):
    self.directory = directory
    self.unix = unix
    self.rules = (
        Rule(LinkExtractor(allow=allow, deny=deny), callback='save_page'),
    )

    # parse the allowed domains and start urls
    self.allowed_domains = []
    self.start_urls = []
    for domain in domains:
        url_parts = domain.split('://')
        unqualified_url = url_parts[-1]
        url_scheme = url_parts[0] if len(url_parts) > 1 else 'http'
        full_url = '{0}://{1}'.format(url_scheme, unqualified_url)
        bare_domain = unqualified_url.split('/')[0]
        self.allowed_domains.append(bare_domain)
        self.start_urls.append(full_url)

    super().__init__()
def __init__(self, website):
    self.name = website.spider_name
    self.redis_key = website.spider_name + ":start_urls"
    self.website = website
    self.allowed_domains = website.allow_domains.split(";")
    self.start_urls = website.start_urls.split(";")

    rule_list = []
    rules_to_follow = website.rules_to_follow.split(";")
    rules_to_parse = website.rules_to_parse.split(";")
    rule_list.append(
        Rule(LinkExtractor(allow=rules_to_parse), follow=True, callback='parse_detail')
    )
    rule_list.append(
        Rule(LinkExtractor(allow=rules_to_follow), follow=True)
    )
    self.rules = tuple(rule_list)
    super(ArticleSpider, self).__init__()
def __init__(self, topic=None, newspaper=None, term='', *args, **kwargs):
    self.term = term
    if newspaper:
        sources = [source for source in SOURCE_NEWSPAPERS if newspaper == source['name']]
    else:
        sources = TOPIC_TO_SOURCES.get(topic, SOURCE_NEWSPAPERS)

    self.allowed_domains = [source['allowed_domains'] for source in sources]
    self.start_urls = [source['url'] for source in sources]

    self.rules = []
    for source in sources:
        if topic:
            allowed_domain_regex = (source['allowed_subdomains_regex'][topic], )
        else:
            allowed_domain_regex = (regexsubdomain for topic, regexsubdomain
                                    in source['allowed_subdomains_regex'].items())
        rule = Rule(link_extractor=LinkExtractor(allow=allowed_domain_regex),
                    callback='parse_with_term',
                    cb_kwargs={
                        'term': self.term,
                        'newspaper': newspaper
                    },
                    follow=True)
        self.rules.append(rule)

    return super(NewspaperCrawler, self).__init__(*args, **kwargs)
def get_link_extractor(self):
    return LinkExtractor(allow=r'^http://[a-z2-7]{16}.onion',
                         deny=[r'^https://blockchainbdgpzk.onion/address/',
                               r'^https://blockchainbdgpzk.onion/tx/'],
                         deny_domains=settings.get('FAKE_DOMAINS'))
def __init__(self, seeds=None, login_credentials=None, profile=None):
    super().__init__()
    self.le = LinkExtractor(canonicalize=False)
    self.files_le = LinkExtractor(deny_extensions=[], canonicalize=False)
    self.images_le = LinkExtractor(
        tags=['img'], attrs=['src'], deny_extensions=[], canonicalize=False)
    if seeds:
        with Path(seeds).open('rt', encoding='utf8') as f:
            self.start_urls = [url for url in (line.strip() for line in f)
                               if not url.startswith('#')]
    if login_credentials:
        with Path(login_credentials).open('rt', encoding='utf8') as f:
            self.login_credentials = json.load(f)
    else:
        self.login_credentials = None
    if profile:
        setup_profiling(profile)
def __init__(self, conf=None, conn=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Make urls
    self.start_urls = [
        'http://www.pfizer.com/research/clinical_trials/find_a_trial?recr=0',
    ]

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'find_a_trial/NCT\d+',
        ), callback=parse_record),
        Rule(LinkExtractor(
            allow=r'page=\d+',
        )),
    ]

    # Inherit parent
    super(Spider, self).__init__()
def __init__(self, conf=None, conn=None, date_from=None, date_to=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Make start urls
    self.start_urls = _make_start_urls(
        prefix='http://www.gsk-clinicalstudyregister.com/search',
        date_from=date_from, date_to=date_to)

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'study\/\d+'
        ), callback=parse_record),
    ]

    # Inherit parent
    super(Spider, self).__init__()

# Internal
def extractLinks(self, response):
    retv = []
    link_extractor = LinkExtractor()
    if isinstance(response, HtmlResponse):
        links = link_extractor.extract_links(response)
        for link in links:
            if self.postfix in link.url:
                retv.append(link.url)
    return retv
def __init__(self, url):
    super(Spider, self).__init__()
    self.start_urls = [url]
    self.le = LinkExtractor(canonicalize=False)
    self.files_le = LinkExtractor(
        tags=['a'], attrs=['href'], deny_extensions=[], canonicalize=False)
def parse(self, response):
    """Parse the recipe list."""
    recipes = LinkExtractor(
        allow=("/recipes/.*/views")
    ).extract_links(response)
    if len(recipes) > 0:
        for recipe_link in recipes:
            yield scrapy.Request(recipe_link.url, callback=self.parse_item)
        base_url, page = response.url.split("=")
        yield scrapy.Request("{}={}".format(base_url, int(page) + 1),
                             callback=self.parse)
    else:
        print "Finished on {}".format(response.url)
def link_extractor(self):
    return LinkExtractor(allow=self.allowed, unique=False, canonicalize=False)
def iframe_link_extractor(self):
    return LinkExtractor(
        allow=self.allowed, tags=['iframe'], attrs=['src'],
        unique=False, canonicalize=False)
def files_link_extractor(self):
    return LinkExtractor(
        allow=self.allowed,
        tags=['a'],
        attrs=['href'],
        deny_extensions=[],  # allow all extensions
        canonicalize=False,
    )
def parse(self, response):
    articleLinks = LinkExtractor(restrict_css='div.main > div.article')
    pages = articleLinks.extract_links(response)
    for page in pages:
        yield scrapy.Request(page.url, callback=self.parse_article)
def parse_comment(self, response):
    # Do not show all comments
    res = LinkExtractor(allow=('.*/user/.*'), allow_domains='www.reddit.com').extract_links(response)
    for one in res:
        path = one.url.replace('https://www.reddit.com', '')
        yield Request(url=one.url, callback=self.parse_user,
                      meta={'cookies': True, 'path': path})
def __init__(self, forum_id=58, digit=1, *args, **kwargs):
    self.start_urls = [self.ip_format % d for d in [int(forum_id)]]
    # `sle` is the project's import alias for its link extractor class
    self.rules = [
        Rule(sle(allow=("/forum/forum-" + str(forum_id) + "-[0-9]{," + str(digit) + "}\.html")),
             follow=True, callback='parse_1'),
    ]
    super(sisSpider, self).__init__(*args, **kwargs)
def parse(self, response):
    e = LinkExtractor()
    urls = [link.url for link in e.extract_links(response)]
    for url in urls:
        parsed = urlparse.urlsplit(url)
        qs = urlparse.parse_qs(parsed.query)
        if qs and 'Url' in qs:
            event_url = qs['Url'][0]
            yield self.add_url(event_url)
def parse(self, response):
    e = LinkExtractor()
    urls = [link.url for link in e.extract_links(response)]
    for url in urls:
        if response.url != url:
            yield self.addurl(url)
    if urls:
        qs = urlparse.parse_qs(urlparse.urlparse(response.url).query)
        qs = dict((k, v[0]) for (k, v) in qs.iteritems())
        qs['p'] = int(qs['p']) + 1
        url = 'http://comeon5678.com/event/list'
        yield scrapy.Request('%s?%s' % (url, urllib.urlencode(qs)))
def __init__(self, url):
    self.start_urls = [url]
    self.link_extractor = LinkExtractor()
    self.collected_items = []
    self.visited_urls = []
    self.responses = []
    super(TestSpider, self).__init__()
def parse_jianjie(self, response):
    item = response.meta['item']
    item['intro'] = response.xpath(
        u'//div[@class="schInfoSubT" and a/@name="2"]/following-sibling::div[1]').extract_first()
    for link in LinkExtractor(restrict_xpaths=u'//ul[@id="topNav"]//a[.="????"]').extract_links(response):
        yield Request(link.url, meta={'item': item}, callback=self.parse_zhuanye)
def get_link_extractor(self):
    return LinkExtractor(allow=r'.i2p')
def __init__(self, conf=None, conn=None, page_from=None, page_to=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Default values
    if page_from is None:
        page_from = '1'
    if page_to is None:
        page_to = '1'

    # Make start urls
    self.start_urls = _make_start_urls(
        prefix='https://upload.umin.ac.jp/cgi-open-bin/ctr_e/index.cgi',
        page_from=page_from)

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'cgi-open-bin/ctr_e/ctr_view.cgi',
        ), callback=parse_record),
        Rule(LinkExtractor(
            allow=r'page=\d+',
            process_value=partial(_process_url, page_from, page_to),
        )),
    ]

    # Inherit parent
    super(Spider, self).__init__()

# Internal
def __init__(self, conf=None, conn=None, date_from=None, date_to=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Make start urls
    self.start_urls = _make_start_urls(
        prefix='http://www.anzctr.org.au/TrialSearch.aspx',
        date_from=date_from, date_to=date_to)

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'Trial/Registration/TrialReview.aspx',
            process_value=lambda value: value.replace('http', 'https', 1),
        ), callback=parse_record),
        Rule(LinkExtractor(
            allow=r'page=\d+',
        )),
    ]

    # Inherit parent
    super(Spider, self).__init__()

# Internal
def __init__(self, conf=None, conn=None, date_from=None, date_to=None):

    # Save conf/conn
    self.conf = conf
    self.conn = conn

    # Make start urls
    self.start_urls = _make_start_urls(
        prefix='http://www.isrctn.com/search',
        date_from=date_from, date_to=date_to)

    # Make rules
    self.rules = [
        Rule(LinkExtractor(
            allow=r'ISRCTN\d+',
        ), callback=parse_record),
        Rule(LinkExtractor(
            allow=r'page=\d+',
        )),
    ]

    # Inherit parent
    super(Spider, self).__init__()

# Internal
def parse(self, response):
    le = LinkExtractor()
    for link in le.extract_links(response):
        yield SplashRequest(
            link.url,
            self.parse_link,
            endpoint='render.json',
            args={
                'har': 1,
                'html': 1,
            }
        )
def __init__(self, domains, urls, *args, **kwargs):
    """Constructor for PageSpider.

    Parameters
    ----------
    domains : list
        A list of domains for the site.
    urls : list
        A list of URLs of the site.
    href_xpaths : list
        A list of XPATH expressions indicating the ancestors of the `<a>` element.
    url_regex : string
        URL pattern regular expression.

    If you use this spider to store items into a database, additional
    keywords are required:

    platform_id : int
        The id of a platform instance.
    session : object
        An instance of a SQLAlchemy session.
    """
    self.session = kwargs.pop('session', None)
    self.platform_id = kwargs.pop('platform_id', None)
    self.href_xpaths = kwargs.pop('href_xpaths', ())
    self.url_regex = kwargs.pop('url_regex', None)
    self.start_urls = urls
    self.allowed_domains = domains
    self.link_extractor = LinkExtractor(
        allow_domains=self.allowed_domains,
        restrict_xpaths=self.href_xpaths,
        unique=True)
    super(PageSpider, self).__init__(*args, **kwargs)
def __init__(self, domains, urls, *args, **kwargs):
    """Constructor for SiteSpider.

    Parameters
    ----------
    domains : list
        A list of domains for the site.
    urls : list
        A list of sitemap URLs of the site.
    href_xpaths : list
        A list of XPATH expressions indicating the ancestors of the `<a>` element.
    url_regex : string
        URL pattern regular expression.

    If you use this spider to store items into a database, additional
    keywords are required:

    platform_id : int
        The id of a platform instance.
    session : object
        An instance of a SQLAlchemy session.
    """
    self.session = kwargs.pop('session', None)
    self.platform_id = kwargs.pop('platform_id', None)
    self.url_regex = kwargs.pop('url_regex', None)
    self.href_xpaths = kwargs.pop('href_xpaths', ())
    self.start_urls = urls
    self.allowed_domains = domains
    self.rules = (Rule(
        LinkExtractor(
            allow_domains=self.allowed_domains,
            restrict_xpaths=self.href_xpaths,
            unique=True),
        callback="parse_item",
        follow=True),)
    super(SiteSpider, self).__init__(*args, **kwargs)
def main():
    url = 'http://scrapinghub.com/'
    link_extractor = LinkExtractor()
    total = 0
    time = 0
    tar = tarfile.open("sites.tar.gz")
    for member in tar.getmembers():
        f = tar.extractfile(member)
        html = f.read()
        start = timer()
        response = HtmlResponse(url=url, body=html, encoding='utf8')
        links = link_extractor.extract_links(response)
        end = timer()
        total = total + len(links)
        time = time + end - start
    print("\nTotal number of links extracted = {0}".format(total))
    print("Time taken = {0}".format(time))
    click.secho("Rate of link extraction : {0} links/second\n".format(
        float(total / time)), bold=True)
    with open("Benchmark.txt", 'w') as g:
        g.write(" {0}".format(float(total / time)))
def __init__(self, **kw):
    super(BroadBenchSpider, self).__init__(**kw)
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()
    self.previtem = 0
    self.items = 0
    self.timesec = datetime.datetime.utcnow()
def parse(self, response):
    """Scrapy parse callback"""

    # Get current nesting level
    curr_depth = response.meta.get('depth', 1)
    if self.config['login']['enabled']:
        curr_depth = curr_depth - 1  # Do not count the login page as nesting depth

    # Store to disk?
    if self.config['store']['enabled']:
        path = response.url.replace(os.sep, '--')  # Replace directory separator
        path = self.config['store']['path'] + os.sep + path
        with open(path, 'wb') as fpointer:
            fpointer.write(response.body)

    # Yield current url item
    item = CrawlpyItem()
    item['url'] = response.url
    item['status'] = response.status
    item['depth'] = curr_depth
    item['referer'] = response.meta.get('referer', '')
    yield item

    # Get all links from the current page
    links = LinkExtractor().extract_links(response)

    # Iterate all found links and crawl them
    for link in links:
        deny = False

        # Check requests to be ignored
        for ignore in self.config['ignores']:
            if (ignore in link.url) or (ignore.lower() in link.url.lower()):
                # Ignore pattern found, stop looking into other patterns
                deny = True
                break

        # [NO] Max depth exceeded
        if curr_depth >= self.max_depth:
            logging.info('[Not Crawling] Current depth (%s) exceeds max depth (%s)'
                         % (curr_depth, self.max_depth))
        # [NO] Duplicate URL
        elif link.url in self.duplicates:
            logging.info('[Not Crawling] Url already crawled: ' + link.url)
        # [NO] URL denied
        elif deny:
            logging.info('[Not Crawling] Url denied (pattern: "' + ignore + '"): ' + link.url)
        # [OK] Crawl!
        else:
            self.duplicates.append(link.url)
            yield Request(link.url, meta={'depth': curr_depth + 1, 'referer': response.url})
def parse(self, response):
    for li_item in response.css('#content div.entry-content ul.lcp_catlist li'):
        title = li_item.css('h3.lcp_post a::text').extract_first().strip()
        text_date = li_item.css('::text').extract_first().strip()
        try:
            date_obj = datetime.datetime.strptime(text_date, '%d %B %Y')
            date = date_obj.date().isoformat()
        except ValueError:
            date = None

        paragraphs = li_item.xpath('p').xpath("string()").extract()
        description = '\n'.join(paragraphs)

        feedback_days = None
        feedback_date = self.get_feedback_date(description)
        if feedback_date:
            days_diff = feedback_date - date_obj
            feedback_days = days_diff.days

        links = li_item.css('a')
        documents = self.get_documents_from_links(links)

        item = JustPublication(
            title=title,
            type=self.get_type(title),
            identifier=self.slugify(title)[0:127],
            date=date,
            institution='justitie',
            description=description,
            documents=documents,
            contact=self.get_contacts(description),
            feedback_days=feedback_days
        )
        yield item

    paginationLinkEx = LinkExtractor(restrict_css='ul.lcp_paginator')
    pages = paginationLinkEx.extract_links(response)
    for page in pages:
        yield scrapy.Request(page.url, callback=self.parse)
def parse_item(self, response):
    loader = ItemLoader(ChsiDaxueItem(), response)
    loader.add_value('id', response.url, re=ur'schId-(\w+)\.dhtml')
    loader.add_value('url', response.url)
    loader.add_css('logo', u'.r_c_sch_logo>img::attr(src)',
                   MapCompose(lambda url: urljoin('http://gaokao.chsi.com.cn/', url)))
    loader.add_css('name', u'.topImg::text')
    loader.add_css('badges', u'.r_c_sch_attr .r_c_sch_icon::attr(title)')

    data_clean = MapCompose(lambda x: re.sub(r'\s+', ' ', x), unicode.strip)
    loader.add_xpath('type', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
    loader.add_xpath('membership', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
    loader.add_xpath('province', u'//span[@class="f_bold" and span]/following-sibling::text()', data_clean)
    loader.add_xpath('address', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
    loader.add_xpath('phone', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)
    loader.add_xpath('website', u'//span[@class="f_bold" and .="?????"]/following-sibling::a/@href', data_clean)
    loader.add_xpath('backdoor', u'//span[@class="f_bold" and .="?????"]/following-sibling::text()', data_clean)

    def parse_votes():
        xpath = u'//td[@class="tdMydT" and .="{}"]/following-sibling::td/div[@class="rank"]/@rank'
        get_vote = lambda what: float(response.xpath(xpath.format(what)).extract_first() or 0)
        return {
            'overall': get_vote(u'?????'),
            'environment': get_vote(u'???????'),
            'life': get_vote(u'?????'),
        }

    loader.add_value('votes', parse_votes())

    def parse_trending():
        css = u'{}>table tr:not(:first-child)'

        def get_trending(what):
            majors = []
            for e in response.css(css.format(what)):
                majors.append({
                    'id': e.css(u'.tdZytjTDiv>a::attr(href)').re_first(r'specId=(\w+)'),
                    'name': e.css(u'.tdZytjTDiv::attr(title)').extract_first(),
                    'vote': float(e.css(u'.avg_rank::text').extract_first()),
                    'count': int(e.css(u'.c_f00::text, .red::text').extract_first()),
                })
            return majors

        return {
            'count': get_trending(u'#topNoofPTable'),
            'index': get_trending(u'#topIndexTable'),
            'like': get_trending(u'.r_r_box_zymyd'),
        }

    loader.add_value('trending', parse_trending())
    item = loader.load_item()

    for link in LinkExtractor(restrict_xpaths=u'//ul[@id="topNav"]//a[.="????"]').extract_links(response):
        yield Request(link.url, meta={'item': item}, callback=self.parse_jianjie)