我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用twisted.web.client.HTTPClientFactory()。
def getPage(url, contextFactory=None, *args, **kwargs): scheme, host, port, path, username, password = _parse(url) if username and password: url = scheme + '://' + host + ':' + str(port) + path basicAuth = encodestring("%s:%s" % (username, password)) authHeader = "Basic " + basicAuth.strip() AuthHeaders = {"Authorization": authHeader} if kwargs.has_key("headers"): kwargs["headers"].update(AuthHeaders) else: kwargs["headers"] = AuthHeaders factory = HTTPClientFactory(url, *args, **kwargs) reactor.connectTCP(host, port, factory) return factory.deferred
def getPage(url, contextFactory=None, *args, **kwargs): scheme, host, port, path, username, password = _parse(url) if username and password: url = scheme + '://' + host + ':' + str(port) + path basicAuth = encodestring("%s:%s" % (username, password)) authHeader = "Basic " + basicAuth.strip() AuthHeaders = {"Authorization": authHeader} if kwargs.has_key("headers"): kwargs["headers"].update(AuthHeaders) else: kwargs["headers"] = AuthHeaders factory = HTTPClientFactory(url, *args, **kwargs) reactor.connectTCP(host, port, factory) return factory.deferred #############################################################
def getPagePrxoy(url, proxy=None, contextFactory=None, *args, **kwargs): ''' proxy= { host:192.168.1.111, port:6666 } ''' kwargs["timeout"] = 60 if proxy is None: scheme, host, port, path = client._parse(url) factory = client.HTTPClientFactory(url, *args, **kwargs) if scheme == b'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() reactor.connectSSL(client.nativeString(host), port, factory, contextFactory) else: reactor.connectTCP(client.nativeString(host), port, factory) return factory.deferred else: factory = client.HTTPClientFactory(url, *args, **kwargs) reactor.connectTCP(proxy["host"], proxy["port"], factory) return factory.deferred
def test_infiniteRedirection(self): """ When more than C{redirectLimit} HTTP redirects are encountered, the page request fails with L{InfiniteRedirection}. """ def checkRedirectCount(*a): self.assertEqual(f._redirectCount, 13) self.assertEqual(self.infiniteRedirectResource.count, 13) f = client._makeGetterFactory( self.getURL('infiniteRedirect'), client.HTTPClientFactory, redirectLimit=13) d = self.assertFailure(f.deferred, error.InfiniteRedirection) d.addCallback(checkRedirectCount) return d
def test_afterFoundGet(self): """ Enabling unsafe redirection behaviour overwrites the method of redirected C{POST} requests with C{GET}. """ url = self.getURL('extendedRedirect?code=302') f = client.HTTPClientFactory(url, followRedirect=True, method=b"POST") self.assertFalse( f.afterFoundGet, "By default, afterFoundGet must be disabled") def gotPage(page): self.assertEqual( self.extendedRedirect.lastMethod, b"GET", "With afterFoundGet, the HTTP method must change to GET") d = client.getPage( url, followRedirect=True, afterFoundGet=True, method=b"POST") d.addCallback(gotPage) return d
def testFactoryInfo(self): url = self.getURL('file') scheme, host, port, path = client._parse(url) factory = client.HTTPClientFactory(url) reactor.connectTCP(host, port, factory) return factory.deferred.addCallback(self._cbFactoryInfo, factory)
def testFactoryInfo(self): url = self.getURL('file') scheme, host, port, path = client._parse(url) factory = client.HTTPClientFactory(url) reactor.connectSSL(host, port, factory, ssl.ClientContextFactory()) # The base class defines _cbFactoryInfo correctly for this return factory.deferred.addCallback(self._cbFactoryInfo, factory)
def testCookieHeaderParsing(self): d = defer.Deferred() factory = client.HTTPClientFactory('http://foo.example.com/') proto = factory.buildProtocol('127.42.42.42') proto.transport = FakeTransport() proto.connectionMade() for line in [ '200 Ok', 'Squash: yes', 'Hands: stolen', 'Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/; expires=Wednesday, 09-Nov-99 23:12:40 GMT', 'Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/', 'Set-Cookie: SHIPPING=FEDEX; path=/foo', '', 'body', 'more body', ]: proto.dataReceived(line + '\r\n') self.assertEquals(proto.transport.data, ['GET / HTTP/1.0\r\n', 'Host: foo.example.com\r\n', 'User-Agent: Twisted PageGetter\r\n', '\r\n']) self.assertEquals(factory.cookies, { 'CUSTOMER': 'WILE_E_COYOTE', 'PART_NUMBER': 'ROCKET_LAUNCHER_0001', 'SHIPPING': 'FEDEX', })
def _getPage(self): factory = client.HTTPClientFactory(self.proxyHost, self.url) factory.headers = {'pragma': 'no-cache'} reactor.connectTCP(self.proxyHost, self.proxyPort, factory) d = factory.deferred d.addErrback(self.noPage) d.addCallback(self.page)
def getPage(url, contextFactory=None, *args, **kwargs): log.msg('Method: %s' % kwargs.get('method', 'GET')) log.msg('URI: %s' % url) try: log.msg('Headers: %r' % kwargs['headers']) except KeyError: pass try: log.msg('Payload: %r' % kwargs['postdata']) except KeyError: pass scheme, host, port, path = client._parse(url) factory = HTTPClientFactory(url, *args, **kwargs) if scheme == 'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() reactor.connectSSL(host, port, factory, contextFactory) else: reactor.connectTCP(host, port, factory) def _eb(failure): log.msg('Failed.') log.msg(failure) return failure return factory.deferred.addCallback(_checkCacheControl).addErrback(_eb)
def test_earlyHeaders(self): """ When a connection is made, L{HTTPPagerGetter} sends the headers from its factory's C{headers} dict. If I{Host} or I{Content-Length} is present in this dict, the values are not sent, since they are sent with special values before the C{headers} dict is processed. If I{User-Agent} is present in the dict, it overrides the value of the C{agent} attribute of the factory. If I{Cookie} is present in the dict, its value is added to the values from the factory's C{cookies} attribute. """ factory = client.HTTPClientFactory( b'http://foo/bar', agent=b"foobar", cookies={b'baz': b'quux'}, postdata=b"some data", headers={ b'Host': b'example.net', b'User-Agent': b'fooble', b'Cookie': b'blah blah', b'Content-Length': b'12981', b'Useful': b'value'}) transport = StringTransport() protocol = client.HTTPPageGetter() protocol.factory = factory protocol.makeConnection(transport) result = transport.value() for expectedHeader in [ b"Host: example.net\r\n", b"User-Agent: foobar\r\n", b"Content-Length: 9\r\n", b"Useful: value\r\n", b"connection: close\r\n", b"Cookie: blah blah; baz=quux\r\n"]: self.assertIn(expectedHeader, result)
def testFactoryInfo(self): url = self.getURL('file') uri = client.URI.fromBytes(url) factory = client.HTTPClientFactory(url) reactor.connectTCP(nativeString(uri.host), uri.port, factory) return factory.deferred.addCallback(self._cbFactoryInfo, factory)
def test_setURL(self): """ L{client.HTTPClientFactory.setURL} alters the scheme, host, port and path for absolute URLs. """ url = b'http://example.com' f = client.HTTPClientFactory(url) self.assertEqual( (url, b'http', b'example.com', 80, b'/'), (f.url, f.scheme, f.host, f.port, f.path))
def test_setURLRelativePath(self): """ L{client.HTTPClientFactory.setURL} alters the path in a relative URL. """ f = client.HTTPClientFactory(b'http://example.com') url = b'/hello' f.setURL(url) self.assertEqual( (url, b'http', b'example.com', 80, b'/hello'), (f.url, f.scheme, f.host, f.port, f.path))
def testFactoryInfo(self): url = self.getURL('file') uri = client.URI.fromBytes(url) factory = client.HTTPClientFactory(url) reactor.connectSSL(nativeString(uri.host), uri.port, factory, ssl.ClientContextFactory()) # The base class defines _cbFactoryInfo correctly for this return factory.deferred.addCallback(self._cbFactoryInfo, factory)
def testCookieHeaderParsing(self): factory = client.HTTPClientFactory(b'http://foo.example.com/') proto = factory.buildProtocol('127.42.42.42') transport = StringTransport() proto.makeConnection(transport) for line in [ b'200 Ok', b'Squash: yes', b'Hands: stolen', b'Set-Cookie: CUSTOMER=WILE_E_COYOTE; path=/; expires=Wednesday, 09-Nov-99 23:12:40 GMT', b'Set-Cookie: PART_NUMBER=ROCKET_LAUNCHER_0001; path=/', b'Set-Cookie: SHIPPING=FEDEX; path=/foo', b'', b'body', b'more body', ]: proto.dataReceived(line + b'\r\n') self.assertEqual(transport.value(), b'GET / HTTP/1.0\r\n' b'Host: foo.example.com\r\n' b'User-Agent: Twisted PageGetter\r\n' b'\r\n') self.assertEqual(factory.cookies, { b'CUSTOMER': b'WILE_E_COYOTE', b'PART_NUMBER': b'ROCKET_LAUNCHER_0001', b'SHIPPING': b'FEDEX', })
def test_HTTPDefaultPort(self): """ No port should be included in the host header when connecting to the default HTTP port. """ factory = client.HTTPClientFactory(b'http://foo.example.com/') proto = factory.buildProtocol(b'127.42.42.42') proto.makeConnection(StringTransport()) self.assertEqual(self._getHost(proto.transport.value()), b'foo.example.com')
def test_HTTPPort80(self): """ No port should be included in the host header when connecting to the default HTTP port even if it is in the URL. """ factory = client.HTTPClientFactory(b'http://foo.example.com:80/') proto = factory.buildProtocol('127.42.42.42') proto.makeConnection(StringTransport()) self.assertEqual(self._getHost(proto.transport.value()), b'foo.example.com')
def test_HTTPSDefaultPort(self): """ No port should be included in the host header when connecting to the default HTTPS port. """ factory = client.HTTPClientFactory(b'https://foo.example.com/') proto = factory.buildProtocol('127.42.42.42') proto.makeConnection(StringTransport()) self.assertEqual(self._getHost(proto.transport.value()), b'foo.example.com')
def test_HTTPSPort443(self): """ No port should be included in the host header when connecting to the default HTTPS port even if it is in the URL. """ factory = client.HTTPClientFactory(b'https://foo.example.com:443/') proto = factory.buildProtocol('127.42.42.42') proto.makeConnection(StringTransport()) self.assertEqual(self._getHost(proto.transport.value()), b'foo.example.com')
def test_HTTPSNotPort443(self): """ The port should be included in the host header when connecting to the a non default HTTPS port. """ factory = client.HTTPClientFactory(b'http://foo.example.com:8080/') proto = factory.buildProtocol('127.42.42.42') proto.makeConnection(StringTransport()) self.assertEqual(self._getHost(proto.transport.value()), b'foo.example.com:8080')
def test_httpClientFactoryDeprecated(self): """ L{client.HTTPClientFactory} is deprecated. """ self._testDeprecatedClass("HTTPClientFactory")
def get_page(self, contextFactory=None, description=None, *args, **kwargs): if description is None: description = self.url scheme, _, _, _ = self.url_parse(self.url) factory = txwebclient.HTTPClientFactory(self.url, *args, **kwargs) if scheme == 'https': from twisted.internet import ssl if contextFactory is None: contextFactory = ssl.ClientContextFactory() if self.use_proxy: reactor.connectSSL(self.proxy_host, self.proxy_port, factory, contextFactory) else: reactor.connectSSL(self.host, self.port, factory, contextFactory) else: if self.use_proxy: reactor.connectTCP(self.proxy_host, self.proxy_port, factory) else: reactor.connectTCP(self.host, self.port, factory) if self.return_headers: return factory.deferred.addCallback( lambda page: (page, factory.response_headers)) else: return factory.deferred