我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.web.client.Agent()。
def twisted_coroutine_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` driven from a coroutine.

    The nested coroutine is scheduled on ``self.io_loop``, ``runner`` is
    then called, and whatever the coroutine stored as the response body is
    returned (``None`` if it never stored anything).
    """
    result = [None]  # one-slot box the nested coroutine writes into

    @gen.coroutine
    def fetch():
        # Simpler than the callback-based variant, at the price of reading
        # the whole body as a single blob rather than streaming it through
        # a Protocol.
        agent = Agent(self.reactor)
        response = yield agent.request(b'GET', utf8(url))
        with warnings.catch_warnings():
            # readBody has a buggy DeprecationWarning in Twisted 15.0:
            # https://twistedmatrix.com/trac/changeset/43379
            warnings.simplefilter('ignore', category=DeprecationWarning)
            payload = yield readBody(response)
        result[0] = payload
        self.stop_loop()

    self.io_loop.add_callback(fetch)
    runner()
    return result[0]
def get_request(self, path, ccb=None, ecb=None):
    """Issue a GET for ``path`` and attach the response handlers.

    :param path: passed unchanged to ``self.agent.request`` as the URI and
        remembered in ``self.request_path``.
    :param ccb: completion callback to store on the instance; ``None``
        selects ``self._completion_default``.
    :param ecb: error callback to store on the instance; ``None`` selects
        ``self._error_default``.
    :return: the object returned by ``self.agent.request`` after
        ``self._handle_request`` (callback) and ``self._handle_error``
        (errback) were added to it.
    """
    if ccb is None:
        self.completion_callback = self._completion_default
    else:
        self.completion_callback = ccb

    if ecb is None:
        self.error_callback = self._error_default
    else:
        self.error_callback = ecb

    self.request_path = path

    request_headers = Headers({'User-Agent': ['sawtooth stats collector']})
    pending = self.agent.request('GET', path, request_headers, None)
    pending.addCallback(self._handle_request)
    pending.addErrback(self._handle_error)
    return pending
def request_validate(self, url, user, realm, password):
    """
    POST a user's credentials to a privacyIDEA /validate/check endpoint.

    The ``user``, ``realm`` and ``pass`` fields are sent as a
    form-urlencoded request body.

    :param url: an HTTP or HTTPS url to the /validate/check endpoint
    :param user: username to authenticate
    :param realm: realm of the user, empty string for default realm
    :param password: password for authentication
    :return: A Twisted Deferred which yields a `twisted.web.client.Response`
        instance or fails.
    """
    form_fields = {'user': user, 'realm': realm, 'pass': password}
    encoded_form = urllib.urlencode(form_fields)
    # TODO: Is this really the preferred way to pass a string body?
    body_producer = FileBodyProducer(StringIO(encoded_form))
    request_headers = Headers({
        'Content-Type': ['application/x-www-form-urlencoded'],
        'User-Agent': ['privacyIDEA-LDAP-Proxy']
    })
    return self.factory.agent.request('POST', url, request_headers,
                                      body_producer)
def __init__(self, domain, username, pw, server, use_ssl, policy_key=0,
             server_version="14.0", device_type="iPhone", device_id=None,
             verbose=False):
    """Store the connection settings and create the HTTP agent and queue.

    When ``device_id`` is empty or ``None`` a random 32-character
    hexadecimal id is generated from ``uuid.uuid4()``.
    """
    # Account / server settings.
    self.domain = domain
    self.username = username
    self.password = pw
    self.server = server
    self.use_ssl = use_ssl

    # Device identity and protocol parameters.
    self.device_id = device_id or uuid.uuid4().hex
    self.device_type = device_type
    self.server_version = server_version
    self.policy_key = policy_key

    # Per-session state.
    self.verbose = verbose
    self.folder_data = {}
    self.collection_data = {}

    context_factory = WebClientContextFactory()
    self.agent = Agent(reactor, context_factory)

    # Arm the first ``get()`` on the queue so ``self.queue_full`` is called
    # with the first item put on it.
    self.operation_queue = defer.DeferredQueue()
    self.queue_deferred = self.operation_queue.get()
    self.queue_deferred.addCallback(self.queue_full)

# Response processing
def provision(self):
    """POST the ``Provision`` command to the server.

    The URL is ``self.get_url()`` extended with the Cmd/User/DeviceId/
    DeviceType parameters; the body is a ``ProvisionProducer``. The
    response is passed through ``self.wbxml_response``,
    ``self.process_policy_key`` and ``self.acknowledge``; errors go to
    ``self.activesync_error``.

    :return: the Deferred returned by ``self.agent.request`` with those
        handlers attached.
    """
    base_url = self.get_url()
    query = {"Cmd":"Provision", "User":self.username,
             "DeviceId":self.device_id, "DeviceType":self.device_type}
    prov_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s'%version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"]})
    body = ProvisionProducer(verbose=self.verbose)

    pending = self.agent.request('POST', prov_url, request_headers, body)
    for step in (self.wbxml_response, self.process_policy_key,
                 self.acknowledge):
        pending.addCallback(step)
    pending.addErrback(self.activesync_error)
    return pending
def folder_sync(self, sync_key=0):
    """POST the ``FolderSync`` command to the server.

    :param sync_key: sync key to send. When it is ``0`` and
        ``self.folder_data`` holds a ``"key"`` entry, that stored key is
        used instead.
    :return: the Deferred returned by ``self.agent.request`` with
        ``self.wbxml_response`` and ``self.process_folder_sync`` added as
        callbacks and ``self.activesync_error`` as errback.
    """
    if sync_key == 0 and "key" in self.folder_data:
        sync_key = self.folder_data["key"]

    base_url = self.get_url()
    query = {"Cmd":"FolderSync", "User":self.username,
             "DeviceId":self.device_id, "DeviceType":self.device_type}
    sync_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s'%version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"]})
    body = FolderSyncProducer(sync_key, verbose=self.verbose)

    pending = self.agent.request('POST', sync_url, request_headers, body)
    pending.addCallback(self.wbxml_response)
    pending.addCallback(self.process_folder_sync)
    pending.addErrback(self.activesync_error)
    return pending
def sync(self, collectionId, sync_key=0, get_body=False):
    """POST the ``Sync`` command for one collection.

    :param collectionId: id of the collection to sync; also handed to
        ``self.process_sync`` as an extra argument.
    :param sync_key: sync key to send. When it is ``0`` and
        ``self.collection_data`` has an entry for ``collectionId``, that
        entry's ``"key"`` is used instead.
    :param get_body: forwarded to ``SyncProducer``.
    :return: the Deferred returned by ``self.agent.request`` with
        ``self.wbxml_response`` and ``self.process_sync`` added as
        callbacks and ``self.activesync_error`` as errback.
    """
    if sync_key == 0 and collectionId in self.collection_data:
        sync_key = self.collection_data[collectionId]["key"]

    base_url = self.get_url()
    query = {"Cmd":"Sync", "User":self.username,
             "DeviceId":self.device_id, "DeviceType":self.device_type}
    sync_url = self.add_parameters(base_url, query)

    request_headers = Headers({
        'User-Agent': ['python-EAS-Client %s'%version],
        'Authorization': [self.authorization_header()],
        'MS-ASProtocolVersion': [self.server_version],
        'X-MS-PolicyKey': [str(self.policy_key)],
        'Content-Type': ["application/vnd.ms-sync.wbxml"]})
    body = SyncProducer(collectionId, sync_key, get_body,
                        verbose=self.verbose)

    pending = self.agent.request('POST', sync_url, request_headers, body)
    pending.addCallback(self.wbxml_response)
    pending.addCallback(self.process_sync, collectionId)
    pending.addErrback(self.activesync_error)
    return pending
def __init__(self):
    """Set up the cloud link item and load this device's UUID.

    :raises RuntimeError: if ``CloudLinkSettings.PRIVATE_KEY_LOCATION`` or
        ``CloudLinkSettings.UUID_LOCATION`` is ``None``.

    If the UUID file cannot be read (``IOError``) a warning is logged and
    ``self.uuid`` is set to the empty string.
    """
    parlay.ParlayCommandItem.__init__(self, "parlay.items.cloud_link", "Cloud Link")
    self._http_agent = Agent(self._reactor)
    self.cloud_factory = None
    self.channel_uri = None

    key_path = CloudLinkSettings.PRIVATE_KEY_LOCATION
    if key_path is None:
        raise RuntimeError("CloudLinkSettings.PRIVATE_KEY_LOCATION must be set for cloud to work")

    uuid_path = CloudLinkSettings.UUID_LOCATION
    if uuid_path is None:
        raise RuntimeError("CloudLinkSettings.UUID_LOCATION must be set for cloud to work")

    try:
        with open(uuid_path, 'r') as handle:
            self.uuid = handle.read()
    except IOError:
        logger.warn("Error reading UUID file. Has this device been registered?")
        self.uuid = ""
def http_request(self, path="/", encode='zlib'):
    """
    Do an HTTP GET request on the broker and deliver the response body.

    :param path: path appended to ``http://localhost:<broker http_port>``
    :type path: str
    :param encode: when equal to ``"zlib"`` the body is zlib-compressed and
        then base64-encoded; any other value delivers the body as read
    :return: the Deferred returned by the agent, with the body-reading and
        body-encoding callbacks attached
    """
    # Function-scope import keeps this block self-contained.
    import zlib

    url = "http://localhost:" + str(Broker.get_instance().http_port) + path
    # Single-argument parenthesised print emits the same text on Python 2
    # and is valid syntax on Python 3.
    print(url)

    def package(body):
        # ``zlib.compress`` is what the Python 2 "zlib" codec calls, and it
        # also works on the byte strings Python 3 produces.
        if encode == "zlib":
            return base64.b64encode(zlib.compress(body))
        return body

    request = self._http_agent.request(
        'GET',
        url,
        Headers({'User-Agent': ['Twisted Web Client']}),
        None)
    request.addCallback(readBody)
    request.addCallback(package)
    return request
def testValidOptionsRequest(self):
    """
    A plain OPTIONS request gets a 200 with an empty body and an Allow
    header, and none of the CORS-specific response headers.
    """
    request_headers = Headers({'origin': ['http://localhost']})
    response = yield Agent(reactor).request('OPTIONS', self.uri,
                                            request_headers)

    # Status and body length.
    self.assertEqual(http.OK, response.code)
    self.assertEqual(0, response.length)

    # Allow must be present ...
    received = response.headers
    self.assertTrue(received.hasHeader('Allow'))

    # ... while every CORS header must be absent.
    cors_headers = (
        'Access-Control-Allow-Origin',
        'Access-Control-Max-Age',
        'Access-Control-Allow-Credentials',
        'Access-Control-Allow-Methods',
    )
    for name in cors_headers:
        self.assertFalse(received.hasHeader(name))
def testViaAgent(self):
    """
    Manually POST to /objects with L{twisted.web.client.Agent}.

    txFluidDB is not used here because the test has to see the raw
    response: it checks for a Location header and for both a 'URI' and an
    'id' key in the JSON payload.
    """
    URI = self.txEndpoint.getRootURL() + defaults.httpObjectCategoryName
    credentials = b64encode('%s:%s' % ('testuser1', 'secret'))
    request_headers = Headers({'accept': ['application/json'],
                               'authorization': ['Basic %s' % credentials]})

    response = yield Agent(reactor).request('POST', URI, request_headers)
    self.assertEqual(http.CREATED, response.code)
    self.assertTrue(response.headers.hasHeader('location'))

    # Collect the body through a ResponseGetter that fires ``body_ready``.
    body_ready = defer.Deferred()
    response.deliverBody(ResponseGetter(body_ready))
    raw_body = yield body_ready

    payload = json.loads(raw_body)
    for key in ('URI', 'id'):
        self.assertIn(key, payload)
def testQueryUnicodePath(self):
    """
    Querying on a Unicode tag path that does not exist returns 404.

    The main purpose is to confirm that a Unicode tag path does not
    trigger some other kind of error.
    """
    path = u'çóñ/???'
    query = '%s = "hi"' % path
    quoted_query = urllib.quote(query.encode('utf-8'))
    URI = '%s/%s?query=%s' % (
        self.endpoint, defaults.httpObjectCategoryName, quoted_query)

    request_headers = Headers({'accept': ['application/json']})
    response = yield Agent(reactor).request('GET', URI, request_headers)
    self.assertEqual(http.NOT_FOUND, response.code)
def testValidCORSRequest(self):
    """
    A GET carrying an Origin header gets the CORS response headers back,
    with the request's origin listed in Access-Control-Allow-Origin.
    """
    # The origin to use in the tests
    dummy_origin = 'http://foo.com'
    request_headers = Headers()
    request_headers.addRawHeader('Origin', dummy_origin)

    response = yield Agent(reactor).request('GET', self.uri,
                                            request_headers)

    # Status first, then the headers.
    self.assertEqual(http.OK, response.code)
    received = response.headers
    self.assertTrue(received.hasHeader('Access-Control-Allow-Origin'))
    self.assertTrue(received.hasHeader('Access-Control-Allow-Credentials'))
    allowed_origins = received.getRawHeaders('Access-Control-Allow-Origin')
    self.assertTrue(dummy_origin in allowed_origins)
def testVersionGets404(self):
    """
    API calls used to accept a version number in the URL; that is no
    longer supported, so such a URL must return 404.
    """
    version = 20100808
    path_parts = (self.endpoint, version,
                  defaults.httpNamespaceCategoryName, defaults.adminUsername)
    URI = '%s/%d/%s/%s' % path_parts

    request_headers = Headers({'accept': ['application/json']})
    response = yield Agent(reactor).request('GET', URI, request_headers)
    self.assertEqual(http.NOT_FOUND, response.code)

# TODO: Add a test for a namespace that we don't have LIST perm on.
# although that might be done in permissions.py when that finally gets
# added.
def test_nonPersistent(self):
    """
    With C{persistent=False} passed to L{HTTPConnectionPool}, the
    C{Request}s the agent issues carry C{persistent} set to C{False}.

    Other tests for the underlying HTTP code cover the consequence: the
    protocol disconnects once the request is done, so the connection is
    not handed back to the pool.
    """
    def useTestCaseAsEndpoint(*args):
        return self

    nonPersistentPool = HTTPConnectionPool(self.reactor, persistent=False)
    agent = client.Agent(self.reactor, pool=nonPersistentPool)
    agent._getEndpoint = useTestCaseAsEndpoint
    agent.request(b"GET", b"http://127.0.0.1")

    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, False)
def test_headersUnmodified(self):
    """
    L{Agent.request} leaves the L{Headers} instance it is given untouched,
    even when a I{Host} header has to be added to the request.
    """
    def useTestCaseAsEndpoint(*args):
        return self

    suppliedHeaders = http_headers.Headers()
    self.agent._getEndpoint = useTestCaseAsEndpoint
    self.agent.request(b'GET', b'http://example.com/foo', suppliedHeaders)

    # Exactly one request must have gone out ...
    issued = self.protocol.requests
    self.assertEqual(len(issued), 1)

    # ... and the caller's headers must still be empty.
    self.assertEqual(suppliedHeaders, http_headers.Headers())
def makeEndpoint(self, host=b'example.com', port=443):
    """
    Build an C{https} URI from C{host} and C{port} and return the endpoint
    a fresh L{Agent} selects for it.

    @param host: The host for the endpoint.
    @type host: L{bytes}

    @param port: The port for the endpoint.
    @type port: L{int}

    @return: An endpoint of an L{Agent} constructed according to args.
    @rtype: L{SSL4ClientEndpoint}
    """
    target = b'https://' + host + b":" + intToBytes(port) + b"/"
    agent = client.Agent(self.createReactor())
    return agent._getEndpoint(URI.fromBytes(target))
def test_deprecatedDuckPolicy(self):
    """
    Handing L{Agent} an object that does not provide L{IPolicyForHTTPS} -
    i.e. something merely duck-typing I{like} a L{web client context
    factory <twisted.web.client.WebClientContextFactory>} - emits a
    L{DeprecationWarning}, even without importing
    C{WebClientContextFactory}.
    """
    def buildAgent():
        client.Agent(MemoryReactorClock(),
                     "does-not-provide-IPolicyForHTTPS")

    buildAgent()
    emitted = self.flushWarnings([buildAgent])
    self.assertEqual(len(emitted), 1)
    [only] = emitted
    self.assertEqual(only['category'], DeprecationWarning)
    expectedMessage = (
        "'does-not-provide-IPolicyForHTTPS' was passed as the HTTPS "
        "policy for an Agent, but it does not provide IPolicyForHTTPS.  "
        "Since Twisted 14.0, you must pass a provider of IPolicyForHTTPS."
    )
    self.assertEqual(only['message'], expectedMessage)
def integrationTest(self, hostName, expectedAddress, addressType):
    """
    Run L{AgentTestsMixin.integrationTest} over TLS.

    A certificate authority and a server certificate are generated for
    C{hostName}; the server side is wrapped in TLS with that certificate
    and the agent is given a policy trusting that authority.
    """
    decodedName = hostName.decode('ascii')
    authority, server = certificatesForAuthorityAndServer(decodedName)

    def wrapServerInTLS(serverFactory):
        return TLSMemoryBIOFactory(server.options(), False, serverFactory)

    def makeTLSAgent(reactor):
        from twisted.web.iweb import IPolicyForHTTPS
        from zope.interface import implementer

        @implementer(IPolicyForHTTPS)
        class Policy(object):
            # Trust only the authority generated above.
            def creatorForNetloc(self, hostname, port):
                return optionsForClientTLS(hostname.decode("ascii"),
                                           trustRoot=authority)

        return client.Agent(reactor, contextFactory=Policy())

    parent = super(AgentHTTPSTests, self)
    parent.integrationTest(hostName, expectedAddress, addressType,
                           serverWrapper=wrapServerInTLS,
                           createAgent=makeTLSAgent,
                           scheme=b'https')
def test_noRedirect(self):
    """
    When the response is not a redirect, L{client.RedirectAgent} acts just
    like L{client.Agent}: the same response is delivered, no further
    request is issued and C{previousResponse} is L{None}.
    """
    pending = self.agent.request(b'GET', b'http://example.com/foo')

    _, responseDeferred = self.protocol.requests.pop()

    okResponse = Response(
        (b'HTTP', 1, 1), 200, b'OK', http_headers.Headers(), None)
    responseDeferred.callback(okResponse)

    # No follow-up request may have been queued.
    self.assertEqual(0, len(self.protocol.requests))

    delivered = self.successResultOf(pending)
    self.assertIdentical(okResponse, delivered)
    self.assertIdentical(delivered.previousResponse, None)
def test_protectedServerAndDate(self):
    """
    I{Server} and I{Date} headers emitted by the CGI script do not make it
    into the response.
    """
    def verifyHeaders(response):
        received = response.headers
        self.assertNotIn('monkeys', received.getRawHeaders('server'))
        self.assertNotIn('last year', received.getRawHeaders('date'))

    scriptPath = self.writeCGI(SPECIAL_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)

    finished = client.Agent(reactor).request(b"GET", url)
    finished.addCallback(discardBody)
    finished.addCallback(verifyHeaders)
    return finished
def test_noDuplicateContentTypeHeaders(self):
    """
    When the CGI script emits its own I{content-type} header the server
    must not add a second one (ticket 4786).
    """
    def verifyContentType(response):
        contentTypes = response.headers.getRawHeaders('content-type')
        self.assertEqual(contentTypes, ['text/cgi-duplicate-test'])
        return response

    scriptPath = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)

    finished = client.Agent(reactor).request(b"GET", url)
    finished.addCallback(discardBody)
    finished.addCallback(verifyContentType)
    return finished
def test_duplicateHeaderCGI(self):
    """
    Two instances of the same header emitted by a CGI script both appear
    in the response.
    """
    def verifyBothValues(response):
        values = response.headers.getRawHeaders('header')
        self.assertEqual(values, ['spam', 'eggs'])

    scriptPath = self.writeCGI(DUAL_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)

    finished = client.Agent(reactor).request(b"GET", url)
    finished.addCallback(discardBody)
    finished.addCallback(verifyBothValues)
    return finished
def test_malformedHeaderCGI(self):
    """
    A malformed header line from the CGI script is reported in the log.
    """
    scriptPath = self.writeCGI(BROKEN_HEADER_CGI)
    port = self.startServer(scriptPath)
    url = "http://localhost:%d/cgi" % (port,)

    finished = client.Agent(reactor).request(b"GET", url)
    finished.addCallback(discardBody)

    # Record the text of every log event for the rest of the test.
    captured = []

    def record(eventDict):
        captured.append(log.textFromEventDict(eventDict))

    log.addObserver(record)
    self.addCleanup(log.removeObserver, record)

    def verifyLogged(ignored):
        self.assertIn("ignoring malformed CGI header: 'XYZ'", captured)

    finished.addCallback(verifyLogged)
    return finished
def testReadInput(self):
    """
    POST a fixed body to a CGI script written from C{READINPUT_CGI} and
    hand the response body to C{self._testReadInput_1}.
    """
    scriptPath = os.path.abspath(self.mktemp())
    with open(scriptPath, 'wt') as script:
        script.write(READINPUT_CGI)

    port = self.startServer(scriptPath)
    target = "http://localhost:%d/cgi" % (port,)
    stdinProducer = client.FileBodyProducer(BytesIO(b"Here is your stdin"))

    finished = client.Agent(reactor).request(
        uri=target,
        method=b"POST",
        bodyProducer=stdinProducer,
    )
    finished.addCallback(client.readBody)
    finished.addCallback(self._testReadInput_1)
    return finished
def testDistrib(self):
    """
    Publish a resource via distrib, subscribe to it from a second site and
    check that fetching it over HTTP from that second site yields 'root'.
    """
    # Publisher side: site1 serves "there" and is exposed through PB.
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    site1 = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
    self.port1 = reactor.listenTCP(0, self.f1)

    # Subscriber side: "here" on the second site proxies to the publisher.
    publisherPort = self.port1.getHost().port
    self.sub = distrib.ResourceSubscription("127.0.0.1", publisherPort)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    self.port2 = reactor.listenTCP(0, MySite(subscriberRoot))

    url = "http://127.0.0.1:%d/here/there" % (self.port2.getHost().port,)
    finished = client.Agent(reactor).request(b"GET", url)
    finished.addCallback(client.readBody)
    finished.addCallback(self.assertEqual, 'root')
    return finished
def _requestTest(self, child, **kwargs):
    """
    Publish C{child} on a distrib site with L{ResourcePublisher}, then
    fetch it over HTTP through a L{ResourceSubscription}.

    @param child: The resource to publish using distrib.

    @param **kwargs: Extra keyword arguments to pass to L{Agent.request}
        when requesting the resource.

    @return: A L{Deferred} which fires with the result of the request.
    """
    mainPort, mainAddr = self._setupDistribServer(child)
    url = "http://%s:%s/child" % (mainAddr.host, mainAddr.port)
    fetched = client.Agent(reactor).request(b"GET", url, **kwargs)
    fetched.addCallback(client.readBody)
    return fetched
def _get_stats_from_node(self):
    """Fetch the node's stats document and return it parsed from JSON.

    The URL comes from the ``url`` config key (default
    ``http://<hostname>:8098/stats``) and the User-Agent from the
    ``useragent`` config key. When the response has no length or is not a
    200, an empty JSON object is parsed instead of the body.
    """
    agent = Agent(reactor)

    default_url = 'http://%s:8098/stats' % self.hostname
    url = self.config.get('url', default_url)
    ua = self.config.get('useragent', 'Duct Riak stats checker')

    response = yield agent.request(
        'GET'.encode(), url.encode(), Headers({'User-Agent': [ua]}))

    if not response.length or response.code != 200:
        body = "{}"
    else:
        # BodyReceiver fires ``delivered`` with an object exposing read().
        delivered = defer.Deferred()
        response.deliverBody(BodyReceiver(delivered))
        buffered = yield delivered
        body = buffered.read()

    defer.returnValue(json.loads(body))
def network_kubernetes(**kw):
    """
    Build an ``IKubernetes`` provider from which clients can be created.

    All keyword arguments are forwarded to ``_NetworkKubernetes``.

    :param twisted.python.url.URL base_url: The root of the Kubernetes HTTPS
        API to interact with.

    :param twisted.web.iweb.IAgent agent: An HTTP agent to use to issue
        requests.  Defaults to a new ``twisted.web.client.Agent`` instance.
        See ``txkube.authenticate_with_serviceaccount`` and
        ``txkube.authenticate_with_certificate`` for helpers for creating
        agents that interact well with Kubernetes servers.

    :return IKubernetes: The Kubernetes service.
    """
    provider = _NetworkKubernetes(**kw)
    return provider
def authenticate_with_serviceaccount(reactor, **kw):
    """
    Build an ``IAgent`` that authenticates to a Kubernetes server with a
    service account token.

    The configuration is loaded with ``KubeConfig.from_service_account``;
    its token is injected as a ``Bearer`` authorization header and its
    HTTPS policy is used for the underlying ``Agent``.

    :param reactor: The reactor with which to configure the resulting agent.

    :param bytes path: The location of the service account directory.  The
        default should work fine for normal use within a container.

    :return IAgent: An agent which will authenticate itself to a particular
        Kubernetes server and which will verify that server or refuse to
        interact with it.
    """
    config = KubeConfig.from_service_account(**kw)
    policy = https_policy_from_config(config)
    token = config.user["token"]

    auth_headers = Headers({u"authorization": [u"Bearer {}".format(token)]})
    verifying_agent = Agent(reactor, contextFactory=policy)
    return HeaderInjectingAgent(
        _to_inject=auth_headers,
        _agent=verifying_agent,
    )
def _pollForResult(self):
    """POST the device-code credentials request to ``AUTH_RESPONSE_URI``.

    Nothing is sent (and ``None`` is returned) when the request was
    canceled, or when the user code has expired - in the latter case
    ``self._onError`` is called with ``ERROR_CREDENTIALS_REQUEST_EXPIRED``.

    :return: the Deferred returned by ``self._agent.request`` (also kept in
        ``self._responseDeferred``), or ``None`` when nothing was sent.
    """
    # Function-scope import so the block works on Python 2 and Python 3.
    try:
        from urllib.parse import urlencode
    except ImportError:
        from urllib import urlencode

    if self._canceled:
        Log.d("Auth Request canceled")
        return
    if self._user_code.expired():
        self._onError(self.ERROR_CREDENTIALS_REQUEST_EXPIRED)
        return

    # Percent-encode the values: with plain string formatting any '&', '=',
    # '+' or '%' inside a value would corrupt the form body. A list of
    # pairs keeps the field order fixed.
    form = urlencode([
        ("client_id", self.CLIENT_ID),
        ("client_secret", self.CLIENT_SECRET),
        ("code", str(self._user_code.device_code)),
        ("grant_type", self.GRANT_TYPE_DEVICE_AUTH),
    ])

    d = self._agent.request(
        'POST',
        self.AUTH_RESPONSE_URI,
        Headers({
            'User-Agent' : [self.USER_AGENT],
            'Content-Type' : ["application/x-www-form-urlencoded"],
        }),
        StringProducer(form)
    )
    d.addCallbacks(self._onCredentialsPollResponse, self._onCredentialsPollError)
    self._responseDeferred = d
    return d
def create_txacme_client_creator(reactor, url, key, alg=jose.RS256):
    """
    Build the client creator that the txacme service expects.

    See ``txacme.client.Client.from_url()``. The underlying JWSClient is
    given an Agent with a non-persistent pool to avoid
    https://github.com/mithrandi/txacme/issues/86.

    :return: a callable that returns a deferred that returns the client
    """
    # No pool is passed to Agent, so it uses its default pool, which is
    # non-persistent.
    agent = Agent(reactor)
    http_client = HTTPClient(agent=agent)
    jws_client = JWSClient(http_client, key, alg)
    return partial(txacme_Client.from_url, reactor, url, key, alg, jws_client)
def test_default_client(self):
    """
    default_client returns the very client object it was given.
    """
    supplied = treq_HTTPClient(Agent(reactor))
    returned = default_client(supplied, reactor)
    assert_that(returned, Is(supplied))
def default_client(client, reactor):
    """
    Return ``client``, or build a default one when it is ``None``.

    The default is a ``treq_HTTPClient`` wrapping a
    ``twisted.web.client.Agent`` created with the provided reactor.
    """
    if client is not None:
        return client

    # Imported only when a default actually has to be built.
    from twisted.web.client import Agent
    return treq_HTTPClient(Agent(reactor))
def twisted_fetch(self, url, runner):
    """Fetch ``url`` with a Twisted ``Agent`` and return the body bytes.

    The body is streamed into a list of chunks by a small ``Protocol``;
    once the request finishes (or fails) ``self.stop_loop`` is called if it
    exists. ``runner`` is called to drive the request, the test then
    asserts that at least one chunk arrived and returns the chunks joined.
    """
    # http://twistedmatrix.com/documents/current/web/howto/client.html
    chunks = []
    client = Agent(self.reactor)
    d = client.request(b'GET', utf8(url))

    class Accumulator(Protocol):
        """Append every received chunk; fire ``finished`` on disconnect."""

        def __init__(self, finished):
            self.finished = finished

        def dataReceived(self, data):
            chunks.append(data)

        def connectionLost(self, reason):
            self.finished.callback(None)

    def callback(response):
        # Returning the Deferred delays the next callback until the whole
        # body has been delivered.
        finished = Deferred()
        response.deliverBody(Accumulator(finished))
        return finished
    d.addCallback(callback)

    def shutdown(failure):
        if hasattr(self, 'stop_loop'):
            self.stop_loop()
        elif failure is not None:
            # loop hasn't been initialized yet; try our best to
            # get an error message out. (the runner() interaction
            # should probably be refactored).
            try:
                failure.raiseException()
            except BaseException:
                # Deliberate catch-all: the only goal is to log the error.
                logging.error('exception before starting loop', exc_info=True)
    d.addBoth(shutdown)
    runner()
    self.assertTrue(chunks)
    # The chunks are byte strings: a ``str`` separator raises TypeError on
    # Python 3, while ``b''`` behaves the same as ``''`` on Python 2.
    return b''.join(chunks)
def __init__(self, reactor, url, pool=None, timeout=None, connect_timeout=None):
    """
    :param reactor: Twisted reactor to use.
    :type reactor: class

    :param url: etcd URL, eg `http://localhost:2379`
    :type url: str

    :param pool: Twisted Web agent connection pool. When not given, a
        persistent ``HTTPConnectionPool`` on ``reactor`` is created.
    :type pool:

    :param timeout: If given, a global request timeout used for all
        requests to etcd.
    :type timeout: float or None

    :param connect_timeout: If given, a global connection timeout used when
        opening a new HTTP connection to etcd.
    :type connect_timeout: float or None

    :raises TypeError: if ``url`` is not a text (unicode) string.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of the text type.
    if not isinstance(url, six.text_type):
        raise TypeError('url must be of type unicode, was {}'.format(type(url)))
    self._url = url
    self._timeout = timeout
    self._pool = pool or HTTPConnectionPool(reactor, persistent=True)
    # NOTE(review): presumably silences the pool factory's log messages;
    # note this also modifies a pool passed in by the caller.
    self._pool._factory.noisy = False
    self._agent = Agent(reactor, connectTimeout=connect_timeout, pool=self._pool)
def cookieAgentFactory(verify_path, connectTimeout=30):
    """Build a cookie-aware agent that uses the certificate at ``verify_path``.

    :param verify_path: path of a PEM file; its content is loaded as a
        ``Certificate`` and given to ``BrowserLikePolicyForHTTPS``.
    :param connectTimeout: forwarded to ``Agent``.
    :return: a ``CookieAgent`` wrapping that agent with a fresh cookie jar.
    """
    pem_bytes = FilePath(verify_path).getContent()
    trusted_cert = Certificate.loadPEM(pem_bytes)
    https_policy = BrowserLikePolicyForHTTPS(trusted_cert)
    inner_agent = Agent(reactor, https_policy, connectTimeout=connectTimeout)
    return CookieAgent(inner_agent, cookielib.CookieJar())
def __init__(self):
    """Create the HTTP agent and reset counters and callback slots."""
    self.agent = Agent(reactor)

    # Counters start at zero.
    self.request_count = 0
    self.error_count = 0

    # Nothing has been requested yet.
    self.request_path = None
    self.completion_callback = None
    self.error_callback = None
def __init__(self, email):
    """Remember ``email``, derive its domain and start in the initial state.

    :param email: address whose part after the first ``@`` is stored as
        ``self.email_domain``.
    """
    address_parts = email.split("@")
    self.email = email
    self.email_domain = address_parts[1]
    self.agent = Agent(reactor)
    self.redirect_urls = []
    self.state = AutoDiscover.STATE_INIT
def handle_redirect(self, new_url):
    """Follow a redirect to ``new_url`` unless it was already visited.

    :param new_url: URL to request next; it is appended to
        ``self.redirect_urls``.
    :return: the Deferred returned by ``self.agent.request`` with
        ``self.autodiscover_response`` / ``self.autodiscover_error``
        attached.
    :raises Exception: with args ``("AutoDiscover", "Circular
        redirection")`` when ``new_url`` is already in
        ``self.redirect_urls``.
    """
    if new_url in self.redirect_urls:
        raise Exception("AutoDiscover", "Circular redirection")
    self.redirect_urls.append(new_url)
    self.state = AutoDiscover.STATE_REDIRECT
    # Single-argument parenthesised print: same output as the Python 2
    # print statement, and valid syntax on Python 3.
    print("Making request to %s" % (new_url,))
    d = self.agent.request(
        'GET',
        new_url,
        Headers({'User-Agent': ['python-EAS-Client %s'%version]}),
        AutoDiscoveryProducer(self.email))
    d.addCallback(self.autodiscover_response)
    d.addErrback(self.autodiscover_error)
    return d
def autodiscover(self):
    """Advance to the next state and issue that state's request.

    ``self.state`` is incremented; the new value selects a URL template
    from ``AutoDiscover.AD_REQUESTS`` which is filled with
    ``self.email_domain``.

    :return: the Deferred returned by ``self.agent.request`` with
        ``self.autodiscover_response`` / ``self.autodiscover_error``
        attached.
    :raises Exception: with args ``("Unsupported state", <state>)`` when
        the new state has no entry in ``AutoDiscover.AD_REQUESTS``.
    """
    self.state += 1
    if self.state not in AutoDiscover.AD_REQUESTS:
        raise Exception("Unsupported state",str(self.state))

    # Build the URL once; it is used for both the message and the request.
    url = AutoDiscover.AD_REQUESTS[self.state]%self.email_domain
    # Single-argument parenthesised print: same output as the Python 2
    # print statement, and valid syntax on Python 3.
    print("Making request to %s" % (url,))
    body = AutoDiscoveryProducer(self.email)
    d = self.agent.request(
        'GET',
        url,
        Headers({'User-Agent': ['python-EAS-Client %s'%version]}),
        body)
    d.addCallback(self.autodiscover_response)
    d.addErrback(self.autodiscover_error)
    return d
def get_options(self):
    """Send an OPTIONS request to ``self.get_url()``.

    When ``self.verbose`` is set, the URL and the Authorization header
    value are printed first.

    :return: the Deferred returned by ``self.agent.request`` with
        ``self.options_response`` as callback and
        ``self.activesync_error`` as errback.
    """
    # Compute both values once; they feed the message and the request.
    url = self.get_url()
    authorization = self.authorization_header()
    if self.verbose:
        # Single-argument parenthesised print: same output as the Python 2
        # print statement, and valid syntax on Python 3.
        print("Options, get URL: %s Authorization %s" % (url, authorization))
    d = self.agent.request(
        'OPTIONS',
        url,
        Headers({'User-Agent': ['python-EAS-Client %s'%version],
                 'Authorization': [authorization]}),
        None)
    d.addCallback(self.options_response)
    d.addErrback(self.activesync_error)
    return d