我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.web.server.Request()。
def getClientIP(request, useForwardedHeader=False): """Get the client's IP address from the ``'X-Forwarded-For:'`` header, or from the :api:`request <twisted.web.server.Request>`. :type request: :api:`twisted.web.http.Request` :param request: A ``Request`` for a :api:`twisted.web.resource.Resource`. :param bool useForwardedHeader: If ``True``, attempt to get the client's IP address from the ``'X-Forwarded-For:'`` header. :rtype: ``None`` or :any:`str` :returns: The client's IP address, if it was obtainable. """ ip = None if useForwardedHeader: header = request.getHeader("X-Forwarded-For") if header: ip = header.split(",")[-1].strip() if not isIPAddress(ip): log.msg("Got weird X-Forwarded-For value %r" % header) ip = None else: ip = request.getClientIP() return ip
def getResourceFor(self, request): """Get a resource for a request. This iterates through the resource heirarchy, calling getChildWithDefault on each resource it finds for a path element, stopping when it hits an element where isLeaf is true. """ log.msg("Request for resource at %s" % request.path) request.site = self # Sitepath is used to determine cookie names between distributed # servers and disconnected sites. request.sitepath = copy.copy(request.prepath) child = resource.getChildForRequest(self.resource, request) log.msg(child) return child
def setup(self): self.empty_resp = '' if sys.version_info[0] >= 3: self.empty_resp = b'' self.mock_redir = Mock(spec_set=VaultRedirector) type(self.mock_redir).active_node_ip_port = 'mynode:1234' type(self.mock_redir).last_poll_time = 'myPollTime' type(self.mock_redir).log_enabled = True type(self.mock_redir).consul_scheme = 'http' type(self.mock_redir).consul_host_port = 'consul:5678' self.cls = VaultRedirectorSite(self.mock_redir) # mock client request self.mock_request = Mock(spec_set=Request) type(self.mock_request).method = 'GET' client_addr = IPv4Address('TCP', '1.2.3.4', 12345) type(self.mock_request).client = client_addr type(self.mock_request).clientproto = 'HTTP/1.1' headers = Headers() headers.setRawHeaders('date', ['Mon, 11 Apr 2016 15:26:42 GMT']) headers.setRawHeaders('server', ['TwistedWeb/16.1.0']) type(self.mock_request).responseHeaders = headers type(self.mock_request).queued = 0
def test_headersAndCode(self): """ L{redirectTo} will set the C{Location} and C{Content-Type} headers on its request, and set the response code to C{FOUND}, so the browser will be redirected. """ request = Request(DummyChannel(), True) request.method = b'GET' targetURL = b"http://target.example.com/4321" redirectTo(targetURL, request) self.assertEqual(request.code, FOUND) self.assertEqual( request.responseHeaders.getRawHeaders(b'location'), [targetURL]) self.assertEqual( request.responseHeaders.getRawHeaders(b'content-type'), [b'text/html; charset=utf-8'])
def test_processingFailedNoTraceback(self): """ L{Request.processingFailed} when the site has C{displayTracebacks} set to C{False} does not write out the failure, but give a generic error message. """ d = DummyChannel() request = server.Request(d, 1) request.site = server.Site(resource.Resource()) request.site.displayTracebacks = False fail = failure.Failure(Exception("Oh no!")) request.processingFailed(fail) self.assertNotIn(b"Oh no!", request.transport.written.getvalue()) self.assertIn( b"Processing Failed", request.transport.written.getvalue() ) # Since we didn't "handle" the exception, flush it to prevent a test # failure self.assertEqual(1, len(self.flushLoggedErrors()))
def test_processingFailedDisplayTraceback(self): """ L{Request.processingFailed} when the site has C{displayTracebacks} set to C{True} writes out the failure. """ d = DummyChannel() request = server.Request(d, 1) request.site = server.Site(resource.Resource()) request.site.displayTracebacks = True fail = failure.Failure(Exception("Oh no!")) request.processingFailed(fail) self.assertIn(b"Oh no!", request.transport.written.getvalue()) # Since we didn't "handle" the exception, flush it to prevent a test # failure self.assertEqual(1, len(self.flushLoggedErrors()))
def test_processingFailedDisplayTracebackHandlesUnicode(self): """ L{Request.processingFailed} when the site has C{displayTracebacks} set to C{True} writes out the failure, making UTF-8 items into HTML entities. """ d = DummyChannel() request = server.Request(d, 1) request.site = server.Site(resource.Resource()) request.site.displayTracebacks = True fail = failure.Failure(Exception(u"\u2603")) request.processingFailed(fail) self.assertIn(b"☃", request.transport.written.getvalue()) # On some platforms, we get a UnicodeError when trying to # display the Failure with twisted.python.log because # the default encoding cannot display u"\u2603". Windows for example # uses a default encodig of cp437 which does not support u"\u2603". self.flushLoggedErrors(UnicodeError) # Since we didn't "handle" the exception, flush it to prevent a test # failure self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self): """ L{Request.session} and L{Request.secure_session} should be two separate sessions with unique ids and different cookies. """ d = DummyChannel() d.transport = DummyChannel.SSL() request = server.Request(d, 1) request.site = server.Site(resource.Resource()) request.sitepath = [] secureSession = request.getSession() self.assertIsNotNone(secureSession) self.addCleanup(secureSession.expire) self.assertEqual(request.cookies[0].split(b"=")[0], b"TWISTED_SECURE_SESSION") session = request.getSession(forceNotSecure=True) self.assertIsNotNone(session) self.assertEqual(request.cookies[1].split(b"=")[0], b"TWISTED_SESSION") self.addCleanup(session.expire) self.assertNotEqual(session.uid, secureSession.uid)
def test_retrieveNonExistentSession(self): """ L{Request.getSession} generates a new session if the relevant cookie is set in the incoming request. """ site = server.Site(resource.Resource()) d = DummyChannel() request = server.Request(d, 1) request.site = site request.sitepath = [] request.received_cookies[b'TWISTED_SESSION'] = b"does-not-exist" session = request.getSession() self.assertIsNotNone(session) self.addCleanup(session.expire) self.assertTrue(request.cookies[0].startswith(b'TWISTED_SESSION=')) # It should be a new session ID. self.assertNotIn(b"does-not-exist", request.cookies[0])
def test_interfaces(self): """ L{server.GzipEncoderFactory} implements the L{iweb._IRequestEncoderFactory} and its C{encoderForRequest} returns an instance of L{server._GzipEncoder} which implements L{iweb._IRequestEncoder}. """ request = server.Request(self.channel, False) request.gotLength(0) request.requestHeaders.setRawHeaders(b"Accept-Encoding", [b"gzip,deflate"]) factory = server.GzipEncoderFactory() self.assertTrue(verifyObject(iweb._IRequestEncoderFactory, factory)) encoder = factory.encoderForRequest(request) self.assertTrue(verifyObject(iweb._IRequestEncoder, encoder))
def test_encoding(self): """ If the client request passes a I{Accept-Encoding} header which mentions gzip, L{server._GzipEncoder} automatically compresses the data. """ request = server.Request(self.channel, False) request.gotLength(0) request.requestHeaders.setRawHeaders(b"Accept-Encoding", [b"gzip,deflate"]) request.requestReceived(b'GET', b'/foo', b'HTTP/1.0') data = self.channel.transport.written.getvalue() self.assertNotIn(b"Content-Length", data) self.assertIn(b"Content-Encoding: gzip\r\n", data) body = data[data.find(b"\r\n\r\n") + 4:] self.assertEqual(b"Some data", zlib.decompress(body, 16 + zlib.MAX_WBITS))
def test_multipleAccept(self): """ If there are multiple I{Accept-Encoding} header, L{server.GzipEncoderFactory} reads them properly to detect if gzip is supported. """ request = server.Request(self.channel, False) request.gotLength(0) request.requestHeaders.setRawHeaders(b"Accept-Encoding", [b"deflate", b"gzip"]) request.requestReceived(b'GET', b'/foo', b'HTTP/1.0') data = self.channel.transport.written.getvalue() self.assertNotIn(b"Content-Length", data) self.assertIn(b"Content-Encoding: gzip\r\n", data) body = data[data.find(b"\r\n\r\n") + 4:] self.assertEqual(b"Some data", zlib.decompress(body, 16 + zlib.MAX_WBITS))
def test_alreadyEncoded(self): """ If the content is already encoded and the I{Content-Encoding} header is set, L{server.GzipEncoderFactory} properly appends gzip to it. """ request = server.Request(self.channel, False) request.gotLength(0) request.requestHeaders.setRawHeaders(b"Accept-Encoding", [b"deflate", b"gzip"]) request.responseHeaders.setRawHeaders(b"Content-Encoding", [b"deflate"]) request.requestReceived(b'GET', b'/foo', b'HTTP/1.0') data = self.channel.transport.written.getvalue() self.assertNotIn(b"Content-Length", data) self.assertIn(b"Content-Encoding: deflate,gzip\r\n", data) body = data[data.find(b"\r\n\r\n") + 4:] self.assertEqual(b"Some data", zlib.decompress(body, 16 + zlib.MAX_WBITS))
def test_multipleEncodingLines(self): """ If there are several I{Content-Encoding} headers, L{server.GzipEncoderFactory} normalizes it and appends gzip to the field value. """ request = server.Request(self.channel, False) request.gotLength(0) request.requestHeaders.setRawHeaders(b"Accept-Encoding", [b"deflate", b"gzip"]) request.responseHeaders.setRawHeaders(b"Content-Encoding", [b"foo", b"bar"]) request.requestReceived(b'GET', b'/foo', b'HTTP/1.0') data = self.channel.transport.written.getvalue() self.assertNotIn(b"Content-Length", data) self.assertIn(b"Content-Encoding: foo,bar,gzip\r\n", data) body = data[data.find(b"\r\n\r\n") + 4:] self.assertEqual(b"Some data", zlib.decompress(body, 16 + zlib.MAX_WBITS))
def test_requestWriteAfterFinish(self): app = self.app request = requestMock(b"/") @app.route("/") def root(request): request.finish() return b'foo' d = _render(self.kr, request) self.assertFired(d) self.assertEqual(request.writeCount, 2) self.assertEqual(request.getWrittenData(), b'') [failure] = self.flushLoggedErrors(RuntimeError) self.assertEqual( str(failure.value), ("Request.write called on a request after Request.finish was " "called."))
def getClientIP(self, request): """Get the client's IP address from the ``'X-Forwarded-For:'`` header, or from the :api:`request <twisted.web.server.Request>`. :type request: :api:`twisted.web.http.Request` :param request: A ``Request`` for a :api:`twisted.web.resource.Resource`. :rtype: ``None`` or :any:`str` :returns: The client's IP address, if it was obtainable. """ return getClientIP(request, self.useForwardedHeader)
def getCaptchaImage(self, request): """Get a random CAPTCHA image from our **captchaDir**. Creates a :class:`~farfetchd.captcha.GimpCaptcha`, and calls its :meth:`~farfetchd.captcha.GimpCaptcha.get` method to return a random CAPTCHA and challenge string. :type request: :api:`twisted.web.http.Request` :param request: A client's initial request for some other resource which is protected by this one (i.e. protected by a CAPTCHA). :returns: A 2-tuple of ``(image, challenge)``, where:: - ``image`` is a string holding a binary, JPEG-encoded image. - ``challenge`` is a unique string associated with the request. """ # Create a new HMAC key, specific to requests from this client: clientIP = self.getClientIP(request) clientHMACKey = crypto.getHMAC(self.hmacKey, clientIP) capt = captcha.GimpCaptcha(self.publicKey, self.secretKey, clientHMACKey, self.captchaDir) try: capt.get() except captcha.GimpCaptchaError as error: log.msg(error) except Exception as error: # pragma: no cover log.err("Unhandled error while retrieving Gimp captcha!") log.err(error) return (capt.image, capt.challenge)
def render_GET(self, request): """Retrieve a ReCaptcha from the API server and serve it to the client. :type request: :api:`twisted.web.http.Request` :param request: A ``Request`` object for a CAPTCHA. :rtype: str :returns: A JSON blob containing the following fields: * "version": The Farfetchd protocol version. * "image": A base64-encoded CAPTCHA JPEG image. * "challenge": A base64-encoded, encrypted challenge. The client will need to hold on to the and pass it back later, along with their challenge response. * "error": An ASCII error message. Any of the above JSON fields may be "null". """ image, challenge = self.getCaptchaImage(request) json = { 'data': { 'id': 1, 'type': self.responseType, 'version': FARFETCHD_API_VERSION, 'image': image, 'challenge': challenge, # The challenge is already base64-encoded } } try: json["data"]["image"] = base64.b64encode(image) except Exception as err: log.err("Could not construct or encode captcha!") log.err(err) return self.formatResponse(json, request)
def render_POST(self, request): """Process a client's CAPTCHA solution. If the client's CAPTCHA solution is valid (according to :meth:`checkSolution`), process and serve their original request. Otherwise, redirect them back to a new CAPTCHA page. :type request: :api:`twisted.web.http.Request` :param request: A ``Request`` object, including POST arguments which should include two key/value pairs: one key being ``'captcha_challenge_field'``, and the other, ``'captcha_response_field'``. These POST arguments should be obtained from :meth:`render_GET`. :rtype: str :returns: A rendered HTML page containing a ReCaptcha challenge image for the client to solve. """ data = { "data": { "id": 3, "type": self.responseType, "version": FARFETCHD_API_VERSION, "result": False, } } try: if self.checkSolution(request) is True: data["data"]["result"] = True return self.formatResponse(data, request) else: return self.failureResponse(request) except Exception as err: log.err(err) #class FarfetchdRequestHandler(http.Request):
def __init__(self, *args, **kwargs): http.Request.__init__(self, *args, **kwargs)
def process(self): """Process an incoming request to the Farfetchd server.""" # Get site from channel self.site = self.channel.site # Set various default headers self.setHeader(b"Content-Type", b"application/vnd.api+json") self.setHeader(b"Server", "Farfetchd v%s" % FARFETCHD_API_VERSION) self.setHeader(b"Date", http.datetimeToString()) # Resource Identification self.prepath = [] self.postpath = list(map(http.unquote, self.path[1:].split(b'/'))) log.msg("postpath is %s" % self.postpath) log.msg("self.resource is %s" % self.resource) #requested_resource = self.resource.getChildForRequest(self) requested_resource = resource.getChildForRequest(self.resource, self) try: requested_resource = self.site.getResourceFor(self) #self.render(requested_resource) log.msg("Requested resource is %s" % requested_resource) log.msg("Requested resource entities are %s" % requested_resource.listEntities()) if requested_resource: if requested_resource.responseType: log.msg("Request will be handled by %r" % requested_resource.__class__.__name__) self.checkRequestHeaders() #requested_resource.render(self) self.render(requested_resource) else: self.setResponseCode(http.NOT_FOUND) self.write(b"No such resource") except: self.processingFailed(failure.Failure()) if not self.finished: self.finish()
def testChildLink(self): request = server.Request(DummyChannel(), 1) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.childLink('baz'), 'bar/baz') request = server.Request(DummyChannel(), 1) request.gotLength(0) request.requestReceived('GET', '/foo/bar/', 'HTTP/1.0') self.assertEqual(request.childLink('baz'), 'baz')
def testPrePathURLSimple(self): request = server.Request(DummyChannel(), 1) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') request.setHost('example.com', 80) self.assertEqual(request.prePathURL(), 'http://example.com/foo/bar')
def testPrePathURLNonDefault(self): d = DummyChannel() d.transport = DummyChannel.TCP() d.transport.port = 81 request = server.Request(d, 1) request.setHost('example.com', 81) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.prePathURL(), 'http://example.com:81/foo/bar')
def testPrePathURLSSLPort(self): d = DummyChannel() d.transport = DummyChannel.TCP() d.transport.port = 443 request = server.Request(d, 1) request.setHost('example.com', 443) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.prePathURL(), 'http://example.com:443/foo/bar')
def testPrePathURLSSLPortAndSSL(self): d = DummyChannel() d.transport = DummyChannel.SSL() d.transport.port = 443 request = server.Request(d, 1) request.setHost('example.com', 443) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.prePathURL(), 'https://example.com/foo/bar')
def testPrePathURLSSLNonDefault(self): d = DummyChannel() d.transport = DummyChannel.SSL() d.transport.port = 81 request = server.Request(d, 1) request.setHost('example.com', 81) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.prePathURL(), 'https://example.com:81/foo/bar')
def testPrePathURLSetSSLHost(self): d = DummyChannel() d.transport = DummyChannel.TCP() d.transport.port = 81 request = server.Request(d, 1) request.setHost('foo.com', 81, 1) request.gotLength(0) request.requestReceived('GET', '/foo/bar', 'HTTP/1.0') self.assertEqual(request.prePathURL(), 'https://foo.com:81/foo/bar')
def testNotifyFinishConnectionLost(self): d = DummyChannel() d.transport = DummyChannel.TCP() request = server.Request(d, 1) finished = request.notifyFinish() request.connectionLost(error.ConnectionDone("Connection done")) return self.assertFailure(finished, error.ConnectionDone)
def testSimple(self): r = resource.Resource() r.isLeaf=0 rr = RootResource() r.putChild('foo', rr) rr.putChild('', rr) rr.putChild('bar', resource.Resource()) chan = self.createServer(r) for url in ['/foo/', '/foo/bar', '/foo/bar/baz', '/foo/bar/']: request = server.Request(chan, 1) request.setHost('example.com', 81) request.gotLength(0) request.requestReceived('GET', url, 'HTTP/1.0') self.assertEqual(request.getRootURL(), "http://example.com/foo")
def testRoot(self): rr = RootResource() rr.putChild('', rr) rr.putChild('bar', resource.Resource()) chan = self.createServer(rr) for url in ['/', '/bar', '/bar/baz', '/bar/']: request = server.Request(chan, 1) request.setHost('example.com', 81) request.gotLength(0) request.requestReceived('GET', url, 'HTTP/1.0') self.assertEqual(request.getRootURL(), "http://example.com/")
def __init__(self, *args, **kw): server.Request.__init__(self, *args, **kw) self._cookieCache = {} from cStringIO import StringIO self.content = StringIO() self.received_headers['host'] = 'fake.com' self.written = StringIO()
def write(self, data): self.written.write(data) server.Request.write(self, data)
def addCookie(self, k, v, *args,**kw): server.Request.addCookie(self,k,v,*args,**kw) assert not self._cookieCache.has_key(k), "Should not be setting duplicate cookies!" self._cookieCache[k] = v self.channel.received_cookies[k] = v
def testNoUICaching(self): ui_caching = False site = CacheControlledSite(ui_caching, self._resource) request = server.Request(DummyChannel(), False) request.prepath = [b""] request.postpath = [b""] site.getResourceFor(request) self.assertTrue(request.responseHeaders.getRawHeaders("cache-control") == ["no-store, must-revalidate"])
def testUICaching(self): ui_caching = True site = CacheControlledSite(ui_caching, self._resource) request = server.Request(DummyChannel(), False) request.prepath = [b""] request.postpath = [b""] site.getResourceFor(request) self.assertTrue(request.responseHeaders.getRawHeaders("cache-control") == ["max-age={}".format(FRESHNESS_TIME_SECS)])
def __init__(self, *args, **kw): server.Request.__init__(self, *args, **kw) self._logger = logging.getLogger(self.__class__.__name__)