我们从Python开源项目中,提取了以下20个代码示例,用于说明如何使用twisted.web.client.HTTPConnectionPool()。
def test_cancelGetConnectionCancelsEndpointConnect(self):
    """
    When the C{Deferred} handed back by
    L{HTTPConnectionPool.getConnection} is cancelled, the C{Deferred}
    produced by the endpoint's C{connect} call is cancelled as well.
    """
    # Precondition: the pool starts out with no cached connections.
    self.assertEqual(self.pool._connections, {})
    pendingConnect = Deferred()

    class Endpoint:
        def connect(self, factory):
            # Always hand back the same, not-yet-fired Deferred.
            return pendingConnect

    connecting = self.pool.getConnection(12345, Endpoint())
    connecting.cancel()
    connectFailure = self.failureResultOf(pendingConnect)
    self.assertEqual(connectFailure.type, CancelledError)
def test_nonPersistent(self):
    """
    An L{HTTPConnectionPool} created with C{persistent=False} causes
    C{Request}s to be issued with their C{persistent} flag set to
    C{False}.

    Tests for the underlying HTTP code verify elsewhere that such a
    request disconnects the HTTP protocol once it completes, so the
    connection is never handed back to the pool.
    """
    nonPersistentPool = HTTPConnectionPool(self.reactor, persistent=False)
    agent = client.Agent(self.reactor, pool=nonPersistentPool)

    def useTestCaseAsEndpoint(*args):
        # The test case itself stands in for the endpoint.
        return self

    agent._getEndpoint = useTestCaseAsEndpoint
    agent.request(b"GET", b"http://127.0.0.1")
    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, False)
def test_onlyRetryIdempotentMethods(self):
    """
    A retry is only attempted for the GET, HEAD, OPTIONS, TRACE and
    DELETE methods.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    for method in (b"GET", b"HEAD", b"OPTIONS", b"TRACE", b"DELETE"):
        self.assertTrue(
            retrying._shouldRetry(method, RequestNotSent(), None))

    for method in (b"POST", b"MYMETHOD"):
        self.assertFalse(
            retrying._shouldRetry(method, RequestNotSent(), None))

    # PUT is deliberately not checked here: retrying it needs support for
    # resettable body producers, which is covered by a different ticket.
def test_onlyRetryIfNoResponseReceived(self):
    """
    A retry is attempted only for L{RequestNotSent},
    L{RequestTransmissionFailed} and L{ResponseNeverReceived}; any other
    exception does not trigger one.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    retryable = (
        RequestNotSent(),
        RequestTransmissionFailed([]),
        ResponseNeverReceived([]),
    )
    for reason in retryable:
        self.assertTrue(retrying._shouldRetry(b"GET", reason, None))

    notRetryable = (
        ResponseFailed([]),
        ConnectionRefusedError(),
    )
    for reason in notRetryable:
        self.assertFalse(retrying._shouldRetry(b"GET", reason, None))
def test_wrappedOnPersistentReturned(self):
    """
    A previously cached connection returned by
    L{client.HTTPConnectionPool.getConnection} comes back wrapped in a
    L{client._RetryingHTTP11ClientProtocol}.
    """
    pool = client.HTTPConnectionPool(Clock())

    # Seed the cache with one connected protocol.
    cachedProtocol = StubHTTPProtocol()
    cachedProtocol.makeConnection(StringTransport())
    pool._putConnection(123, cachedProtocol)

    def checkWrapped(connection):
        # The wrapper must be the retrying protocol, and it must wrap
        # exactly the protocol that was cached above.
        self.assertIsInstance(
            connection, client._RetryingHTTP11ClientProtocol)
        self.assertIdentical(connection._clientProtocol, cachedProtocol)

    fetching = pool.getConnection(123, DummyEndpoint())
    return fetching.addCallback(checkWrapped)
def test_dontRetryIfRetryAutomaticallyFalse(self):
    """
    With L{HTTPConnectionPool.retryAutomatically} set to C{False},
    connections are handed out without the retrying wrapper.
    """
    pool = client.HTTPConnectionPool(Clock())
    pool.retryAutomatically = False

    # Seed the cache with one connected protocol.
    cachedProtocol = StubHTTPProtocol()
    cachedProtocol.makeConnection(StringTransport())
    pool._putConnection(123, cachedProtocol)

    def checkUnwrapped(connection):
        # The very same object that was cached must come back.
        self.assertIdentical(connection, cachedProtocol)

    fetching = pool.getConnection(123, DummyEndpoint())
    return fetching.addCallback(checkUnwrapped)
def inject_pool_to_treq(params):
    """
    Create an HTTPConnectionPool and store it in ``params["pool"]``.

    The ``"persistent"`` entry is popped from ``params`` (defaulting to
    ``True``) and passed to the pool constructor. Every key listed in
    ``TREQ_POOL_DEFAULT_PARAMS`` is likewise popped from ``params``,
    falling back to the default given there, and set as an attribute on
    the new pool.

    :param params: mutable mapping of keyword arguments; modified in place.
    """
    persistent = params.pop("persistent", True)
    pool = HTTPConnectionPool(reactor, persistent)
    params["pool"] = pool
    for attr_name, fallback in TREQ_POOL_DEFAULT_PARAMS.items():
        attr_value = params.pop(attr_name, fallback)
        setattr(pool, attr_name, attr_value)
def __init__(self, reactor, url, pool=None, timeout=None, connect_timeout=None):
    """
    :param reactor: Twisted reactor to use.
    :type reactor: object

    :param url: etcd URL, eg `http://localhost:2379`
    :type url: str

    :param pool: Twisted Web agent connection pool. If not given, a
        persistent ``HTTPConnectionPool`` is created on ``reactor``.
    :type pool: HTTPConnectionPool or None

    :param timeout: If given, a global request timeout used for all
        requests to etcd.
    :type timeout: float or None

    :param connect_timeout: If given, a global connection timeout used
        when opening a new HTTP connection to etcd.
    :type connect_timeout: float or None

    :raises TypeError: if ``url`` is not a text (unicode) string.
    """
    # isinstance, unlike an exact type comparison, also accepts
    # subclasses of the text type.
    if not isinstance(url, six.text_type):
        raise TypeError('url must be of type unicode, was {}'.format(type(url)))

    self._url = url
    self._timeout = timeout
    self._pool = pool or HTTPConnectionPool(reactor, persistent=True)
    # NOTE(review): this sets ``noisy`` on the pool's private ``_factory``
    # attribute, and it is applied to caller-supplied pools as well.
    self._pool._factory.noisy = False
    self._agent = Agent(reactor, connectTimeout=connect_timeout, pool=self._pool)
def setUp(self):
    """
    Create a fake reactor and a pool on top of it that builds protocols
    with C{DummyFactory} and does not retry automatically.
    """
    self.fakeReactor = self.createReactor()
    pool = HTTPConnectionPool(self.fakeReactor)
    pool._factory = DummyFactory
    # Retrying is exercised separately in HTTPConnectionPoolRetryTests.
    pool.retryAutomatically = False
    self.pool = pool
def test_getReturnsNewIfCacheEmpty(self):
    """
    L{HTTPConnectionPool.getConnection} returns a new connection when
    no connection is cached.
    """
    # Precondition: the cache is empty.
    self.assertEqual(self.pool._connections, {})
    unknownKey = 12245

    def checkFreshConnection(connection):
        self.assertIsInstance(connection, StubHTTPProtocol)
        # A freshly created connection is not stored in the pool.
        cachedConnections = self.pool._connections.values()
        self.assertNotIn(connection, cachedConnections)

    fetching = self.pool.getConnection(unknownKey, DummyEndpoint())
    return fetching.addCallback(checkFreshConnection)
def test_getUsesQuiescentCallback(self):
    """
    The C{Deferred} returned by L{HTTPConnectionPool.getConnection} when
    it connects fires with an L{HTTP11ClientProtocol} instance carrying
    the correct quiescent callback. Invoking that callback returns the
    protocol to the cache under the right key.
    """
    class StringEndpoint(object):
        def connect(self, factory):
            built = factory.buildProtocol(None)
            built.makeConnection(StringTransport())
            return succeed(built)

    pool = HTTPConnectionPool(self.fakeReactor, persistent=True)
    pool.retryAutomatically = False
    key = "a key"

    def fetchConnection():
        # The endpoint connects synchronously, so the Deferred has
        # already fired by the time addCallback returns.
        fired = []
        pool.getConnection(key, StringEndpoint()).addCallback(fired.append)
        return fired[0]

    protocol = fetchConnection()
    self.assertIsInstance(protocol, HTTP11ClientProtocol)

    # Hand the protocol back to the pool through its quiescent callback.
    protocol._state = "QUIESCENT"
    protocol._quiescentCallback(protocol)

    # Asking for the same destination again must yield the very same
    # protocol, since it should have been added back to the pool.
    self.assertIdentical(fetchConnection(), protocol)
def test_closeCachedConnections(self):
    """
    L{HTTPConnectionPool.closeCachedConnections} closes every cached
    connection and drops it from the cache, returning a C{Deferred} that
    fires once all of them have lost their connections.
    """
    cached = []
    for host in (b"example.com", b"www2.example.com"):
        protocol = HTTP11ClientProtocol()
        protocol.makeConnection(StringTransport())
        self.pool._putConnection(("http", host, 80), protocol)
        cached.append(protocol)

    allClosed = self.pool.closeCachedConnections()

    # Every cached connection has started disconnecting.
    for protocol in cached:
        self.assertEqual(protocol.transport.disconnecting, True)
    self.assertEqual(self.pool._connections, {})

    # Every timeout has been cancelled and forgotten.
    for delayedCall in self.fakeReactor.getDelayedCalls():
        self.assertEqual(delayedCall.cancelled, True)
    self.assertEqual(self.pool._timeouts, {})

    # The returned Deferred only fires after the last connection is lost.
    fired = []
    allClosed.addCallback(fired.append)
    self.assertEqual(fired, [])
    first, second = cached
    first.connectionLost(Failure(ConnectionDone()))
    self.assertEqual(fired, [])
    second.connectionLost(Failure(ConnectionDone()))
    self.assertEqual(fired, [None])
def test_persistent(self):
    """
    With C{persistent} left at C{True} on the L{HTTPConnectionPool}
    (the default), C{Request}s are issued with their C{persistent} flag
    set to C{True}.
    """
    defaultPool = HTTPConnectionPool(self.reactor)
    agent = client.Agent(self.reactor, pool=defaultPool)

    def useTestCaseAsEndpoint(*args):
        # The test case itself stands in for the endpoint.
        return self

    agent._getEndpoint = useTestCaseAsEndpoint
    agent.request(b"GET", b"http://127.0.0.1")
    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, True)
def test_endpointFactoryDefaultPool(self):
    """
    L{Agent.usingEndpointFactory} builds a default, non-persistent pool
    when the caller does not supply one.
    """
    agent = client.Agent.usingEndpointFactory(
        self.reactor, StubEndpointFactory())
    pool = agent._pool
    observed = (pool.__class__, pool.persistent, pool._reactor)
    expected = (HTTPConnectionPool, False, agent._reactor)
    self.assertEqual(observed, expected)
def test_retryIfFailedDueToNonCancelException(self):
    """
    C{_shouldRetry} returns C{True} for a request that failed with
    L{ResponseNeverReceived} caused by some arbitrary exception,
    signalling that the request should be retried.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)
    reasons = [Failure(Exception())]
    neverReceived = ResponseNeverReceived(reasons)
    self.assertTrue(retrying._shouldRetry(b"GET", neverReceived, None))
def test_notWrappedOnNewReturned(self):
    """
    A brand-new connection returned by
    L{client.HTTPConnectionPool.getConnection} is handed back as is,
    without any wrapper.
    """
    pool = client.HTTPConnectionPool(None)

    def checkExactClass(connection):
        # Compare the class by identity instead of using isinstance,
        # since the wrapper might subclass the protocol at some point.
        connectionClass = connection.__class__
        self.assertIdentical(connectionClass, HTTP11ClientProtocol)

    fetching = pool.getConnection(123, DummyEndpoint())
    return fetching.addCallback(checkExactClass)
def test_onlyRetryWithoutBody(self):
    """
    L{_RetryingHTTP11ClientProtocol} retries only requests that carry no
    body.

    This is an implementation restriction; once it is lifted, this test
    should be removed and PUT added to the list of methods that support
    retries.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    withoutBody = retrying._shouldRetry(b"GET", RequestNotSent(), None)
    self.assertTrue(withoutBody)

    # Any non-None object stands in for a body producer.
    bodyProducer = object()
    withBody = retrying._shouldRetry(b"GET", RequestNotSent(), bodyProducer)
    self.assertFalse(withBody)
def __init__(self, *args, **kw):
    """
    Initialize the base agent with the given arguments, then assign a
    non-persistent connection pool to C{self._pool}.
    """
    super(TrueHeadersAgent, self).__init__(*args, **kw)
    # ``reactor`` is not a parameter here; it is resolved from the
    # enclosing scope.
    nonPersistentPool = HTTPConnectionPool(reactor, persistent=False)
    self._pool = nonPersistentPool
def _default_client(jws_client, reactor, key, alg): """ Make a client if we didn't get one. """ if jws_client is None: pool = HTTPConnectionPool(reactor) agent = Agent(reactor, pool=pool) jws_client = JWSClient(HTTPClient(agent=agent), key, alg) return jws_client
def _get_agent():
    """
    Build and return the HTTP agent.

    Uses a Twisted ``Agent`` with a persistent ``HTTPConnectionPool``
    when that class can be imported; on ``ImportError`` falls back to
    ``ZenAgent`` from ``_zenclient``.
    """
    ctx_factory = MyWebClientContextFactory()
    try:
        # HTTPConnectionPool has been present since Twisted version 12.1
        from twisted.web.client import HTTPConnectionPool
        connection_pool = HTTPConnectionPool(reactor, persistent=True)
        connection_pool.maxPersistentPerHost = _MAX_PERSISTENT_PER_HOST
        connection_pool.cachedConnectionTimeout = _CACHED_CONNECTION_TIMEOUT
        return Agent(
            reactor,
            ctx_factory,
            connectTimeout=_CONNECT_TIMEOUT,
            pool=connection_pool,
        )
    except ImportError:
        from _zenclient import ZenAgent
        return ZenAgent(
            reactor,
            ctx_factory,
            persistent=True,
            maxConnectionsPerHostName=1,
        )