The following code examples, extracted from open-source Python projects, demonstrate how to use scrapy.http.Response().
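Before the project examples, here is a minimal sketch of constructing a Response directly; the URL and body are placeholder values:

from scrapy.http import Response

# The body must be bytes; a Request can be attached via the
# ``request`` keyword so downstream code can inspect it.
response = Response(
    url='http://example.com',
    status=200,
    body=b'<html><body>hello</body></html>',
)
print(response.status, response.url)  # 200 http://example.com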
def fake_response_from_file(file_path, request, response_class=Response):
    """
    Create a fake Scrapy HTTP response from an HTML file.

    :param file_path: Absolute path of the source file.
    :param request: The request the fake response should answer.
    :param response_class: The Response class to instantiate.
    :returns: A Scrapy HTTP response which can be used for unit testing.
    """
    # Open in binary mode: Scrapy response bodies must be bytes.
    with open(file_path, 'rb') as f:
        file_content = f.read()
    return response_class(
        url=request.url,
        request=request,
        body=file_content,
    )
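A sketch of using this helper in a test; the fixture path below is hypothetical, and HtmlResponse is passed so the result supports CSS/XPath selectors:

import scrapy
from scrapy.http import HtmlResponse

request = scrapy.Request('http://example.com/listing')
# 'tests/samples/listing.html' is a hypothetical fixture file.
response = fake_response_from_file('tests/samples/listing.html', request,
                                   response_class=HtmlResponse)
titles = response.css('h1::text').getall()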
def _get_response(self, args=[], kwargs={}):
    """Get a response from ``args`` or ``kwargs``."""
    # If you're decorating a function that takes no (or invalid)
    # response objects as arguments, you can set this attribute,
    # which takes precedence.
    if hasattr(self, 'response_for_pagination_mixin'):
        return self.response_for_pagination_mixin
    total_args = list(args) + list(kwargs.values())
    response_objs = [obj for obj in total_args if isinstance(obj, Response)]
    n_response_objs = len(response_objs)
    if n_response_objs == 0:
        raise ValueError('No response could be extracted.')
    if n_response_objs > 1:
        logging.warning('[-] Detected more than one response. Using the first one.')
    return response_objs[0]
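A hypothetical sketch of the selection logic, assuming ``_get_response`` is defined on a mixin class here called ``PaginationMixin`` (the class name is invented):

from scrapy.http import Response

mixin = PaginationMixin()  # hypothetical class carrying _get_response
resp = Response('http://example.com/page/1')
# The single Response instance is picked out of mixed argument values.
assert mixin._get_response(args=[resp, 'not-a-response'],
                           kwargs={'limit': 10}) is resp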
def default(self, o):
    if isinstance(o, datetime.datetime):
        return o.strftime("%s %s" % (self.DATE_FORMAT, self.TIME_FORMAT))
    elif isinstance(o, datetime.date):
        return o.strftime(self.DATE_FORMAT)
    elif isinstance(o, datetime.time):
        return o.strftime(self.TIME_FORMAT)
    elif isinstance(o, decimal.Decimal):
        return str(o)
    elif isinstance(o, defer.Deferred):
        return str(o)
    elif isinstance(o, BaseItem):
        return dict(o)
    elif isinstance(o, Request):
        return "<%s %s %s>" % (type(o).__name__, o.method, o.url)
    elif isinstance(o, Response):
        return "<%s %s %s>" % (type(o).__name__, o.status, o.url)
    elif isinstance(o, Crawler):
        return o.stats.get_stats()
    else:
        return super(ScrapyJSONEncoder, self).default(o)
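Since this is the ``default`` hook of a ``json.JSONEncoder`` subclass (named ``ScrapyJSONEncoder``, as in scrapy.utils.serialize), serialization goes through the standard library; a quick usage sketch:

import json
import scrapy
from scrapy.http import Response

request = scrapy.Request('http://example.com')
response = Response('http://example.com', status=200, request=request)
# Requests and Responses serialize to short descriptive strings.
print(json.dumps({'req': request, 'resp': response}, cls=ScrapyJSONEncoder))
# e.g. {"req": "<Request GET http://example.com>", "resp": "<Response 200 http://example.com>"}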
def process_response(self, request, response, spider):
    meta = request.meta

    # parse CDX requests and schedule future snapshot requests
    if meta.get('wayback_machine_cdx_request'):
        snapshot_requests = self.build_snapshot_requests(response, meta)

        # treat empty listings as 404s
        if len(snapshot_requests) < 1:
            return Response(meta['wayback_machine_original_request'].url, status=404)

        # schedule all of the snapshots
        for snapshot_request in snapshot_requests:
            self.crawler.engine.schedule(snapshot_request, spider)

        # abort this request
        raise UnhandledIgnoreRequest

    # clean up snapshot responses
    if meta.get('wayback_machine_url'):
        return response.replace(url=meta['wayback_machine_original_request'].url)

    return response
def test_nosplash():
    mw = _get_mw()
    cookie_mw = _get_cookie_mw()

    req = scrapy.Request("http://example.com")
    old_meta = copy.deepcopy(req.meta)

    assert cookie_mw.process_request(req, None) is None
    assert mw.process_request(req, None) is None
    assert old_meta == req.meta

    # response is not changed
    response = Response("http://example.com", request=req)
    response2 = mw.process_response(req, response, None)
    response3 = cookie_mw.process_response(req, response, None)
    assert response2 is response
    assert response3 is response
    assert response3.url == "http://example.com"
def __init__(self, url, *args, **kwargs):
    real_url = kwargs.pop('real_url', None)
    if real_url is not None:
        self.real_url = real_url
    else:
        self.real_url = None
        # FIXME: create a .request @property with a setter?
        # Scrapy doesn't pass the request to the Response constructor;
        # this is worked around in SplashMiddleware.
        request = kwargs['request']
        splash_args = self._splash_args(request)
        _url = splash_args.get('url')
        if _url is not None:
            self.real_url = url
            url = _url
    super(_SplashResponseMixin, self).__init__(url, *args, **kwargs)
def test_parse_content(self):
    content = requests.get('http://xiaoguotu.to8to.com/topic/11.html')
    response = Response('http://xiaoguotu.to8to.com/topic/11.html')
    # NOTE: assigning to .text only works on old Scrapy versions; on
    # current Scrapy, build an HtmlResponse with body=content.content instead.
    response.text = content.content.decode("utf-8")
    selector = Selector(response)
    title = selector.xpath('//div[@class="xdb_title"]/h1/text()').extract()[0]
    description = selector.xpath('//div[@class="xdbc_description"]//div//p/text()').extract()[0]
    items_selector = selector.xpath('//div[@class="xdbc_main_content"]//p')
    article = []
    text = ''
    for item_selector in items_selector:
        try:
            text = item_selector.xpath('span/text()').extract()[0]
        except IndexError:
            try:
                img_url = item_selector.xpath('img/@src').extract()[0]
                img_width = 0
                try:
                    img_width = item_selector.xpath('img/@width').extract()[0]
                except IndexError:
                    pass
                img_height = 0
                try:
                    img_height = item_selector.xpath('img/@height').extract()[0]
                except IndexError:
                    pass
                # note: 'content' reuses the most recently extracted span text
                article.append({'content': text, 'img_url': img_url,
                                'img_width': img_width, 'img_height': img_height})
            except IndexError:
                continue
    design_topic_item = DesignTopicItem()
    design_topic_item['title'] = title
    design_topic_item['description'] = description
    design_topic_item['article'] = article
    design_topic_item['html_url'] = response.url
    return design_topic_item
def test_dont_process_response():
    mw = _get_mw()
    req = SplashRequest("http://example.com/",
                        endpoint="render.html",
                        dont_process_response=True,
                        )
    req2 = mw.process_request(req, None)
    resp = Response("http://example.com/")
    resp2 = mw.process_response(req2, resp, None)
    assert resp2.__class__ is Response
    assert resp2 is resp
def from_args(self, headers=None, url=None, filename=None, body=None):
    """Guess the most appropriate Response class based on
    the given arguments."""
    cls = super(SplashResponseTypes, self).from_args(
        headers=headers,
        url=url,
        filename=filename,
        body=body
    )
    if cls is Response:
        cls = scrapy_splash.SplashResponse
    return cls
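For comparison, the base class this overrides is Scrapy's own response-type resolver; a small sketch of its behavior, with placeholder values:

from scrapy.http import Headers
from scrapy.responsetypes import responsetypes

# Scrapy maps the arguments to the most specific Response subclass it can.
print(responsetypes.from_args(url='http://example.com/index.html'))
# typically HtmlResponse
print(responsetypes.from_args(headers=Headers({'Content-Type': 'application/json'})))
# typically TextResponse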
def replace(self, *args, **kwargs):
    """Create a new Response with the same attributes
    except for those given new values.
    """
    for x in ['url', 'status', 'headers', 'body', 'request', 'flags', 'real_url']:
        kwargs.setdefault(x, getattr(self, x))
    cls = kwargs.pop('cls', self.__class__)
    return cls(*args, **kwargs)
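Scrapy's stock Response.replace works the same way, minus the real_url handling; a quick sketch of typical use (placeholder URL):

from scrapy.http import Response

response = Response('http://example.com', status=200)
# Copy every attribute, overriding only the status.
not_found = response.replace(status=404)
assert not_found.url == response.url
assert not_found.status == 404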
def get_response(**kwargs):
    # ``request`` comes from the enclosing scope.
    return Response(request.url, request=request, **kwargs)