The following 50 code examples, extracted from open-source Python projects, illustrate how to use scrapy.http.HtmlResponse().
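Before the project examples, here is a minimal, self-contained sketch of the basic pattern most of these snippets share: wrap a raw HTML string in an HtmlResponse and query it with the usual Scrapy selectors. The URL and markup below are placeholders chosen for illustration, not taken from any of the projects.

from scrapy.http import HtmlResponse

html = b"<html><body><a href='/item/1'>First item</a></body></html>"
# Wrap raw HTML in an HtmlResponse so Scrapy selectors can be used on it,
# exactly as if it had come back from the downloader.
response = HtmlResponse(url="http://example.com/", body=html, encoding="utf-8")

# XPath and CSS selectors work the same way as on a downloaded response.
links = response.xpath("//a/@href").extract()
titles = response.css("a::text").extract()
print(links, titles)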
def process_request(self, request, spider): if spider.name == "jobbole": self.browser.get(request.url) import time time.sleep(3) print ("??:{0}".format(request.url)) return HtmlResponse(url=self.browser.current_url, body=self.browser.page_source, encoding="utf-8", request=request) #linux? # from pyvirtualdisplay import Display # display = Display(visible=0, size=(800, 600)) # display.start() # # browser = webdriver.Chrome() # browser.get()
def main():
    start = timer()
    url = 'http://scrapinghub.com/'
    link_extractor = LinkExtractor()
    total = 0
    for files in glob.glob('sites/*'):
        f = io.open(files, "r", encoding="utf-8")
        html = f.read()
        r3 = HtmlResponse(url=url, body=html, encoding='utf8')
        links = link_extractor.extract_links(r3)
        total = total + len(links)
    end = timer()
    print("\nTotal number of links extracted = {0}".format(total))
    print("Time taken = {0}".format(end - start))
    click.secho("Rate of link extraction : {0} links/second\n".format(
        float(total / (end - start))), bold=True)
    with open("Benchmark.txt", 'w') as g:
        g.write(" {0}".format((float(total / (end - start)))))
def process_request(self, request, spider):
    if request.meta.has_key('PhantomJS'):
        log.debug('PhantomJS Requesting: %s' % request.url)
        ua = None
        try:
            ua = UserAgent().random
        except:
            ua = 'Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11'
        webdriver.DesiredCapabilities.PHANTOMJS['phantomjs.page.settings.userAgent'] = ua
        try:
            self.driver.get(request.url)
            content = self.driver.page_source.encode('utf-8')
            url = self.driver.current_url.encode('utf-8')
        except:
            return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
        if content == '<html><head></head><body></body></html>':
            return HtmlResponse(request.url, encoding='utf-8', status=503, body='')
        else:
            return HtmlResponse(url, encoding='utf-8', status=200, body=content)
    else:
        log.debug('Common Requesting: %s' % request.url)
def goodsUrlList(home_url):
    '''
    Collect the detail-page URLs of all goods under a category page.
    :param home_url: e.g. http://www.vipmro.com/search/?&categoryId=501110
    :return: list of URLs
    '''
    # Build every combination of the optional filter attributes.
    all_group_list = parseOptional(home_url)
    # Extract the goods detail URLs from each filtered list page.
    url_list = []
    for url in all_group_list:
        # url = 'http://www.vipmro.com/search/?ram=0.9551325197768372&categoryId=501110&attrValueIds=509805,509801,509806,509807'
        # Render the page with JavaScript and wrap the result in an HtmlResponse.
        home_page = getHtmlFromJs(url)['content'].encode('utf-8')
        html = HtmlResponse(url=url, body=str(home_page))
        urls = html.selector.xpath('/html/body/div[7]/div[1]/ul/li/div[2]/a/@href').extract()
        url_list.extend(urls)
        # print(len(urls))
        # print(urls)
        # exit()
    # print(len(url_list))
    # print(url_list)
    return url_list
def parseOptional(url):
    '''
    Build the list-page URLs for every combination of the optional filter attributes.
    :param url: http://www.vipmro.com/search/?&categoryId=501110
    :return: ['http://www.vipmro.com/search/?categoryId=501110&attrValueIds=509801,512680,509807,509823', ...]
    '''
    # Render the page with JavaScript and wrap the result in an HtmlResponse.
    home_page = getHtmlFromJs(url)['content'].encode('utf-8')
    html = HtmlResponse(url=url, body=str(home_page))
    # Attribute value IDs for each filter group on the page.
    xi_lie = html.selector.xpath('/html/body/div[5]/div[6]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    fen_duan = html.selector.xpath('/html/body/div[5]/div[10]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    tuo_kou_qi = html.selector.xpath('/html/body/div[5]/div[14]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    an_zhuang = html.selector.xpath('/html/body/div[5]/div[12]/ul/li/a/@href').re(r'ValueIds=(\d+)')
    # Cartesian product of all filter groups.
    all_group = list(itertools.product(xi_lie, fen_duan, tuo_kou_qi, an_zhuang))
    _url = url + '&attrValueIds='
    url_list = map(lambda x: _url + ','.join(list(x)), all_group)
    return url_list
def process_request(self, request, spider):
    try:
        driver = webdriver.PhantomJS()
        # A visible Firefox can be used instead for debugging:
        # driver = webdriver.Firefox()
        print "---" + str(request.meta["page"]) + "-----js url start-------"
        print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        driver.get(self.pc_index_url + "&page=" + str(request.meta["page"]))
        # time.sleep(1)
        tmp = driver.find_element_by_id('sf-item-list-data').get_attribute("innerHTML")
        print "---" + str(request.meta["page"]) + "-----js url end-------"
        print datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        body = tmp
        return HtmlResponse(driver.current_url, body=body, encoding='utf-8', request=request)
    except Exception, e:
        print "-------------------"
        print e.__doc__
        print e.message
        print "-------------------"
def intohotel(self, Links):
    url = "http://hotels.ctrip.com/" + Links
    self.driver.get(url)
    self.driver.maximize_window()
    self.driver.implicitly_wait(80)
    time.sleep(3)
    response = HtmlResponse(url="my HTML string", body=self.driver.page_source, encoding="utf-8")
    # Crawl the comment information (disabled).
    # self.crawlcommentinfo(commentnum)
    # Crawl the hotel information.
    try:
        items = self.crawlhotelinfo(response, url)
    except:
        items = self.crawlhotelinfo2(response, url)
    # Save the hotel comment data.
    self.xiechengDao.savehotelComment(items)
def __crawllianjie(self, page_sourse):
    response = HtmlResponse(url="my HTML string", body=page_sourse, encoding="utf-8")
    hotel_list = response.xpath("//div[@class='searchresult_list ']/ul")
    for hotel in hotel_list:
        url = hotel.xpath("li[@class='searchresult_info_name']/h2/a/@href").extract()[0]
        address = hotel.xpath("li[@class='searchresult_info_name']/p[@class='searchresult_htladdress']/text()").extract()[0]
        commnum = hotel.xpath("li[@class='searchresult_info_judge ']/div/a/span[@class='hotel_judgement']/text()").extract()
        if len(commnum):
            commnum = re.sub('\D', '', commnum[0])
            commnum = commnum if len(commnum) > 0 else 0
        else:
            commnum = 0
        name = hotel.xpath("li[@class='searchresult_info_name']/h2/a/text()").extract()[0]
        self.listPageInfo.append({
            "guid": uuid.uuid1(),
            "url": url,
            "hotel_name": name,
            "OTA": self.__ota_info,
            "comm_num": int(commnum),
            "address": address
        })
def __parseUrls(self, page_source):
    response = HtmlResponse(url="my HTML string", body=page_source, encoding="utf-8")
    # Parse every hotel URL on this list page and append it to listPageInfo.
    url_list = response.xpath("//a[@class='name']/@href").extract()
    comment_number_list = response.xpath("//div[@class='comment']/a/span/text()").extract()
    name_list = response.xpath("//a[@class='name']/text()").extract()
    address_list = response.xpath("//span[@class='address']/text()").extract()
    if len(url_list) == len(comment_number_list) == len(name_list) == len(address_list):
        for i in range(0, len(url_list)):
            self.listPageInfo.append({
                "guid": uuid.uuid1(),
                "url": url_list[i],
                "hotel_name": name_list[i],
                "OTA": "??",
                "comm_num": int(comment_number_list[i]),
                "address": address_list[i]
            })
def __parseUrls(self, page_source):
    response = HtmlResponse(url="My HTML String", body=page_source, encoding="utf-8")
    hotel_list = response.xpath("//div[@class='h_list']/div[@class='h_item']")
    for hotel in hotel_list:
        url = hotel.xpath(".//p[@class='h_info_b1']/a/@href").extract()[0]
        name = hotel.xpath(".//p[@class='h_info_b1']/a/@title").extract()[0]
        address = hotel.xpath(".//p[@class='h_info_b2']/text()").extract()[1]
        commnum = hotel.xpath(".//div[@class='h_info_comt']/a/span[@class='c555 block mt5']/b/text()").extract()
        if len(commnum) == 0:
            commnum = 0
        else:
            commnum = commnum[0]
        self.listPageInfo.append({
            "guid": uuid.uuid1(),
            "url": url,
            "hotel_name": name,
            "OTA": self.__ota_info,
            "comm_num": commnum,
            "address": address
        })
    pass
def process_request(self, request, spider):
    try:
        selenium_enable = request.meta.get('selenium')
    except Exception as e:
        log.info(e)
        selenium_enable = False
    if selenium_enable:
        self.driver.get(request.url)
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located(
                (By.CSS_SELECTOR, "#js-fans-rank > div > div.f-con > div.f-cn.cur > ul > li> a"))
        )
        body = self.driver.page_source
        response = HtmlResponse(url=self.driver.current_url, body=body, request=request, encoding='utf8')
        return response
    else:
        request.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
        request.headers['Accept'] = '*/*'
        request.headers['Accept-Encoding'] = 'gzip, deflate, sdch, br'
        request.headers['Accept-Language'] = 'zh-CN,zh;q=0.8,zh-TW;q=0.6'
        request.headers['Connection'] = 'keep-alive'
        request.headers['Host'] = 'www.douyu.com'
        request.headers['Upgrade-Insecure-Requests'] = 1
        try:
            cookies_enable = request.meta.get('cookies')
        except Exception as e:
            log.info(e)
            cookies_enable = False
        if cookies_enable:
            del request.headers['Upgrade-Insecure-Requests']
            request.headers['DNT'] = '1'
            request.headers['X-Requested-With'] = 'XMLHttpRequest'
            request.headers['referer'] = request.meta['referer']
            self.cookies['_dys_lastPageCode'] = request.meta.get('_dys_lastPageCode')
            self.cookies['_dys_refer_action_code'] = request.meta.get('_dys_refer_action_code')
            request.cookies = self.cookies
def process_request(self, request, spider):
    if self.use_selenium(request.url):
        if self.use_proxy():
            if self._count > 20:
                self.update_driver()
                self._count = 0
                log.info('update driver')
        yield HtmlResponse(request.url, encoding='utf-8',
                           body=self.driver.page_source.encode('utf8'))
def process_request(self, request, spider):
    if request.url[26] == 'c':
        ua = random.choice(self.user_agent_list)
        dcap = dict(DesiredCapabilities.PHANTOMJS)
        dcap["phantomjs.page.settings.userAgent"] = ua
        dcap["phantomjs.page.settings.loadImages"] = False
        driver = webdriver.PhantomJS(executable_path='E:\Webdriver\phantomjs-2.1.1-windows\\bin\phantomjs.exe',
                                     desired_capabilities=dcap)
        driver.get(request.url)
        sleep_time = random.randint(15, 22)
        time.sleep(sleep_time)
        try:
            detail = driver.find_element_by_xpath('//a[@ng-click="showDetail = btnOnClick(showDetail)"]')
            detail.click()
        except:
            pass
        body = driver.page_source
        url = driver.current_url
        driver.quit()
        return HtmlResponse(url=url, body=body, request=request, encoding='utf-8')
def process_request(self, request, spider): if spider.name == "gsxt": # print("PhantomJS is starting...") # driver = webdriver.PhantomJS(r"/home/lxw/Downloads/phantomjs/phantomjs-2.1.1-linux-x86_64/bin/phantomjs") # OK driver = webdriver.Chrome(r"/home/lxw/Software/chromedirver_selenium/chromedriver") # OK """ # Using IP Proxies: # ????chrome?????chrome???IP????????????????? # ??DesiredCapabilities(????)??????????sessionId????????????????????????????url proxy = webdriver.Proxy() proxy.proxy_type = ProxyType.MANUAL req = requests.get("http://datazhiyuan.com:60001/plain", timeout=10) print("Get an IP proxy:", req.text) if req.text: proxy.http_proxy = req.text # "1.9.171.51:800" # ????????webdriver.DesiredCapabilities.PHANTOMJS? proxy.add_to_capabilities(webdriver.DesiredCapabilities.PHANTOMJS) driver.start_session(webdriver.DesiredCapabilities.PHANTOMJS) """ driver.get(request.url) # ????????????, ??http://roll.news.qq.com/?? time.sleep(2) js = "var q=document.documentElement.scrollTop=10000" driver.execute_script(js) # ???js???????????????????? time.sleep(3) body = driver.page_source print("??" + request.url) return HtmlResponse(driver.current_url, body=body, encoding='utf-8', request=request) else: return
def do_test(self, meta_object, text, expected_raw, expected_requests):
    request = Request(url='http://www.drudgereport.com', meta=meta_object)
    response = HtmlResponse('drudge.url', body=text, request=request)

    raw_item_count = 0
    request_count = 0

    for x in self.spider.parse(response):
        if isinstance(x, RawResponseItem):
            raw_item_count = raw_item_count + 1
        elif isinstance(x, Request):
            request_count = request_count + 1

    self.assertEqual(raw_item_count, expected_raw)
    self.assertEqual(request_count, expected_requests)
def detail_translate_note(self, all_url, itemi):
    for url in all_url:
        url = self.site_domain + url
        print('detail_translate_note url %s' % url)
        html_requests = requests.get(url).text.encode('utf-8')
        html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
        html_all = Selector(html_response)
        itemi['detail_translate_note_text_title'] = html_all.xpath(
            '//div[@class="main3"]/div[@class="shileft"]/div[@class="son1"]/h1/text()').extract()
        itemi['detail_translate_text'] = html_all.xpath(
            '//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style)]/descendant-or-self::text()').extract()
        item_list_temp = []
        for item_list in itemi['detail_translate_text']:
            temp = item_list.encode('utf-8')
            temp = re.sub(r'\"', "“", temp)
            item_list_temp.append(temp)
        itemi['detail_translate_text'] = item_list_temp
    pass
def test_parse_drug_details_or_overview_generates_new_request_if_redirected_to_search_page(self):
    url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.Search_Drug_Name'
    meta = {
        'original_url': 'http://www.accessdata.fda.gov/somewhere.cfm',
        'original_cookies': {
            'foo': 'bar',
        },
    }
    mock_response = HtmlResponse(url=url)
    mock_response.request = Request(url, meta=meta)

    with mock.patch('random.random', return_value='random_cookiejar'):
        spider = Spider()
        request = spider.parse_drug_details_or_overview(mock_response)

    assert request.url == meta['original_url']
    assert request.cookies == meta['original_cookies']
    assert request.dont_filter
    assert request.callback == spider.parse_drug_details_or_overview
    assert request.meta['cookiejar'] == 'random_cookiejar'
def get_url(betamax_session):
    def _get_url(url, request_kwargs={}):
        '''Returns a scrapy.html.HtmlResponse with the contents of the received url.

        Note that the session is kept intact among multiple calls to this
        method (i.e. cookies are passed over).

        We also don't verify SSL certificates, because Takeda's certificate is
        invalid. If they become valid, we can resume verifying the certificates.
        '''
        response = betamax_session.get(url, verify=False)
        scrapy_response = HtmlResponse(
            url=str(response.url),
            body=response.content,
        )
        scrapy_response.request = Request(url, **request_kwargs)

        return scrapy_response

    return _get_url
def test_form_request_from_response():
    # Copied from scrapy tests (test_from_response_submit_not_first_clickable)
    def _buildresponse(body, **kwargs):
        kwargs.setdefault('body', body)
        kwargs.setdefault('url', 'http://example.com')
        kwargs.setdefault('encoding', 'utf-8')
        return HtmlResponse(**kwargs)

    response = _buildresponse(
        """<form action="get.php" method="GET">
        <input type="submit" name="clickable1" value="clicked1">
        <input type="hidden" name="one" value="1">
        <input type="hidden" name="two" value="3">
        <input type="submit" name="clickable2" value="clicked2">
        </form>""")
    req = SplashFormRequest.from_response(
        response, formdata={'two': '2'}, clickdata={'name': 'clickable2'})
    assert req.method == 'GET'
    assert req.meta['splash']['args']['url'] == req.url
    fs = cgi.parse_qs(req.url.partition('?')[2], True)
    assert fs['clickable2'] == ['clicked2']
    assert 'clickable1' not in fs
    assert fs['one'] == ['1']
    assert fs['two'] == ['2']
def extractLinks(self, response):
    retv = []
    link_extractor = LinkExtractor()
    if isinstance(response, HtmlResponse):
        links = link_extractor.extract_links(response)
        for link in links:
            if self.postfix in link.url:
                retv.append(link.url)
    return retv
def goodsDetail(detail_url):
    '''
    Extract the goods attributes from a detail page via XPath.
    :param detail_url: detail page URL
    :return: goods data as a dict
    '''
    goods_data = defaultdict()
    # Source URL.
    goods_data['source_url'] = detail_url
    # Render the page and build an HtmlResponse from the body string.
    body = getHtmlFromJs(detail_url)['content'].encode('utf-8')
    html = HtmlResponse(url=detail_url, body=str(body))
    # Name.
    goods_data['name'] = html.xpath('/html/body/div[7]/div[2]/h1/text()').extract()[0]
    # Price.
    goods_data['price'] = html.selector.xpath('/html/body/div[7]/div[2]/div[2]/ul/li[1]/label[1]/text()').extract()[0]
    # Type.
    goods_data['type'] = html.selector.xpath('/html/body/div[7]/div[2]/div[2]/ul/li[3]/label/text()').extract()[0]
    # Detail table.
    goods_data['detail'] = html.selector.xpath('/html/body/div[9]/div[2]/div[2]/table').extract()[0]
    # Pictures.
    pics = []
    for pic in html.selector.xpath('/html/body/div[7]/div[1]/div[2]/div[2]/ul/li/img'):
        # Strip the thumbnail suffix to get the full-size image.
        pics.append(pic.xpath('@src').extract()[0].replace('!240240', ''))
    goods_data['pics'] = '|'.join(pics)
    goods_data['storage'] = ''
    goods_data['lack_period'] = ''
    goods_data['created'] = int(time.time())
    goods_data['updated'] = int(time.time())
    # print(goods_data['detail'])
    return goods_data
def process_request(self, request, spider):
    # driver = webdriver.Firefox(executable_path="/Users/roysegall/geckodriver")
    driver = webdriver.PhantomJS(executable_path='/Users/roysegall/phantomjs')
    driver.get(request.url)
    return HtmlResponse(request.url, encoding='utf-8', body=driver.page_source.encode('utf-8'))
def process_request(self, request, spider):
    if request.meta.get('nojs'):
        # disable js rendering in a per-request basis
        return
    self.driver.get(request.url)
    content = self.driver.page_source
    return HtmlResponse(request.url, body=content, encoding='utf-8')
def pageHandler_comment(self, page_source, pageNum, userID, weiboID):
    response = HtmlResponse(url="my HTML string", body=page_source, encoding="utf-8")
    if pageNum == 1:
        pass
    items = self.__getCommentItems(response, pageNum, userID, weiboID)
    if len(items) > 0:
        self.weiboDao.saveWeiboComment(items)
def __parseHotelComment(self, page_source, hotel_id, comm_type):
    response = HtmlResponse(url="My HTML String", body=page_source, encoding="utf-8")
    remarkDom = response.xpath("//div[@class='user_remark_datail']")
    remarkDomLen = len(response.xpath("//div[@class='user_remark_datail']/div"))
    # Count how many comments on this page are already stored, to decide whether to keep paging.
    same_num = 0
    for i in range(1, remarkDomLen + 1):
        id = uuid.uuid1()
        # Username.
        username = remarkDom.xpath("div[%d]/div[@class='a1']/div[@class='b2']/text()" % i).extract()
        username = username[0] if len(username) > 0 else ""
        # Comment text.
        remarkText = remarkDom.xpath("div[%d]/div[@class='a2']/div[@class='b2']/p/text()" % i).extract()
        remark = ""
        for str in remarkText:
            remark = remark + re.sub("\s+", "", str)
        # Comment time.
        comm_time = remarkDom.xpath("div[%d]/div[@class='a2']/div[@class='b4']/div[@style='float: right;']/text()" % i).extract()[0]
        # User type, sentiment value and viewpoint.
        user_type = ""
        senti_value = None
        viewpoint = None
        try:
            user_type = remarkDom.xpath("div[%d]/div[@class='a1']/div[@class='b3']/text()" % i).extract()[0]
            senti_value = self.hotelNLP.sentiment(remark.encode("utf-8"))
            viewpoint = json.dumps(self.hotelNLP.viewpoint(remark.encode("utf-8"), decoding="utf-8"))
        except:
            traceback.print_exc()
        comm = {"guid": id, "username": username, "remark": remark, "comm_time": comm_time,
                "user_type": user_type, "hotel_id": hotel_id, "comm_type": comm_type,
                "senti_value": senti_value, "viewpoint": viewpoint}
        if self.__is_exist_in_comment_list(comm):
            same_num += 1
        else:
            self.commList.append(comm)
    if same_num == remarkDomLen:
        return False
    else:
        return True
def _extract_requests(self, response):
    r = []
    if isinstance(response, HtmlResponse):
        links = self.link_extractor.extract_links(response)
        r.extend(Request(x.url, callback=self.parse) for x in links)
    return r
def parse(self, response):
    # Wiener Linien returns HTML with an XML content type which creates an
    # XmlResponse.
    response = HtmlResponse(url=response.url, body=response.body)
    for item in response.css('.block-news-item'):
        il = FeedEntryItemLoader(response=response,
                                 timezone=self._timezone,
                                 base_url='http://{}'.format(self.name))
        link = response.urljoin(item.css('a::attr(href)').extract_first())
        il.add_value('link', link)
        il.add_value('title', item.css('h3::text').extract_first())
        il.add_value('updated', item.css('.date::text').extract_first())
        yield scrapy.Request(link, self.parse_item, meta={'il': il})
def process_request(self, request, spider):
    if JAVASCRIPT in request.meta and request.meta[JAVASCRIPT] is True:
        driver = self.phantomjs_opened()
        try:
            driver.get(request.url)
            body = driver.page_source
            return HtmlResponse(request.url, body=body, encoding='utf-8', request=request)
        finally:
            self.phantomjs_closed(driver)
def process_request(self, request, spider):
    if 'how' in request.meta:
        if 'isscreen' in request.meta:
            print(1)
            true_page = selenium_request(request.url, True)
        else:
            true_page = selenium_request(request.url)
        return HtmlResponse(request.url, body=true_page, encoding='utf-8', request=request)
def process_request(self, request, spider): if spider.name == "jobbole": spider.browser.get(request.url) import time # time.sleep(3) print("???{0}".format(request.url)) return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source, encoding="utf-8" , request=request)
def parse_kb(self, response):
    # initial html tokenization to find regions segmented by e.g. "======"
    # or "------"
    filtered = response.xpath(
        "//div[@class='sfdc_richtext']").extract()[0].split("=-")

    for entry in [x and x.strip() for x in filtered]:
        resp = HtmlResponse(url=response.url, body=entry,
                            encoding=response.encoding)

        for link in resp.xpath("//a"):
            href = link.xpath("@href").extract()[0]

            if "cache-www" in href:
                text = resp.xpath("//text()").extract()
                text_next = link.xpath("following::text()").extract()

                item = FirmwareLoader(item=FirmwareImage(), response=response,
                                      date_fmt=["%b %d, %Y", "%B %d, %Y", "%m/%d/%Y"])

                version = FirmwareLoader.find_version_period(text_next)
                if not version:
                    version = FirmwareLoader.find_version_period(text)

                item.add_value("version", version)
                item.add_value("date", item.find_date(text))
                item.add_value("url", href)
                item.add_value("product", response.meta["product"])
                item.add_value("vendor", self.name)
                yield item.load_item()
def parse_kb(self, response):
    mib = None

    # need to perform some nasty segmentation because different firmware
    # versions are not clearly separated
    # reverse order to get MIB before firmware items
    for entry in reversed(response.xpath(
            "//div[@id='support-article-downloads']/div/p")):
        for segment in reversed(entry.extract().split("<br><br>")):
            resp = HtmlResponse(
                url=response.url, body=segment, encoding=response.encoding)

            for href in resp.xpath("//a/@href").extract():
                text = resp.xpath("//text()").extract()

                if "MIBs" in href:
                    mib = href
                elif "firmware" in href:
                    text = resp.xpath("//text()").extract()

                    item = FirmwareLoader(
                        item=FirmwareImage(), response=resp, date_fmt=["%m/%d/%Y"])
                    item.add_value("date", item.find_date(text))
                    item.add_xpath("url", "//a/@href")
                    item.add_value("mib", mib)
                    item.add_value("product", response.meta["product"])
                    item.add_value("vendor", self.name)
                    item.add_value(
                        "version", FirmwareLoader.find_version_period(text))
                    yield item.load_item()
def process_request(self, request, spider): print("Using process_request") true_page = selenium_request(request.url) return HtmlResponse(request.url, body=true_page, encoding='utf-8', request=request)
def process_request(self, request, spider):
    if spider.name == 'jobbole':
        spider.browser.get(request.url)
        time.sleep(3)
        print("Visiting: {0}".format(request.url))
        return HtmlResponse(
            url=spider.browser.current_url,
            body=spider.browser.page_source,
            encoding="utf-8",
            request=request
        )
def read(self, source):
    source_filename = os.path.basename(source)
    with zipfile.ZipFile(source) as zf:
        filenames = sorted(set([zipinfo.filename[:10] for zipinfo in zf.infolist()]))
        for filename in filenames:
            source_path = u'{0}/{1}'.format(source_filename, filename)

            # Read info
            desc = zf.read(self.INFO_FORMAT.format(filename))
            info = json.loads(desc)
            url = info['url'].encode('utf8')
            info.pop('url', None)
            headers = info['headers']
            info.pop('headers', None)
            status = info['status']
            info.pop('status', None)
            info_meta = info['meta']
            info_meta['source_path'] = source_path

            # Read content
            content = zf.read(self.BODY_FORMAT.format(filename))

            request = Request(
                url=url,
                meta=info_meta
            )
            response = HtmlResponse(
                url=url,
                headers=headers,
                status=status,
                body=content,
                request=request,
            )
            yield response
def handle_detail(self, response, itemi):
    print(response)
    response = response.strip()
    # requests.adapters.DEFAULT_RETRIES = 10
    # s = requests.session()
    # s.config['keep_alive'] = False
    html_requests_item = requests.get(response)
    html_requests = html_requests_item.text.encode('utf-8')
    # html_requests_item.connection.close()
    html_response = HtmlResponse(url=response, body=html_requests, headers={'Connection': 'close'})
    html_all = Selector(html_response)
    html = html_all.xpath('//div[@class="main3"]/div[@class="shileft"]')
    itemi['detail_dynasty'] = html.xpath(
        u'div[@class="son2"]/p/span[contains(text(),"???")]/parent::p/text()').extract()[0]
    itemi['detail_translate_note_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"?????")]/parent::a/@href').extract()
    itemi['detail_appreciation_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"?")]/parent::a/@href').extract()
    itemi['detail_background_url'] = html.xpath(
        u'div[@class="son5"]//u[contains(text(),"????") or contains(text(),"????")]/parent::a/@href').extract()
    itemi['detail_author'] = html.xpath(
        u'div[@class="son2"]/p/span[contains(text(),"???")]/parent::p/a/text()').extract()
    itemi['detail_text'] = "".join(html.xpath('div[@class="son2"]/text()').extract()).strip().encode('utf-8')
    # itemi['detail_text'] = re.sub(r'?', "“", itemi['detail_text'])
    # itemi['detail_text'] = re.sub(r'\(.*?\)', "", itemi['detail_text'])
    itemi['detail_text'] = re.sub(r'\r?\n\t?.*?\)', "", itemi['detail_text'])
    if itemi['detail_background_url']:
        self.detail_background(itemi['detail_background_url'], itemi)
        pass
    else:
        pass
    self.detail_translate_note(itemi['detail_translate_note_url'], itemi)
    self.detail_appreciation(itemi['detail_appreciation_url'], itemi)
def detail_background(self, all_url, itemi):
    detail_appreciation_container = []
    for url in all_url:
        url = self.site_domain + url
        print('detail_background_text url : %s' % url)
        html_requests = requests.get(url).text.encode('utf-8')
        html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
        html_all = Selector(html_response)
        temp = ''.join(html_all.xpath(
            u'//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style or contains(text(),"?????"))]').extract())
        temp = temp.encode('utf-8')
        temp = re.sub(r'<p>', '', temp)
        temp = re.sub(r'</p>', '', temp)
        temp = re.sub(r'</a>', '', temp)
        temp = re.sub(r'(<a\s+href=\s*\".*?\">)', '', temp)
        alt = re.search(r'\s+alt=\s*\"(.*?)\"\s+', temp)
        # print(alt.group(1))
        if alt is not None:
            temp = re.sub(r'<img.*\s*>', alt.group(1), temp)
        else:
            print('%s have a none img' % url)
        temp = re.sub(r'\"', "“", temp)
        detail_appreciation_container.append(temp)
    itemi['detail_background_text'] = detail_appreciation_container
def detail_appreciation(self, all_url, itemi):
    detail_appreciation_container = []
    for url in all_url:
        url = self.site_domain + url
        print('detail_appreciation url : %s' % url)
        html_requests = requests.get(url).text.encode('utf-8')
        html_response = HtmlResponse(url=url, body=html_requests, headers={'Connection': 'close'})
        html_all = Selector(html_response)
        temp = ''.join(html_all.xpath(
            u'//div[@class="main3"]/div[@class="shileft"]/div[@class="shangxicont"]/p[not(@style or contains(text(),"?????"))]').extract())
        temp = temp.encode('utf-8')
        temp = re.sub(r'<p>', '', temp)
        temp = re.sub(r'</p>', '', temp)
        temp = re.sub(r'</a>', '', temp)
        temp = re.sub(r'(<a\s+href=\s*\".*?\">)', '', temp)
        alt = re.search(r'\s+alt=\s*\"(.*?)\"\s+', temp)
        # print(alt.group(1))
        if alt is not None:
            temp = re.sub(r'<img.*\s*>', alt.group(1), temp)
        else:
            print('%s have a none img in appricate' % url)
        temp = re.sub(r'\"', "“", temp)
        # if self.site_domain + '/shangxi_4618.aspx' == url:
        #     print(temp)
        detail_appreciation_container.append(temp)
    itemi['detail_appreciation_text'] = detail_appreciation_container
    pass
def process_request(self, request, spider): if spider.name == "jobbole": # browser = webdriver.Chrome(executable_path="D:/Temp/chromedriver.exe") spider.browser.get(request.url) import time time.sleep(3) print ("??:{0}".format(request.url)) return HtmlResponse(url=spider.browser.current_url, body=spider.browser.page_source, encoding="utf-8", request=request)
def test_parse_drug_details_or_overview_delegates_to_parse_drug_details_when_response_in_drug_details(self):
    url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.DrugDetails'
    mock_response = HtmlResponse(url=url)
    expected_result = 'expected_result'

    with mock.patch.object(Spider, 'parse_drug_details', return_value=expected_result) as mock_method:
        spider = Spider()
        result = spider.parse_drug_details_or_overview(mock_response)

    mock_method.assert_called_once_with(mock_response)
    assert result == expected_result
def test_parse_drug_details_or_overview_delegates_to_parse_drug_details_when_response_in_drug_overview(self):
    url = 'http://www.accessdata.fda.gov/scripts/cder/drugsatfda/index.cfm?fuseaction=Search.Overview&DrugName=E-BASE'
    mock_response = HtmlResponse(url=url)
    expected_result = 'expected_result'

    with mock.patch.object(Spider, 'parse_drug_overview', return_value=expected_result) as mock_method:
        spider = Spider()
        result = spider.parse_drug_details_or_overview(mock_response)

    mock_method.assert_called_once_with(mock_response)
    assert result == expected_result
def test_parse_drug_details_or_overview_raises_exception_for_unknown_pages(self):
    url = 'http://www.accessdata.fda.gov/'
    mock_response = HtmlResponse(url=url)

    with pytest.raises(Exception):
        spider = Spider()
        spider.parse_drug_details_or_overview(mock_response)
def parse(self, response):
    marker_txt = re.findall(re.compile("markerData.*\}", re.MULTILINE), response.body_as_unicode())
    if not len(marker_txt):
        return
    markers_json = "{\"" + marker_txt[0]
    markers = list(json.loads(markers_json).values())[0]
    if not len(markers):
        return
    for marker in markers:
        marker_response = HtmlResponse(url="", body=marker["info"].encode("utf-8"))
        hours = re.findall(r"\{\"label.*\}", marker["info"])
        hours = hours[0]
        parsed_hours = json.loads(hours)
        addr_parts = marker_response.css(".address span:not(.phone)::text").extract()
        url = marker_response.css("header a").xpath("@href").extract_first()
        city, state = addr_parts[-1].split(",")
        yield GeojsonPointItem(lat=marker.get("lat"),
                               lon=marker.get("lng"),
                               name=marker_response.css("header a::text").extract_first(default=None),
                               addr_full=", ".join(addr_parts),
                               city=city.strip(),
                               state=state.strip(),
                               country="United States",
                               phone=marker_response.css(".phone::text").extract_first(),
                               website=url,
                               opening_hours=get_hours(parsed_hours["days"]),
                               ref=url.split("/")[-1].split(".")[0])
def parse(self, response):
    data = json.loads(response.body_as_unicode())
    stores = data['markers']
    for store in stores:
        html = HtmlResponse(
            url="",
            body=store['info'].encode('UTF-8')
        )
        unp = {}

        unp['lat'] = store['lat']
        unp['lon'] = store['lng']
        if unp['lat']:
            unp['lat'] = float(unp['lat'])
        if unp['lon']:
            unp['lon'] = float(unp['lon'])
        unp['ref'] = store['locationId']
        unp['addr_full'] = html.xpath('//div[contains(@class, "addr")]/text()').extract_first()
        unp['phone'] = html.xpath('//div[contains(@class, "phone")]/text()').extract_first()
        unp['name'] = html.xpath('//div[@class="loc-name"]/text()').extract_first()

        addr2 = html.xpath('//div[contains(@class, "csz")]/text()').extract_first()
        if addr2:
            addr2 = addr2.strip()
            three_pieces = self.addr2regex.search(addr2)
            if three_pieces:
                city, state, zipcode = three_pieces.groups()
                unp['city'] = city
                unp['state'] = state
                unp['postcode'] = zipcode

        properties = {}
        for key in unp:
            if unp[key]:
                properties[key] = unp[key]

        yield GeojsonPointItem(**properties)
def main(): total = 0 time = 0 tar = tarfile.open("bookfiles.tar.gz") for member in tar.getmembers(): f = tar.extractfile(member) html = f.read() response = HtmlResponse(url="local", body=html, encoding='utf8') start = timer() rating = response.xpath( "//*[@id='content_inner']/article/div[1]/div[2]/p[3]/i[1]").extract(), # .split(' ')[-1], title = response.xpath( "//*[@id=('content_inner')]/article/div[1]/div[2]/h1").extract(), price = response.xpath( "//*[@id=('content_inner')]/article/div[1]/div[2]/p[1]"), stock = ''.join(response.xpath( "//*[@id=('content_inner')]/article/div[1]/div[2]/p[2]").re('(\d+)')), end = timer() page = [rating, title, price, stock] total = total + 1 time = time + end - start print("\nTotal number of pages extracted = {0}".format(total)) print("Time taken = {0}".format(time)) click.secho("Rate of link extraction : {0} pages/second\n".format( float(total / time)), bold=True) with open("Benchmark.txt", 'w') as g: g.write(" {0}".format((float(total / time))))
def main():
    url = 'http://scrapinghub.com/'
    link_extractor = LinkExtractor()
    total = 0
    time = 0
    tar = tarfile.open("sites.tar.gz")
    for member in tar.getmembers():
        f = tar.extractfile(member)
        html = f.read()
        start = timer()
        response = HtmlResponse(url=url, body=html, encoding='utf8')
        links = link_extractor.extract_links(response)
        end = timer()
        total = total + len(links)
        time = time + end - start

    print("\nTotal number of links extracted = {0}".format(total))
    print("Time taken = {0}".format(time))
    click.secho("Rate of link extraction : {0} links/second\n".format(
        float(total / time)), bold=True)

    with open("Benchmark.txt", 'w') as g:
        g.write(" {0}".format((float(total / time))))
def main(): total = 0 time = 0 tar = tarfile.open("bookfiles.tar.gz") for member in tar.getmembers(): f = tar.extractfile(member) html = f.read() response = HtmlResponse(url="local", body=html, encoding='utf8') start = timer() rating = response.css( 'p.star-rating::attr(class)').extract_first().split(' ')[-1] title = response.css('.product_main h1::text').extract_first() price = response.css( '.product_main p.price_color::text').re_first('£(.*)') stock = ''.join( response.css('.product_main .instock.availability ::text').re('(\d+)')) category = ''.join( response.css('ul.breadcrumb li:nth-last-child(2) ::text').extract()).strip() end = timer() page = [rating, title, price, stock, category] total = total + 1 time = time + end - start print("\nTotal number of pages extracted = {0}".format(total)) print("Time taken = {0}".format(time)) click.secho("Rate of link extraction : {0} pages/second\n".format( float(total / time)), bold=True) with open("Benchmark.txt", 'w') as g: g.write(" {0}".format((float(total / time))))