The following 50 code examples, extracted from open-source Python projects, illustrate how to use scrapy.http.Request().
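
Before the project examples, here is a minimal, self-contained sketch of the typical call, assuming a hypothetical spider; the URL, CSS selector, and callback names are placeholders for illustration only, not taken from any of the projects below.

from scrapy import Spider
from scrapy.http import Request


class ExampleSpider(Spider):
    name = "example"
    start_urls = ["https://example.com/articles"]  # illustrative start URL

    def parse(self, response):
        # Follow every article link and pass extra data to the callback via meta.
        for href in response.css("a.article::attr(href)").extract():
            yield Request(url=response.urljoin(href),
                          meta={"list_url": response.url},
                          callback=self.parse_detail)

    def parse_detail(self, response):
        # The value placed in meta by the listing page is available here.
        yield {"url": response.url, "list_url": response.meta.get("list_url")}
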
def parse(self, response): """ 1. ???????????url???scrapy???????? 2. ??????url???scrapy????? ???????parse """ # ???????????url???scrapy???????? if response.status == 404: self.fail_urls.append(response.url) self.crawler.stats.inc_value("failed_url") #?extra?list???????? post_nodes = response.css("#archive .floated-thumb .post-thumb a") for post_node in post_nodes: #??????url image_url = post_node.css("img::attr(src)").extract_first("") post_url = post_node.css("::attr(href)").extract_first("") #request?????????parse_detail?????????? # Request(url=post_url,callback=self.parse_detail) yield Request(url=parse.urljoin(response.url, post_url), meta={"front_image_url": image_url}, callback=self.parse_detail) #??href????????? #response.url + post_url print(post_url) # ????????scrapy???? next_url = response.css(".next.page-numbers::attr(href)").extract_first("") if next_url: yield Request(url=parse.urljoin(response.url, next_url), callback=self.parse)
def process_spider_output(self, response, result, spider):
    """record this page"""
    mongo_uri = spider.crawler.settings.get('MONGO_URI')
    mongo_db = spider.crawler.settings.get('MONGO_DB')
    client = pymongo.MongoClient(mongo_uri)
    db = client[mongo_db]

    def add_field(request, response):
        if isinstance(request, Request):
            db[self.collection_name].update_one(
                {}, {'$set': {'page_url': response.request.url}}, upsert=True)
        return True

    ret = [req for req in result if add_field(req, response)]
    client.close()
    return ret

def start_requests(self):
    url = 'https://www.assetstore.unity3d.com/login'
    yield Request(
        url = url,
        headers = {
            'Accept': 'application/json',
            'Accept-Encoding': 'gzip, deflate, br',
            'Accept-Language': 'zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3',
            'Connection': 'keep-alive',
            'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
            'Host': 'www.assetstore.unity3d.com',
            'Referer': 'https://www.assetstore.unity3d.com/en/',
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:50.0) Gecko/20100101 '
                          'Firefox/50.0',
            'X-Kharma-Version': '0',
            'X-Requested-With': 'UnityAssetStore',
            'X-Unity-Session': '26c4202eb475d02864b40827dfff11a14657aa41',
        },
        meta = {
        },
        dont_filter = True,
        callback = self.get_unity_version,
        errback = self.error_parse,
    )

def login(self, response):
    cookie_jar = CookieJar()
    cookie_jar.extract_cookies(response, response.request)
    for k, v in cookie_jar._cookies.items():
        for i, j in v.items():
            for m, n in j.items():
                self.cookie_dict[m] = n.value
    req = Request(
        url='http://dig.chouti.com/login',
        method='POST',
        headers={'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8'},
        body='phone=13331167937&password=zds819918&oneMonth=1',
        cookies=self.cookie_dict,
        callback=self.check_login
    )
    yield req

def parse(self, response): """ Parses the first request and request the click event on the confirmation button """ self.driver.get(settings.request_url) while True: try: next_req = self.driver.find_element_by_class_name('submit') yield Request(settings.confirmation_url, callback=self.parse_callback) next_req.click() break except Exception as err: logging.error(err) break # Waiting to close browser... This gives enough time to download the file. time.sleep(settings.sleep_time) downloaded_file = get_download_folder() + '\\' + settings.downloaded_file_name moved_file = settings.destination_path + settings.new_file_name move_file(downloaded_file, moved_file) delete_file(downloaded_file)
def parse(self, response): """ Parses the first request and request the click event on the confirmation button """ self.driver.get(settings.request_url) while True: try: next_req = self.driver.find_element_by_class_name('submit') yield Request(settings.confirmation_url, callback=self.parse_callback) next_req.click() break except Exception as err: logging.error(err) break self.driver.close() # Waiting to close browser... This gives enough time to download the file. time.sleep(settings.sleep_time) downloaded_file = get_download_folder() + '\\' + settings.downloaded_file_name moved_file = settings.destination_path + settings.new_file_name move_file(downloaded_file, moved_file)
def start_requests(self):
    for cityid, cityname in cityids.items():
        url = 'http://wthrcdn.etouch.cn/weather_mini?citykey=%s' % cityid
        yield Request(
            url = url,
            method = 'GET',
            headers = {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                'Accept-Encoding': 'gzip, deflate',
                'Accept-Language': 'en-US,en;q=0.5',
                'Connection': 'keep-alive',
                'Host': 'wthrcdn.etouch.cn',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.11; rv:50.0) Gecko/20100101 '
                              'Firefox/50.0',
            },
            meta = {
                'cityid': cityid,
                'cityname': cityname,
            },
            callback = self.get_sk_2d_weather,
        )

def parse(self, response):
    '''
    1. Extract the article URLs on the list page and hand them to scrapy to download and parse.
    2. Extract the next-page URL and hand it to scrapy; the downloaded page is parsed by parse again.
    :param response:
    :return:
    '''
    # Extract every article URL on the list page and hand it to scrapy.
    post_nodes = response.css("#archive .floated-thumb .post-thumb a")
    for post_node in post_nodes:
        # image_url is the article's cover image.
        image_url = post_node.css("img::attr(src)").extract_first("")
        post_url = post_node.css("::attr(href)").extract_first("")
        # Pass the cover-image URL along in meta; parse.urljoin joins post_url with response.url
        # (if post_url is already an absolute URL, response.url is ignored).
        yield Request(url=parse.urljoin(response.url, post_url),
                      meta={"front_image_url": parse.urljoin(response.url, image_url)},
                      callback=self.parse_detail)

    # Extract the next page and hand it to scrapy.
    next_url = response.css(".next.page-numbers::attr(href)").extract_first("")
    if next_url:
        yield Request(url=next_url, callback=self.parse)

def relations(self, response):
    self.obj.get(response.url)
    followees_a = self.obj.find_elements_by_xpath('//a[@class="UserLink-link"]')
    #pdb.set_trace()
    #followees_a = response.xpath('//a[@class="UserLink-link"]/@href').extract()
    followees = []
    for one in followees_a:
        try:
            one = one.get_attribute('href')
            followees.append(one.replace('https://www.zhihu.com/people/', ''))
        except:
            pass
    followees = list(set(followees))
    #pdb.set_trace()
    response.meta['item']['relations_id'] += followees
    nextpage_button = response.xpath('//button[@class="Button PaginationButton PaginationButton-next Button--plain"]').extract()
    if nextpage_button:
        #pdb.set_trace()
        nextpage_url = response.url.replace('?page=' + str(response.meta['page']), '') + "?page=" + str(response.meta['page'] + 1)
        yield Request(nextpage_url, callback=self.relations,
                      meta={'page': response.meta['page'] + 1, 'item': response.meta['item']})
    else:
        yield response.meta['item']
    for user in followees:
        yield Request('https://www.zhihu.com/people/' + user + '/answers', callback=self.parse)

def parse_relation(self, response):
    json_result = str(response.body, encoding="utf8").replace('false', '0').replace('true', '1')
    dict_result = eval(json_result)
    relations_id = []
    for one in dict_result['data']:
        relations_id.append(one['url_token'])
    response.meta['item']['relations_id'] = relations_id
    if response.meta['offset'] == 0:
        response.meta['item']['relation_type'] = response.meta['relation_type']
    else:
        response.meta['item']['relation_type'] = 'next:' + response.meta['relation_type']
    #pdb.set_trace()
    yield response.meta['item']

    for one in response.meta['item']['relations_id']:
        yield Request('https://www.zhihu.com/api/v4/members/' + one + '?include=locations,employments,industry_category,gender,educations,business,follower_count,following_count,description,badge[?(type=best_answerer)].topics',
                      meta={'user_id': one}, callback=self.parse)
    #pdb.set_trace()
    if dict_result['paging']['is_end'] == 0:
        #pdb.set_trace()
        offset = response.meta['offset'] + 20
        next_page = re.findall('(.*offset=)\d+', response.url)[0]
        #pdb.set_trace()
        yield Request(next_page + str(offset), callback=self.parse_relation,
                      meta={'item': response.meta['item'], 'offset': offset, 'relation_type': response.meta['relation_type']})

def parse_answer(self, response):
    json_result = str(response.body, encoding="utf8").replace('false', '0').replace('true', '1')
    dict_result = eval(json_result)
    for one in dict_result['data']:
        item = AnswerItem()
        item['answer_user_id'] = response.meta['answer_user_id']
        item['answer_id'] = one['id']
        item['question_id'] = one['question']['id']
        #pdb.set_trace()
        item['cretated_time'] = one['created_time']
        item['updated_time'] = one['updated_time']
        item['voteup_count'] = one['voteup_count']
        item['comment_count'] = one['comment_count']
        item['content'] = one['content']
        yield item

    if dict_result['paging']['is_end'] == 0:
        offset = response.meta['offset'] + 20
        next_page = re.findall('(.*offset=)\d+', response.url)[0]
        yield Request(next_page + str(offset), callback=self.parse_answer,
                      meta={'answer_user_id': response.meta['answer_user_id'], 'offset': offset})

def parse_question(self, response):
    list_item = response.xpath('//div[@class="List-item"]')
    for one in list_item:
        item = QuestionItem()
        item['ask_user_id'] = response.meta['ask_user_id']
        title = one.xpath('.//div[@class="QuestionItem-title"]')
        item['title'] = title.xpath('./a/text()').extract()[0]
        item['question_id'] = title.xpath('./a/@href').extract()[0].replace('/question/', '')
        content_item = one.xpath('.//div[@class="ContentItem-status"]//span/text()').extract()
        item['ask_time'] = content_item[0]
        item['answer_count'] = content_item[1]
        item['followees_count'] = content_item[2]
        yield item

    next_page = response.xpath('//button[@class="Button PaginationButton PaginationButton-next Button--plain"]/text()').extract()
    if next_page:
        response.meta['page'] += 1
        next_url = re.findall('(.*page=)\d+', response.url)[0] + str(response.meta['page'])
        yield Request(next_url, callback=self.parse_question,
                      meta={'ask_user_id': response.meta['ask_user_id'], 'page': response.meta['page']})

def parse_article(self, response):
    json_result = str(response.body, encoding="utf8").replace('false', '0').replace('true', '1')
    dict_result = eval(json_result)
    for one in dict_result['data']:
        item = ArticleItem()
        item['author_id'] = response.meta['author_id']
        item['title'] = one['title']
        item['article_id'] = one['id']
        item['content'] = one['content']
        #pdb.set_trace()
        item['cretated_time'] = one['created']
        item['updated_time'] = one['updated']
        item['voteup_count'] = one['voteup_count']
        item['comment_count'] = one['comment_count']
        yield item

    if dict_result['paging']['is_end'] == 0:
        offset = response.meta['offset'] + 20
        next_page = re.findall('(.*offset=)\d+', response.url)[0]
        yield Request(next_page + str(offset), callback=self.parse_article,
                      meta={'author_id': response.meta['author_id'], 'offset': offset})

def _crawl(self, start_file_path, fake_url, items=None, connector=None):
    """
    :param start_file_path: file path of start file
    :param fake_url: The fake url for Request
    :param connector: Connector instance
    :param items: List of jobs item to use as "job database". Default is empty list
    :return: list of job items
    """
    if items is None:
        items = []
    if connector is None:
        connector = SpiderTestConnector(items)

    request = Request(url=fake_url)
    start_response = fake_response_from_file(
        start_file_path,
        request=request,
        response_class=HtmlResponse
    )

    self._spider = self._get_prepared_spider()()
    self._spider.set_connector(connector)

    return list(self._parse_spider_response(self._spider.parse(start_response)))

def login_verify(self, response):
    if response.url == self.login_verify_url:
        self.is_login = True
        self.login_time = time.mktime(time.strptime(
            response.headers['Date'],
            '%a, %d %b %Y %H:%M:%S %Z')) + (8 * 60 * 60)
        time.sleep(1)
        return [FormRequest(self.submit_url,
                            formdata={
                                'problem_id': self.problem_id,
                                'language': LANGUAGE.get(self.language, '0'),
                                'source': self.source,
                                'submit': 'Submit',
                                'encoded': '1'
                            },
                            callback=self.after_submit,
                            dont_filter=True)]
    else:
        return Request(self.start_urls[0], callback=self.parse_start_url)

def parse(self, response):
    sel = Selector(response)

    self.item = AccountItem()
    self.item['oj'] = 'poj'
    self.item['username'] = self.username
    if self.is_login:
        try:
            self.item['rank'] = sel.xpath('//center/table/tr')[1].\
                xpath('.//td/font/text()').extract()[0]
            self.item['accept'] = sel.xpath('//center/table/tr')[2].\
                xpath('.//td/a/text()').extract()[0]
            self.item['submit'] = sel.xpath('//center/table/tr')[3].\
                xpath('.//td/a/text()').extract()[0]
            yield Request(self.accepted_url % self.username,
                          callback=self.accepted)
            self.item['status'] = 'Authentication Success'
        except:
            self.item['status'] = 'Unknown Error'
    else:
        self.item['status'] = 'Authentication Failed'

    yield self.item

def accepted(self, response):
    sel = Selector(response)

    next_url = sel.xpath('//p/a/@href')[2].extract()
    table_tr = sel.xpath('//table')[-1].xpath('.//tr')[1:]
    for tr in table_tr:
        name = tr.xpath('.//td/a/text()').extract()[0]
        problem_id = tr.xpath('.//td[3]/a/text()').extract()[0].strip()
        submit_time = tr.xpath('.//td/text()').extract()[-1]
        self.solved[problem_id] = submit_time
    self.item['solved'] = self.solved

    if table_tr:
        yield Request('http://' + self.allowed_domains[0] + '/' + next_url,
                      callback=self.accepted)

    yield self.item

def parse_search_page(self, response):
    # handle current page
    for item in self.parse_tweets_block(response.body):
        yield item

    # get next page
    tmp = self.reScrollCursor.search(response.body)
    if tmp:
        query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
        scroll_cursor = tmp.group(1)
        url = 'https://twitter.com/i/search/timeline?q=%s&' \
              'include_available_features=1&include_entities=1&max_position=%s' % \
              (urllib.quote_plus(query), scroll_cursor)
        yield http.Request(url, callback=self.parse_more_page)

    # TODO:
    # get refresh page
    # tmp = self.reRefreshCursor.search(response.body)
    # if tmp:
    #     query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
    #     refresh_cursor=tmp.group(1)

def parse_user_0(self, response):
    """Crawl the user's profile summary: number of tweets, follows and fans."""
    user_item = UserItem()
    selector = Selector(response)
    text0 = selector.xpath('body/div[@class="u"]/div[@class="tip2"]').extract_first()
    if text0:
        num_tweets = re.findall(u'\u5fae\u535a\[(\d+)\]', text0)    # number of tweets
        num_follows = re.findall(u'\u5173\u6ce8\[(\d+)\]', text0)   # number of follows
        num_fans = re.findall(u'\u7c89\u4e1d\[(\d+)\]', text0)      # number of fans
        if num_tweets:
            user_item["ctweets"] = int(num_tweets[0])
        if num_follows:
            user_item["cfollows"] = int(num_follows[0])
        if num_fans:
            user_item["cfans"] = int(num_fans[0])
        user_item["_id"] = response.meta["user_id"]
        url_information1 = "http://weibo.cn/%s/info" % response.meta["user_id"]
        yield Request(url=url_information1, meta={"item": user_item}, callback=self.parse_user_1)

def parse(self, response):
    selector = Selector(response)
    articles = selector.xpath('//ul[@class="article-list thumbnails"]/li')

    for article in articles:
        item = Jianshu2Item()
        url = article.xpath('div/h4/a/@href').extract()
        likeNum = article.xpath('div/div/span[2]/text()').extract()
        posturl = 'http://www.jianshu.com' + url[0]

        if len(likeNum) == 0:
            item['likeNum'] = 0
        else:
            item['likeNum'] = int(likeNum[0].split(' ')[-1])

        request = Request(posturl, callback=self.parse_donate)
        request.meta['item'] = item
        yield request

    next_link = selector.xpath('//*[@id="list-container"]/div[@class="load-more"]/button/@data-url').extract()[0]
    if next_link:
        next_link = self.url + str(next_link)
        yield Request(next_link, callback=self.parse)

def get_chapterurl(self, response):
    resp = BeautifulSoup(response.text, 'lxml')
    item = DingdianItem()
    tds = resp.find('table').find_all('td')
    category = resp.find('table').find('a').get_text()
    author = tds[1].get_text()
    base_url = resp.find(
        'p', class_='btnlinks').find(
        'a', class_='read')['href']
    novel_id = str(base_url)[-6:-1].replace('/', '')
    serialstatus = tds[2].get_text()
    serialnumber = tds[4].get_text()
    item['name'] = str(response.meta['name']).replace('\xa0', '')
    item['novelurl'] = response.meta['url']
    item['category'] = str(category).replace('/', '')
    item['author'] = str(author).replace('\xa0', '')
    item['novel_id'] = novel_id
    item['serialstatus'] = str(serialstatus).replace('\xa0', '')
    item['serialnumber'] = str(serialnumber).replace('\xa0', '')
    yield item
    yield Request(url=base_url, callback=self.get_chapter, meta={'novel_id': novel_id})

def parse_article(self, response):
    hxs = Selector(response)
    keyword = response.meta['keyword']
    movie_name = hxs.xpath('//*[@id="content"]/h1/span[1]/text()').extract()
    movie_roles_paths = hxs.xpath('//*[@id="info"]/span[3]/span[2]')
    movie_roles = []
    for movie_roles_path in movie_roles_paths:
        movie_roles = movie_roles_path.select('.//*[@rel="v:starring"]/text()').extract()
    movie_classification = hxs.xpath('//span[@property="v:genre"]/text()').extract()

    douban_item = DoubanItem()
    douban_item['movie_keyword'] = keyword
    douban_item['movie_name'] = ''.join(movie_name).strip().replace(',', ';').replace('\'', '\\\'').replace('\"', '\\\"').replace(':', ';').replace(' ', '')
    douban_item['movie_roles'] = ';'.join(movie_roles).strip().replace(',', ';').replace('\'', '\\\'').replace('\"', '\\\"').replace(':', ';')
    douban_item['movie_classification'] = ';'.join(movie_classification).strip().replace(',', ';').replace('\'', '\\\'').replace('\"', '\\\"').replace(':', ';')

    article_link = hxs.xpath('//*[@id="review_section"]/div/div/div/h3/a/@href').extract()
    tmp = "https://movie.douban.com/review/"
    for item in article_link:
        if tmp in item:
            yield Request(item, meta={'item': douban_item}, callback=self.parse_item,
                          cookies=[{'name': 'COOKIE_NAME', 'value': 'VALUE', 'domain': '.douban.com', 'path': '/'}, ])

def parse(self, response):
    # Each row of the list table corresponds to one thread.
    items = response.xpath('//form[@name="moderate"]/*/div[@class="spaceborder"]/table/tr')
    for item in items:
        url_str = 'http://www.mayattt.com/' + item.xpath('./td[@class="f_title"]/a/@href').extract()[0]
        title_str = ''
        date_str = ''
        try:
            title_str = item.xpath('./td[@class="f_title"]/a/text()').extract()[0]
            date_str = item.xpath('./td[@class="f_last"]/span/a/text()').extract()[0]
        except:
            self.logger.error('get list page failure!')
            pass
        # Follow the detail URL; the title and date are passed to the callback via meta.
        yield Request(url_str, headers=self.headers, callback=self.parseImage,
                      meta={'title': title_str, 'date': date_str})

def get_all_category(self, response):
    self.write_file('%s/category.html' % self.log_dir, response.body)

    tags = response.xpath('//table/tbody/tr/td/a/@href').extract()
    for tag in tags:
        res = tag.split('/')
        res = res[len(res) - 1]
        utils.log('tag:%s' % tag)
        url = response.urljoin(tag)
        yield Request(
            url = url,
            headers = self.headers,
            dont_filter = True,
            meta = {
                'tag': res,
                'download_timeout': 20,
                # 'is_proxy': False,
            },
            callback = self.get_page_count,
            errback = self.error_parse
        )

def get_page_count(self, response):
    pages = response.xpath('//div[@class="paginator"]/a/text()').extract()
    page_count = int(pages[len(pages) - 1])
    tag = response.meta.get('tag')

    for i in range(page_count):
        url = 'https://movie.douban.com/tag/%s?start=%s&type=T' % (tag, i * 20)
        yield Request(
            url = url,
            headers = self.headers,
            dont_filter = True,
            meta = {
                'tag': tag,
                'page': i + 1,
                'download_timeout': 20,
            },
            callback = self.get_page,
            errback = self.error_parse
        )

def get_page_count(self, response):
    pages = response.xpath('//div[@class="paginator"]/a/text()').extract()
    page_count = int(pages[len(pages) - 1])
    tag = response.meta.get('tag')

    for i in range(page_count):
        url = 'https://book.douban.com/tag/%s?start=%s&type=T' % (tag, i * 20)
        yield Request(
            url = url,
            headers = self.headers,
            dont_filter = True,
            meta = {
                'tag': tag,
                'page': i + 1,
                'download_timeout': 20,
            },
            callback = self.get_page,
            errback = self.error_parse
        )

def parse(self, response):
    se = Selector(response)  # build a Selector for the response (replaces the old HtmlXPathSelector)
    if re.match("http://desk.zol.com.cn/fengjing/\d+x\d+/\d+.html", response.url):
        # Only handle wallpaper list pages whose URL matches the pattern above.
        src = se.xpath("//ul[@class='pic-list2 clearfix']/li")  # every <li> under the picture-list <ul>
        for i in range(len(src)):
            imgURLs = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@src" % i).extract()
            titles = se.xpath("//ul[@class='pic-list2 clearfix']/li[%d]/a/img/@title" % i).extract()
            if imgURLs:
                realUrl = imgURLs[0].replace("t_s208x130c5", "t_s2560x1600c5")  # swap the thumbnail size for the full-resolution version
                file_name = u"%s.jpg" % titles[0]  # name the file after the picture title
                path = os.path.join("D:\pics", file_name)  # save the pictures under D:\pics
                type = sys.getfilesystemencoding()
                print file_name.encode(type)

                item = WebcrawlerScrapyItem()  # fill the item with the name and url
                item['name'] = file_name
                item['url'] = realUrl
                print item["name"], item["url"]
                yield item  # hand the item to the pipeline

                urllib.urlretrieve(realUrl, path)  # download the image to the local path

    all_urls = se.xpath("//a/@href").extract()  # collect every link on the page
    for url in all_urls:
        if url.startswith("/fengjing/1920x1080/"):  # follow further 1920x1080 landscape list pages
            yield Request("http://desk.zol.com.cn" + url, callback=self.parse)

def start_requests(self):
    # Pop target user IDs from the Aims collection until it is empty.
    while self.db.Aims.find_one() != None:
        ID_item = self.db.Aims.find_one()
        self.db.Aims.delete_one({'ID': ID_item['ID']})
        print '-----------------------------------------'
        print ID_item['ID']
        print '-----------------------------------------'
        ID = str(ID_item['ID'])
        # self.finish_ID.add(ID)
        # Skip IDs that are already in the finished collection.
        if self.db.findin_finished(ID_item):
            print '-----------------------------------------'
            print 'WARNING: ', ID, ' already finished'
            print '-----------------------------------------'
            self.db.Aims.delete_one(ID_item)
            continue
        else:
            # Start crawling the user's profile information.
            url_information0 = "https://m.weibo.cn/api/container/getIndex?type=uid&value=%s" % ID
            print url_information0
            yield Request(url=url_information0, meta={"ID": ID_item['ID']}, callback=self.parseInformation)

def parseHome(self, response):
    if len(response.body) > 50:
        print "###########################"
        print "Fetch Home Success"
        print "###########################"
        infos = json.loads(response.body)
        if infos.get('cards', ''):
            cards = infos['cards']
            for card in cards:
                if card['card_type'] == 6:
                    print '========================================='
                    # Container ID of the user's original-weibo tab.
                    ori_ID = re.findall(r'\d+', card['actionlog']['oid'])[0]
                    ori_url = 'https://m.weibo.cn/api/container/getIndex?containerid={ori_id}_-_WEIBO_SECOND_PROFILE_WEIBO_ORI&type=uid&page_type=03&value={value}'.format(
                        ori_id=ori_ID, value=response.meta['ID'])
                    print 'ori_ID:', ori_ID
                    yield Request(url=ori_url,
                                  meta={'ID': response.meta["ID"], 'ori_id': ori_ID, 'owner': response.meta['owner']},
                                  callback=self.parseTweets, dont_filter=True)

def parse_index(self, response):
    post_nodes = response.css('#warp .list15 li')
    for post_node in post_nodes:
        post_url = post_node.css('::attr(href)').extract_first("")
        url_get = parse.urljoin(response.url, post_url)
        yield Request(url=url_get, dont_filter=True, callback=self.parse_detail)
        print(parse.urljoin(response.url, post_url))

    next_urls = response.css('#warp .list15 .list_sort > a:nth-child(3) ::attr(href)').extract_first("")
    if next_urls:
        next_url = parse.urljoin(response.url, next_urls)
        last_second_url = response.css('#warp .list15 .list_sort > a:nth-child(2) ::attr(href)').extract_first("")
        if last_second_url != 'index248.htm':
            yield Request(url=next_url, dont_filter=True, callback=self.parse_index)

def parse_detail(self, response):
    content = response.css('#work span::text').extract()
    reg = "^(http|https|ftp)://.*(.com|.cn|.html|.htm|.asp|.jsp)"
    url = response.url
    reg_url_name = ".*?(\d+)"
    get_url = re.match(reg_url_name, url)
    if get_url:
        self.get_name = get_url.group(1)

    reference_url_list = []
    for each_line in content:
        get_reference_url = re.match(reg, each_line)
        if get_reference_url:
            reference_url_list.append(get_reference_url.group(0))

    self.count = 0
    if reference_url_list:
        for each_url in reference_url_list:
            yield Request(url=each_url, dont_filter=True, callback=self.parse_reference)
            self.count += 1
    else:
        pass

def post_get_playlist(self, response):
    collection = self.db.playlist
    result = json.loads(response.body, encoding='utf-8')['result']

    # upsert=True means "insert or update":
    # inserted = collection.update({'id': result['id']}, result, upsert=True)
    # logger.info('Update or Insert to playlist database[%s]' % (str(inserted),))
    if result['id'] not in self.playlist_id_buffer:
        collection.insert(result)

    for song in result['tracks']:
        artists = []
        for detail in song['artists']:
            artists.append(detail['name'])
        comment_url = 'http://music.163.com/weapi/v1/resource/comments/%s/?csrf_token=' % (song['commentThreadId'],)
        # Use FormRequest to send the POST data; an equivalent alternative would be
        # Request(url, method='POST', body=json.dumps(data)).
        yield FormRequest(comment_url, formdata=self.post_data, callback=self.parse,
                          meta={'m_id': song['id'], 'm_name': song['name'], 'artists': artists})

def parse_list(self, response):
    url = response.meta['splash']['args']['url']
    pattern = re.compile(r'http://www.mogujie.com/book/\w+/\d+/')
    if pattern.match(url):
        page = int(pattern.split(url)[1])
        url = pattern.findall(url)[0]
        page += 1
        url = url + str(page)
    else:
        url = url + '/2'
    print '+++++++++++++++++++++++++ Next url:', url
    req = SplashRequest(url = url, callback = self.parse_list)
    yield req

    pattern_detail = re.compile(r'http://shop.mogujie.com/detail/.{7}')
    for item_url in pattern_detail.findall(response.body):
        req = Request(url = item_url, callback = self.parse_item)
        yield req

def default(self, o):
    if isinstance(o, datetime.datetime):
        return o.strftime("%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT))
    elif isinstance(o, datetime.date):
        return o.strftime(self.DATE_FORMAT)
    elif isinstance(o, datetime.time):
        return o.strftime(self.TIME_FORMAT)
    elif isinstance(o, decimal.Decimal):
        return str(o)
    elif isinstance(o, defer.Deferred):
        return str(o)
    elif isinstance(o, BaseItem):
        return dict(o)
    elif isinstance(o, Request):
        return "<%s %s %s>" % (type(o).__name__, o.method, o.url)
    elif isinstance(o, Response):
        return "<%s %s %s>" % (type(o).__name__, o.status, o.url)
    elif isinstance(o, Crawler):
        return o.stats.get_stats()
    else:
        return super(ScrapyJSONEncoder, self).default(o)

def get_all_page(self, response):
    all_page = 0      # total number of pages
    current_page = 1  # current page number
    body = str(response.body)
    regex_str = ".*?PAGE.pager = ({.*?});.*"
    pager = re.match(regex_str, body)
    if pager:
        pager_data = pager.group(1).replace('\\n', '').replace('\\r', '').replace(" ", "")
        regex_str = '.*count:"(\d+)".*'
        all_page = int(re.match(regex_str, pager_data).group(1))
        print("all_page :" + str(all_page))

        # Request every list page and hand it to scrapy to download.
        while current_page <= all_page:
            url = apiconstants.get_douyu_list_url(current_page)
            print(url)
            current_page = current_page + 1
            yield Request(url=url, callback=self.parse)
    print("????")

def get_torrent(self, response):
    sel = Selector(response)
    cl_title = sel.xpath('//td[@class="h"]/text()[2]').extract_first()
    cl_bankuai = sel.xpath('//div[@class="t3"]/table/tr/td/b/a[2]/text()').extract_first()
    cl_url = response.url
    torrent = re.search('rmdown\.com(.+?)</a>', response.body)
    torrent_url = 'http://www.' + torrent.group()[:-4]
    posted = sel.xpath('//div[@class="tipad"]/text()').extract()[1]
    posted = posted.encode('utf-8')[9:-7]
    yield Request(
        url=torrent_url,
        meta={
            'cl_title': cl_title,
            'cl_bankuai': cl_bankuai,
            'cl_url': cl_url,
            'posted': posted,
        },
        callback=self.parse_item,
        dont_filter=True)

def init_request(self):
    """This function is called before crawling starts."""
    # Do not start a request on error,
    # simply return nothing and quit scrapy
    if self.abort:
        return

    logging.info('All set, start crawling with depth: ' + str(self.max_depth))

    # Do a login
    if self.config['login']['enabled']:
        # Start with login first
        logging.info('Login required')
        return Request(url=self.login_url, callback=self.login)
    else:
        # Start with the parse function
        logging.info('No login required')
        return Request(url=self.base_url, callback=self.parse)

#----------------------------------------------------------------------

def parse_followers(self, response):
    nametoken = response.meta['nametoken']
    api_followees_url = self.base_url + '/api/v4/members/' + response.url.split('/')[-2] + '/followees'
    api_followers_url = self.base_url + '/api/v4/members/' + response.url.split('/')[-2] + '/followers'
    yield scrapy.Request(url=api_followees_url, callback=self.parser_follow_json,
                         headers=ZHIHU_HEADER, cookies=ZHIHU_COOKIE,
                         meta={'nametoken': nametoken})
    yield scrapy.Request(url=api_followers_url, callback=self.parser_follow_json,
                         headers=ZHIHU_HEADER, cookies=ZHIHU_COOKIE,
                         meta={'nametoken': nametoken})

# parse the json returned by the followee/follower API

def parse(self, response):
    item = DoubanspiderItem()
    selector = Selector(response)
    Movies = selector.xpath('//div[@class="info"]')
    for eachMovie in Movies:
        title = eachMovie.xpath('div[@class="hd"]/a/span[@class="title"]/text()').extract()
        movieInfo = eachMovie.xpath('div[@class="bd"]/p/text()').extract()
        star = eachMovie.xpath('div[@class="bd"]/div[@class="star"]/span[@class="rating_num"]/text()').extract()
        quote = eachMovie.xpath('div[@class="bd"]/p[@class="quote"]/span/text()').extract()
        item['title'] = title
        item['movieInfo'] = ';'.join(movieInfo)
        item['star'] = star
        item['quote'] = quote
        # hand the item to the pipeline
        yield item

    nextLink = selector.xpath('//span[@class="next"]/link/@href').extract()
    if nextLink:
        nextLink = nextLink[0]
        print(nextLink)
        yield Request(self.url + nextLink, callback=self.parse)

def parse(self, response):
    for i in range(10):
        self.current += 1
        if self.current >= self.rule.max_page:
            break
        yield Request(self.rule.url_fmt.format(self.current))

    if response.status != 200:
        return None

    ip_list = response.xpath(self.rule.row_xpath)[1:]
    for ip_item in ip_list:
        l = ProxyItemLoader(item=ProxyItem(), selector=ip_item)
        l.add_xpath('proxy', self.rule.host_xpath)
        l.add_xpath('proxy', self.rule.port_xpath)
        l.add_xpath('ip', self.rule.host_xpath)
        l.add_xpath('port', self.rule.port_xpath)
        l.add_xpath('addr', self.rule.addr_xpath)
        l.add_xpath('mode', self.rule.mode_xpath)
        l.add_xpath('protocol', self.rule.proto_xpath)
        l.add_xpath('validation_time', self.rule.vt_xpath)
        l.add_value('src_rule', self.rule.name)
        yield l.load_item()

def parse_ph_key(self, response):
    selector = Selector(response)
    logging.debug('request url:------>' + response.url)
    # logging.info(selector)
    divs = selector.xpath('//div[@class="phimage"]')
    for div in divs:
        viewkey = re.findall('viewkey=(.*?)"', div.extract())
        # logging.debug(viewkey)
        yield Request(url='https://www.pornhub.com/embed/%s' % viewkey[0],
                      callback=self.parse_ph_info)

    url_next = selector.xpath(
        '//a[@class="orangeButton" and text()="Next "]/@href').extract()
    logging.debug(url_next)
    if url_next:
        # if self.test:
        logging.debug(' next page:---------->' + self.host + url_next[0])
        yield Request(url=self.host + url_next[0], callback=self.parse_ph_key)
        # self.test = False

def parse_articles(self, response):
    article_ptn = "http://www.theglobeandmail.com/opinion/(.*?)/article(\d+)/"
    resp_url = response.url
    article_m = re.match(article_ptn, resp_url)
    article_id = ''
    if article_m != None:
        article_id = article_m.group(2)

    if article_id == '32753320':
        print('***URL***', resp_url)

    soup = BeautifulSoup(response.text, 'html.parser')
    text = Selector(text=response.text).xpath('//*[@id="content"]/div[1]/article/div/div[3]/div[2]').extract()
    if text:
        print("*****in Spider text*****", soup.title.string)
        yield {article_id: {"title": soup.title.string, "link": resp_url, "article_text": text}}

    comments_link = response.url + r'comments/'
    if comments_link == 'http://www.theglobeandmail.com/opinion/a-fascists-win-americas-moral-loss/article32753320/comments/':
        yield Request(comments_link, callback=self.parse_comments)

def parse_follows(self, response):
    '''
    parse the follows
    '''
    url = response.url
    _id = url.split('=')[-1]
    item = response.meta['item']
    driver = response.meta['driver']
    try:
        driver.switch_to.default_content()
        g_iframe = driver.find_elements_by_tag_name('iframe')[0]
        driver.switch_to.frame(g_iframe)
        lis = driver.find_elements_by_xpath('//*[@id="main-box"]/li')
        follows = {}
        for li in lis:
            a = li.find_element_by_tag_name('a')
            title = a.get_attribute('title')
            href = a.get_attribute('href')
            uid = href.split('=')[-1]
            follows[uid] = title
        item['follows'] = follows
    except Exception as e:
        item['follows'] = None
        print e
    # driver.close()
    request = Request(url='http://music.163.com/user/fans?id=' + _id, callback=self.parse_fans)
    request.meta['item'] = copy.deepcopy(item)
    yield request
    # TODO:

def parse_fans(self, response):
    '''
    parse the fans
    '''
    url = response.url
    _id = url.split('=')[-1]
    item = response.meta['item']
    driver = response.meta['driver']
    try:
        driver.switch_to.default_content()
        g_iframe = driver.find_elements_by_tag_name('iframe')[0]
        driver.switch_to.frame(g_iframe)
        lis = driver.find_elements_by_xpath('//*[@id="main-box"]/li')
        fans = {}
        for li in lis:
            a = li.find_element_by_tag_name('a')
            title = a.get_attribute('title')
            href = a.get_attribute('href')
            uid = href.split('=')[-1]
            fans[uid] = title
        item['fans'] = fans
    except Exception as e:
        item['fans'] = None
        print e
    # driver.close()
    request = Request(url='http://music.163.com/user/songs/rank?id=' + _id, callback=self.parse_songs_rank)
    request.meta['item'] = copy.deepcopy(item)
    yield request

def start_requests(self):
    for u in self.start_urls:
        yield Request(u, callback=self.parse, errback=self.errback)

def parse(self, response):
    yield self.parse_item(response)
    for a in response.css('a::attr(href)').extract():
        if not a:
            continue
        next_url = response.urljoin(a)
        yield Request(next_url, callback=self.parse)

def pop(self, timeout=0):
    """Pop a request"""
    if timeout > 0:
        data = self.server.brpop(self.key, timeout=timeout)
        if isinstance(data, tuple):
            data = data[1]
    else:
        data = self.server.rpop(self.key)
    if data:
        cb, url = data.split('--', 1)
        try:
            cb = getattr(self.spider, str(cb))
            return Request(url=url, callback=cb)
        except AttributeError:
            raise ValueError("Method %r not found in: %s" % (cb, self.spider))

def parse(self, response):
    article_nodes = response.css('#block-content-article .mainer .item a.title')

    for article_node in article_nodes:
        # equivalent to: "http://www.acfun.cn" + str(article_node.css("::attr(href)").extract_first(""))
        article_url = urlparse.urljoin(response.url, str(article_node.css("::attr(href)").extract_first("")))
        yield Request(url=article_url, callback=self.parse_detail, dont_filter=True)

    next_nodes = response.css(".pager")
    next_node = next_nodes[len(next_nodes) - 1]
    next_url = str(next_node.css("::attr(href)").extract_first(""))
    if next_url:
        next_url = urlparse.urljoin(response.url, next_url)
        yield Request(url=next_url, callback=self.parse, dont_filter=True)

def start_requests(self):
    for i, url in enumerate(self.urls):
        yield Request(
            url = url,
            headers = self.headers,
            meta = self.meta,
            dont_filter = True,
            callback = self.parse_page,
            errback = self.error_parse,
        )