我们从 Python 开源项目中提取了以下 30 个代码示例，用于说明如何使用 twisted.web.client.downloadPage()。
def loadThumbnail(self, entry):
    """
    Start loading the screenshot referenced by C{entry} as the thumbnail.

    The URL is read from C{entry["screenshot"]}.  When C{self.language} is
    C{"de"} and the URL ends in C{"_en.jpg"}, the C{"_de.jpg"} variant is
    requested instead.  The image is downloaded to C{/tmp/<basename>} and
    C{self.setThumbnail} is attached as the success callback,
    C{self.fetchFailed} as the errback.

    C{self.setThumbnail(noScreenshot=True)} is called instead when there is
    no usable URL (key missing or value C{None}) or when
    C{iSoftwareTools.NetworkConnectionAvailable} is false.

    @param entry: mapping that may contain a C{"screenshot"} URL.
    """
    thumbnailUrl = None
    # "in" replaces dict.has_key(), which no longer exists on Python 3.
    if "screenshot" in entry:
        thumbnailUrl = entry["screenshot"]
    # Guard against a None value before inspecting the suffix; slicing None
    # would raise TypeError.
    if (thumbnailUrl is not None and self.language == "de"
            and thumbnailUrl.endswith("_en.jpg")):
        thumbnailUrl = thumbnailUrl[:-7] + "_de.jpg"

    if thumbnailUrl is None:
        self.setThumbnail(noScreenshot=True)
        return

    self.thumbnail = "/tmp/" + thumbnailUrl.split('/')[-1]
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("[PluginDetails] downloading screenshot " + thumbnailUrl + " to " + self.thumbnail)
    if iSoftwareTools.NetworkConnectionAvailable:
        d = client.downloadPage(thumbnailUrl, self.thumbnail)
        d.addCallback(self.setThumbnail).addErrback(self.fetchFailed)
    else:
        self.setThumbnail(noScreenshot=True)
def maybe_download_ca_cert(self, ignored, replace=False):
    """
    Fetch the provider CA certificate to disk unless it is already there.

    :param ignored: not used by this method.
    :param replace: when true, download even if the file already exists.
    :rtype: deferred
    """
    # TODO: doesn't update the cert :((((
    fs_domain = self._domain.encode(sys.getfilesystemencoding())
    path = os.path.join(
        self._basedir, 'providers', fs_domain, 'keys', 'ca', 'cacert.pem')

    if not replace and is_file(path):
        return defer.succeed('ca_cert_path_already_exists')

    def _raise_network_error(failure):
        # Re-raise any failure in the chain as a NetworkError.
        raise NetworkError(failure.getErrorMessage())

    uri = self._get_ca_cert_uri()
    mkdir_p(os.path.dirname(path))

    # The TLS certificate of this connection is not validated;
    # only the fingerprint of the ca.cert is checked.
    download = downloadPage(uri, path)
    download.addCallback(self._reload_http_client)
    download.addErrback(_raise_network_error)
    return download
def loadThumbnail(self, entry):
    """
    Start loading the screenshot referenced by C{entry} as the thumbnail.

    The URL comes from C{entry["screenshot"]}; for C{self.language == "de"}
    a trailing C{"_en.jpg"} is swapped for C{"_de.jpg"}.  The file is
    downloaded to C{/tmp/<basename>} with C{self.setThumbnail} as callback
    and C{self.fetchFailed} as errback.

    C{self.setThumbnail(noScreenshot=True)} is called when no URL is
    available (key missing or value C{None}) or when
    C{iSoftwareTools.NetworkConnectionAvailable} is false.

    @param entry: mapping that may contain a C{"screenshot"} URL.
    """
    thumbnailUrl = entry["screenshot"] if "screenshot" in entry else None

    if thumbnailUrl is None:
        # Covers both a missing key and an explicit None value; the latter
        # would otherwise fail when the URL suffix is inspected below.
        self.setThumbnail(noScreenshot=True)
        return

    if self.language == "de" and thumbnailUrl.endswith("_en.jpg"):
        thumbnailUrl = thumbnailUrl[:-7] + "_de.jpg"

    self.thumbnail = "/tmp/" + thumbnailUrl.split('/')[-1]
    # print(...) with a single argument is valid on Python 2 and Python 3.
    print("[PluginDetails] downloading screenshot " + thumbnailUrl + " to " + self.thumbnail)
    if not iSoftwareTools.NetworkConnectionAvailable:
        self.setThumbnail(noScreenshot=True)
        return
    d = client.downloadPage(thumbnailUrl, self.thumbnail)
    d.addCallback(self.setThumbnail).addErrback(self.fetchFailed)
def test_downloadPageBrokenDownload(self):
    """
    L{downloadPage} fails with L{PartialDownloadError} when the connection
    goes away before as many bytes as I{Content-Length} announced have
    arrived.
    """
    # Exercise a download that is cut off part-way through.
    target = FilePath(self.mktemp())

    def verifyStatus(partial):
        """
        The C{PartialDownloadError} carries the server's HTTP status code
        and message.
        """
        self.assertEqual(partial.status, b"200")
        self.assertEqual(partial.message, b"OK")
        return partial

    def verifyFileContent(ignored):
        self.assertEqual(target.getContent(), b"abc")

    download = client.downloadPage(self.getURL("broken"), target.path)
    failed = self.assertFailure(download, client.PartialDownloadError)
    failed.addCallback(verifyStatus)
    failed.addCallback(verifyFileContent)
    return failed
def test_downloadPageLogsFileCloseError(self):
    """
    An exception raised while closing the destination file, after the
    connection was closed prematurely, is logged.
    """
    class BrokenFile:
        def write(self, data):
            pass

        def close(self):
            raise IOError(ENOSPC, "No file left on device")

    def verifyLogged(ignored):
        logged = self.flushLoggedErrors(IOError)
        self.assertEqual(len(logged), 1)

    download = client.downloadPage(self.getURL("broken"), BrokenFile())
    failed = self.assertFailure(download, client.PartialDownloadError)
    failed.addCallback(verifyLogged)
    return failed
def test_downloadAfterFoundGet(self):
    """
    L{client.downloadPage} called with C{afterFoundGet=True} handles
    redirects the same way L{client.getPage} does with that argument.
    """
    def verifyMethod(page):
        self.assertEqual(
            self.extendedRedirect.lastMethod,
            b"GET",
            "With afterFoundGet, the HTTP method must change to GET")

    redirecting = self.getURL('extendedRedirect?code=302')
    download = client.downloadPage(
        redirecting,
        "downloadTemp",
        followRedirect=True,
        afterFoundGet=True,
        method=b"POST",
    )
    download.addCallback(verifyMethod)
    return download
def test_downloadTimeout(self):
    """
    When the C{timeout} given to L{client.HTTPDownloader.__init__} passes
    before the full response has arrived, the L{defer.Deferred} from
    L{client.downloadPage} fails with a L{Failure} wrapping a
    L{defer.TimeoutError}.
    """
    self.cleanupServerConnections = 2

    # Case 1: the server never writes a single byte.
    silent = client.downloadPage(
        self.getURL("wait"), self.mktemp(), timeout=0.01)

    # Case 2: the server writes some bytes, then stalls forever.
    stalled = client.downloadPage(
        self.getURL("write-then-wait"), self.mktemp(), timeout=0.01)

    silentFailed = self.assertFailure(silent, defer.TimeoutError)
    stalledFailed = self.assertFailure(stalled, defer.TimeoutError)
    return defer.gatherResults([silentFailed, stalledFailed])
def test_downloadTimeoutsWorkWithoutReading(self):
    """
    The C{timeout} passed to L{client.HTTPDownloader.__init__} still makes
    the L{defer.Deferred} from L{client.downloadPage} fail with a
    L{Failure} wrapping a L{defer.TimeoutError} when the remote peer is not
    reading data from the socket.
    """
    self.cleanupServerConnections = 1

    # Use a slightly longer timeout so the resource has a chance to stop
    # reading first.
    target = self.mktemp()
    download = client.downloadPage(
        self.getURL("never-read"), target, timeout=0.05)
    return self.assertFailure(download, defer.TimeoutError)
def test_downloadPageDeprecated(self):
    """
    Calling L{client.downloadPage} emits exactly one
    L{DeprecationWarning} with the expected message.
    """
    site = server.Site(Data(b'', 'text/plain'))
    listener = reactor.listenTCP(0, site, interface="127.0.0.1")
    portno = listener.getHost().port
    self.addCleanup(listener.stopListening)

    url = networkString("http://127.0.0.1:%d" % (portno,))
    target = FilePath(self.mktemp())
    download = client.downloadPage(url, target.path)

    expectedMessage = (
        "twisted.web.client.downloadPage was deprecated in "
        "Twisted 16.7.0; please use https://pypi.org/project/treq/ or twisted.web.client.Agent instead")
    emitted = self.flushWarnings([self.test_downloadPageDeprecated])
    self.assertEqual(len(emitted), 1)
    only = emitted[0]
    self.assertEqual(only['category'], DeprecationWarning)
    self.assertEqual(only['message'], expectedMessage)

    # The outcome of the download itself is irrelevant to this test.
    return download.addErrback(lambda _: None)
def downloadThumbnails(self):
    """
    Queue thumbnail loading for every entry in C{self.screenshotList}.

    Each entry is indexed as C{(tubeid, thumbnailUrl)}.  An entry is handled
    only if C{tubeid} is a key of C{self.Details}, its C{"thumbnail"} value
    is still C{None}, and C{tubeid} is not already in
    C{self.pixmaps_to_load}.  Such an id is appended to
    C{self.pixmaps_to_load} and then one of the following happens:

      - no URL: C{self.fetchFinished(False, tubeid, failed=True)};
      - C{/tmp/<tubeid>.jpg} already exists:
        C{self.fetchFinished(False, tubeid)};
      - otherwise the URL is downloaded to that file, with
        C{self.fetchFinished} / C{self.fetchFailed} attached and
        C{str(tubeid)} as their extra argument.

    NOTE(review): nesting was reconstructed from a whitespace-collapsed
    source line -- confirm against the original file.
    """
    self.timer_startDownload.stop()
    for entry in self.screenshotList:
        tubeid = entry[0]
        thumbnailUrl = entry[1]
        thumbnailFile = "/tmp/" + str(tubeid) + ".jpg"

        # "in" replaces dict.has_key(), which was removed in Python 3.
        if tubeid not in self.Details:
            continue
        if self.Details[tubeid]["thumbnail"] is not None:
            continue
        if tubeid in self.pixmaps_to_load:
            # Already queued by an earlier pass.
            continue

        self.pixmaps_to_load.append(tubeid)
        if thumbnailUrl is None:
            self.fetchFinished(False, tubeid, failed=True)
        elif os_path.exists(thumbnailFile):
            # Reuse the file left behind by a previous download.
            self.fetchFinished(False, tubeid)
        else:
            d = client.downloadPage(thumbnailUrl, thumbnailFile)
            d.addCallback(self.fetchFinished, str(tubeid))
            d.addErrback(self.fetchFailed, str(tubeid))
def loadPreviewpics(self):
    """
    Reset the preview state and start downloading the video's thumbnail.

    Resets C{self.thumbnails}, C{self.index} and C{self.picloads}, stores
    the single URL C{self.video.thumbnailUrl} in C{self.mythumbubeentries}
    and sets C{self.maxentries} to the last valid index of that list.

    For each URL a record C{[index, thumbID, thumbnailFile, None]} is
    appended to C{self.thumbnails}, where C{thumbID} is C{self.video.id}
    followed by the index and C{thumbnailFile} is C{/tmp/<thumbID>.jpg}.
    If the URL is not C{None} it is downloaded to that file, with
    C{self.fetchFinished} / C{self.fetchFailed} attached and
    C{(index, thumbID)} as their extra arguments.
    """
    self.thumbnails = []
    self.index = 0
    self.picloads = {}
    self.mythumbubeentries = [self.video.thumbnailUrl]
    self.maxentries = len(self.mythumbubeentries) - 1

    # The list always holds exactly one element, so no emptiness check
    # (and no else branch) is needed.
    for currindex, thumbnailUrl in enumerate(self.mythumbubeentries):
        thumbID = self.video.id + str(currindex)
        thumbnailFile = "/tmp/" + thumbID + ".jpg"
        self.thumbnails.append([currindex, thumbID, thumbnailFile, None])
        if thumbnailUrl is not None:
            d = client.downloadPage(thumbnailUrl, thumbnailFile)
            d.addCallback(self.fetchFinished, currindex, thumbID)
            d.addErrback(self.fetchFailed, currindex, thumbID)
def testDownloadPage(self):
    """
    Download the C{"file"} and C{"nolength"} resources to temporary files
    and hand each result to C{self._cbDownloadPageTest} together with the
    expected data and the file name.
    """
    expectations = [
        ("file", self.mktemp(), "0123456789"),
        ("nolength", self.mktemp(), "nolength"),
    ]
    pending = []
    for resource, target, expected in expectations:
        download = client.downloadPage(self.getURL(resource), target)
        download.addCallback(self._cbDownloadPageTest, expected, target)
        pending.append(download)
    return defer.gatherResults(pending)
def testDownloadPageError1(self):
    """
    An C{IOError} raised by the destination object's C{write} method makes
    the deferred returned by C{client.downloadPage} fail with C{IOError}.
    """
    class errorfile:
        def write(self, data):
            # Call-style raise: valid on Python 2 and Python 3, unlike
            # the old "raise IOError, msg" statement form.
            raise IOError("badness happened during write")

        def close(self):
            pass

    ef = errorfile()
    return self.assertFailure(
        client.downloadPage(self.getURL("file"), ef),
        IOError)
def testDownloadPageError2(self):
    """
    An C{IOError} raised by the destination object's C{close} method makes
    the deferred returned by C{client.downloadPage} fail with C{IOError}.
    """
    class errorfile:
        def write(self, data):
            pass

        def close(self):
            # Call-style raise: valid on Python 2 and Python 3, unlike
            # the old "raise IOError, msg" statement form.
            raise IOError("badness happened during close")

    ef = errorfile()
    return self.assertFailure(
        client.downloadPage(self.getURL("file"), ef),
        IOError)
def testDownloadPageError3(self):
    """
    A failure to open the destination file makes the deferred returned by
    C{client.downloadPage} fail with C{IOError}.
    """
    # Failures inside open() must be caught as well.  This is tricky and
    # might only work on posix.
    target = "unwritable"
    open(target, "wb").close()
    os.chmod(target, 0)  # strip all permissions so we cannot write to it

    download = client.downloadPage(self.getURL("file"), target)
    failed = self.assertFailure(download, IOError)
    failed.addBoth(self._cleanupDownloadPageError3)
    return failed
def testDownloadServerError(self):
    """
    Run C{self._downloadTest} with a fetcher that downloads the given
    resource to a file named after the part of the URL before C{'?'}.
    """
    def fetch(url):
        source = self.getURL(url)
        target = url.split('?')[0]
        return client.downloadPage(source, target)

    return self._downloadTest(fetch)
def async_url_open(url, timeout=0, **kwargs):
    """
    Open C{url} asynchronously and deliver the resulting file object.

    URLs starting with C{'http'} are first downloaded into a named
    temporary file, which is then opened via C{async_get_file}; any other
    URL is passed to C{async_get_file} unchanged.  The temporary file is
    closed and removed afterwards, including when the download or the open
    fails.

    This is a generator that delivers its result through C{return_value};
    presumably it is wrapped by an inlineCallbacks-style decorator outside
    this block -- confirm against the full file.

    @param url: location to open.
    @param timeout: passed to C{downloadPage} for http URLs.
    @param kwargs: forwarded to C{async_get_file}.
    """
    if url.startswith('http'):
        # delete=False: the file is reopened by name below, so it must
        # survive close(); it is removed explicitly in the finally clause.
        page = NamedTemporaryFile(delete=False)
        new_url = page.name
    else:
        page, new_url = None, url

    try:
        if page is not None:
            yield downloadPage(encode(url), page, timeout=timeout)

        f = yield async_get_file(new_url, StringTransport(), **kwargs)
    finally:
        # Runs on failure too, so the temporary file is never left behind.
        if page is not None:
            page.close()
            remove(page.name)

    return_value(f)
def testDownloadPage(self):
    """
    Download the C{"file"} and C{"nolength"} resources to temporary files
    and pass each result to C{self._cbDownloadPageTest} along with the
    expected bytes and the file name.
    """
    cases = (
        ("file", self.mktemp(), b"0123456789"),
        ("nolength", self.mktemp(), b"nolength"),
    )

    def startDownload(resource, target, expected):
        download = client.downloadPage(self.getURL(resource), target)
        download.addCallback(self._cbDownloadPageTest, expected, target)
        return download

    return defer.gatherResults([startDownload(*case) for case in cases])
def testDownloadPageError1(self):
    """
    An C{IOError} from the destination object's C{write} method makes the
    deferred returned by C{client.downloadPage} fail with C{IOError}.
    """
    class errorfile:
        def close(self):
            pass

        def write(self, data):
            raise IOError("badness happened during write")

    sink = errorfile()
    download = client.downloadPage(self.getURL("file"), sink)
    return self.assertFailure(download, IOError)
def testDownloadPageError2(self):
    """
    An C{IOError} from the destination object's C{close} method makes the
    deferred returned by C{client.downloadPage} fail with C{IOError}.
    """
    class errorfile:
        def close(self):
            raise IOError("badness happened during close")

        def write(self, data):
            pass

    sink = errorfile()
    download = client.downloadPage(self.getURL("file"), sink)
    return self.assertFailure(download, IOError)
def _download_page(self, url, path): # client.downloadPage does not support cookies! def _check(x): logging.debug("Received segment of %r bytes." % len(x)) return x d = self._get_page(url) d.addCallback(_check) return d return d
def download(item):
    """
    Download C{item.url} into the file named C{item.filename}.

    @param item: object providing C{url} and C{filename} attributes.
    @return: whatever C{downloadPage} returns for this request.
    """
    # open() replaces the file() builtin, which was removed in Python 3;
    # the two are equivalent on Python 2.  The opened file object is handed
    # to downloadPage and is not closed by this function.
    return downloadPage(item.url, open(item.filename, 'wb'))