我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用twisted.web.resource.Resource()。
def setUp(self):
    """Create a :class:`CaptchaFetchResource` mounted on a fake site root.

    Copies ``../captchas`` into ``<cwd>/captchas``, builds the resource
    with a fixed HMAC key, attaches it to a bare ``Resource`` under the
    ``b'fetch'`` path and prepares a ``DummyRequest`` for that path.
    """
    secretKey, publicKey = crypto.getRSAKey('captcha.key', bits=1024)

    # Set up our resources to fake a minimal HTTP(S) server:
    self.pagename = b'fetch'
    self.root = Resource()

    # The destination is computed once and used for both the copy and the
    # resource configuration.
    captchaDir = os.path.sep.join([os.getcwd(), 'captchas'])
    shutil.copytree('../captchas', captchaDir)
    self.captchaDir = captchaDir

    self.captchaResource = server.CaptchaFetchResource(
        secretKey=secretKey,
        publicKey=publicKey,
        hmacKey='abcdefghijklmnopqrstuvwxyz012345',
        captchaDir=self.captchaDir,
        useForwardedHeader=True)
    self.root.putChild(self.pagename, self.captchaResource)

    # Set up the basic parts of our faked request:
    self.request = DummyRequest([self.pagename])
def setUp(self):
    """Create a :class:`CaptchaCheckResource` mounted on a fake site root.

    Copies ``../captchas`` into ``<cwd>/captchas``, builds the resource
    with a fixed HMAC key, attaches it to a bare ``Resource`` under the
    ``b'check'`` path and prepares a ``DummyRequest`` for that path.
    The captcha directory is recorded on ``self`` but is not passed to the
    check resource.
    """
    secretKey, publicKey = crypto.getRSAKey('captcha.key', bits=1024)

    # Set up our resources to fake a minimal HTTP(S) server:
    self.pagename = b'check'
    self.root = Resource()

    captchaDir = os.path.sep.join([os.getcwd(), 'captchas'])
    shutil.copytree('../captchas', captchaDir)
    self.captchaDir = captchaDir

    self.captchaResource = server.CaptchaCheckResource(
        secretKey=secretKey,
        publicKey=publicKey,
        hmacKey='abcdefghijklmnopqrstuvwxyz012345',
        useForwardedHeader=True)
    self.root.putChild(self.pagename, self.captchaResource)

    # Set up the basic parts of our faked request:
    self.request = DummyRequest([self.pagename])
def getClientIP(request, useForwardedHeader=False):
    """Get the client's IP address from the ``'X-Forwarded-For:'``
    header, or from the :api:`request <twisted.web.server.Request>`.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` for a
        :api:`twisted.web.resource.Resource`.
    :param bool useForwardedHeader: If ``True``, attempt to get the
        client's IP address from the ``'X-Forwarded-For:'`` header. In that
        mode the request's own peer address is never consulted.
    :rtype: ``None`` or :any:`str`
    :returns: The client's IP address, if it was obtainable.
    """
    if not useForwardedHeader:
        return request.getClientIP()

    header = request.getHeader("X-Forwarded-For")
    if not header:
        return None

    # Only the last comma-separated entry of the header is considered.
    ip = header.split(",")[-1].strip()
    if not isIPAddress(ip):
        log.msg("Got weird X-Forwarded-For value %r" % header)
        return None
    return ip
def __init__(self, m, inputhandlers=None, view=None, controllers=None, templateDirectory = None):
    """Initialise the controller resource around model ``m``.

    @param m: the model object, stored as C{self.model}.
    @param inputhandlers: deprecated alias for C{controllers}; when truthy
        it wins over C{controllers} and a deprecation notice is printed.
    @param view: accepted but not stored; C{self.view} always starts as
        C{None}.
    @param controllers: list stored as C{self._inputhandlers} when
        C{inputhandlers} is not supplied.
    @param templateDirectory: when not C{None}, stored on the instance as
        C{self.templateDirectory}.
    """
    #self.start = now()
    resource.Resource.__init__(self)
    self.model = m
    # It's the responsibility of the calling code to make sure setView is
    # called on this controller before it's rendered.
    self.view = None
    self.subcontrollers = []
    if self.setupStacks:
        self.setupControllerStack()
    if inputhandlers is None and controllers is None:
        self._inputhandlers = []
    elif inputhandlers:
        # Parenthesised form prints the same text on Python 2 and 3.
        print("The inputhandlers arg is deprecated, please use controllers instead")
        self._inputhandlers = inputhandlers
    else:
        # NOTE(review): an empty (falsy, non-None) ``inputhandlers`` with
        # ``controllers=None`` lands here and stores None - confirm intent.
        self._inputhandlers = controllers
    if templateDirectory is not None:
        self.templateDirectory = templateDirectory
    self._valid = {}
    self._invalid = {}
    self._process = {}
    self._parent = None
def ResourceScript(path, registry):
    """
    I am a normal py file which must define a 'resource' global, which should
    be an instance of (a subclass of) web.resource.Resource; it will be
    rendered.

    The script at C{path} is executed with C{__file__}, C{resource},
    C{registry}, C{cache} and C{recache} pre-populated in its namespace.
    If the script raises C{AlreadyCached}, the exception's first argument is
    returned; otherwise the script's C{resource} global is returned, after
    being cached in C{registry} when the scanner asks for it and the script
    actually replaced the placeholder.
    """
    cs = CacheScanner(path, registry)
    namespace = {'__file__': path,
                 'resource': noRsrc,
                 'registry': registry,
                 'cache': cs.cache,
                 'recache': cs.recache}
    try:
        # SECURITY NOTE: this executes the file at ``path`` as code; only
        # point it at scripts you trust.
        execfile(path, namespace, namespace)
    except AlreadyCached as ac:
        return ac.args[0]
    rsrc = namespace['resource']
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
def testDistrib(self):
    """
    Fetch C{/here/there} from a subscriber site and expect the body
    C{'root'} served by the publisher site through a
    C{distrib.ResourceSubscription}.
    """
    # site1 is the publisher
    r1 = resource.Resource()
    r1.putChild("there", static.Data("root", "text/plain"))
    site1 = server.Site(r1)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
    self.port1 = reactor.listenTCP(0, self.f1)

    # site2 subscribes to the publisher under the "here" child.
    self.sub = distrib.ResourceSubscription("127.0.0.1",
                                            self.port1.getHost().port)
    r2 = resource.Resource()
    r2.putChild("here", self.sub)
    f2 = MySite(r2)
    self.port2 = reactor.listenTCP(0, f2)

    d = client.getPage("http://127.0.0.1:%d/here/there" %
                       self.port2.getHost().port)
    d.addCallback(self.assertEqual, 'root')
    return d
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
    """Create a file with the given path.

    @param path: passed to C{filepath.FilePath.__init__}.
    @param defaultType: stored as C{self.defaultType}.
    @param ignoredExts: a sequence copied into C{self.ignoredExts}. The
        legacy boolean form (C{0}/C{1}) still works but emits a warning: a
        true value becomes C{['*']}, a false one C{[]}.
    @param registry: stored as C{self.registry}; a new C{Registry} is
        created when this is falsy.
    @param allowExt: legacy flag; when true it triggers the same warning
        and forces C{self.ignoredExts} to C{['*']}.
    """
    resource.Resource.__init__(self)
    filepath.FilePath.__init__(self, path)
    self.defaultType = defaultType
    if ignoredExts in (0, 1) or allowExt:
        warnings.warn("ignoredExts should receive a list, not a boolean")
        if ignoredExts or allowExt:
            self.ignoredExts = ['*']
        else:
            self.ignoredExts = []
    else:
        self.ignoredExts = list(ignoredExts)
    self.registry = registry or Registry()
def alias(aliasPath, sourcePath):
    """
    I am not a very good aliaser. But I'm the best I can be. If I'm
    aliasing to a Resource that generates links, and it uses any parts
    of request.prepath to do so, the links will not be relative to the
    aliased path, but rather to the aliased-to path. That I can't
    alias static.File directory listings that nicely. However, I can
    still be useful, as many resources will play nice.

    @param aliasPath: '/'-separated path prefix to match against
        C{request.postpath}.
    @param sourcePath: '/'-separated path substituted for the matched
        prefix.
    @return: a one-argument callable that rewrites C{request.postpath} and
        C{request.path} in place when the prefix matches, and leaves the
        request untouched otherwise.
    """
    sourceSegments = sourcePath.split('/')
    aliasSegments = aliasPath.split('/')
    prefixLength = len(aliasSegments)

    def rewriter(request):
        if request.postpath[:prefixLength] != aliasSegments:
            return
        remainder = request.postpath[prefixLength:]
        request.postpath = sourceSegments + remainder
        request.path = '/' + '/'.join(request.prepath + request.postpath)

    return rewriter
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
    """Build and start a test node with a P2P node, worker bridge and web port.

    Generator-style: yields the Deferred from ``Node.start()`` and hands the
    finished instance back through ``defer.returnValue``.
    NOTE(review): presumably wrapped by ``@classmethod`` and
    ``@defer.inlineCallbacks`` outside this view - confirm.

    :param cls: class to instantiate (called with no arguments).
    :param net: network definition passed to ``node.Node``.
    :param factory: passed to ``node.Node``.
    :param bitcoind: passed to ``node.Node`` and ``work.WorkerBridge``.
    :param peer_ports: local ports; each becomes a ``('127.0.0.1', port)``
        connect address for the P2P node.
    :param merged_urls: passed to ``work.WorkerBridge``.
    """
    self = cls()

    self.n = node.Node(factory, bitcoind, [], [], net)
    yield self.n.start()

    self.n.p2p_node = node.P2PNode(
        self.n,
        port=0,
        max_incoming_conns=1000000,
        addr_store={},
        connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports],
    )
    self.n.p2p_node.start()

    # Keyword order is kept as-is so the two random.uniform() draws happen
    # in the same sequence as before.
    wb = work.WorkerBridge(
        node=self.n,
        my_pubkey_hash=random.randrange(2**160),
        donation_percentage=random.uniform(0, 10),
        merged_urls=merged_urls,
        worker_fee=3,
        args=math.Object(
            donation_percentage=random.uniform(0, 10),
            address='foo',
            worker_fee=3,
            timeaddresses=1000,
        ),
        pubkeys=main.keypool(),
        bitcoind=bitcoind,
    )
    self.wb = wb

    web_root = resource.Resource()
    worker_interface.WorkerInterface(wb).attach_to(web_root)
    self.web_port = reactor.listenTCP(0, server.Site(web_root))

    defer.returnValue(self)
def render(self, request):
    """Render via ``Resource.render``, allowing the handler to return a Deferred.

    The result of ``resource.Resource.render(self, request)`` is wrapped with
    ``defer.maybeDeferred``. On success a non-``None`` result is written and
    the request finished; on failure the failure is logged and, if the client
    is still connected, a 500 with the body ``'---ERROR---'`` is sent.
    Always returns ``server.NOT_DONE_YET``.
    """
    def finish(x):
        if request.channel is None: # disconnected
            return
        if x is not None:
            request.write(x)
        request.finish()

    def finish_error(fail):
        # Log first: a handler failure must not vanish just because the
        # client has already gone away.
        log.err(fail, "Error in DeferredResource handler:")
        if request.channel is None: # disconnected
            return
        request.setResponseCode(500) # won't do anything if already written to
        request.write('---ERROR---')
        request.finish()

    defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
    return server.NOT_DONE_YET
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
    """Build and start a test node with a P2P node, worker bridge and web port.

    Generator-style: yields the Deferred from ``Node.start()`` and hands the
    finished instance back through ``defer.returnValue``.
    NOTE(review): presumably wrapped by ``@classmethod`` and
    ``@defer.inlineCallbacks`` outside this view - confirm.

    :param cls: class to instantiate (called with no arguments).
    :param net: network definition passed to ``node.Node``.
    :param factory: passed to ``node.Node``.
    :param bitcoind: passed to ``node.Node``.
    :param peer_ports: local ports; each becomes a ``('127.0.0.1', port)``
        connect address for the P2P node.
    :param merged_urls: passed to ``work.WorkerBridge``.
    """
    self = cls()

    self.n = node.Node(factory, bitcoind, [], [], net)
    yield self.n.start()

    self.n.p2p_node = node.P2PNode(
        self.n,
        port=0,
        max_incoming_conns=1000000,
        addr_store={},
        connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports],
    )
    self.n.p2p_node.start()

    wb = work.WorkerBridge(
        node=self.n,
        my_pubkey_hash=random.randrange(2**160),
        donation_percentage=random.uniform(0, 10),
        merged_urls=merged_urls,
        worker_fee=3,
    )
    self.wb = wb

    web_root = resource.Resource()
    worker_interface.WorkerInterface(wb).attach_to(web_root)
    self.web_port = reactor.listenTCP(0, server.Site(web_root))

    defer.returnValue(self)
def __init__(self, app, config):
    """Build the root web resource and mount its children.

    Mounts ``Home`` at the empty path, ``Jobs`` at ``jobs``, a ``File``
    resource for ``storage_path`` at ``static_serve_path`` (default
    ``'files'``), and one child per entry of the ``services`` config
    section, each loaded with ``load_object`` and instantiated with this
    root.

    :param app: application object; stored as ``self.app`` and adapted via
        ``IServiceCollection(app, app)``.
    :param config: configuration object providing ``get``, ``getboolean``
        and ``items``.
    """
    resource.Resource.__init__(self)
    self.app = app
    self.IApp = IServiceCollection(app, app)

    self.debug = config.getboolean('debug', False)
    # Falls back to the machine's hostname when no node_name is configured.
    self.nodename = config.get('node_name', socket.gethostname())
    static_serve_path = config.get('static_serve_path', 'files')
    storage_path = config.get('storage_path')

    self.putChild('', Home(self))
    self.putChild('jobs', Jobs(self))
    self.putChild(static_serve_path, File(storage_path))

    for servName, servClsName in config.items('services', ()):
        servCls = load_object(servClsName)
        self.putChild(servName, servCls(self))
def handle_complex_resource_path(path, root, resource):
    """Attach ``resource`` beneath ``root`` at a '/'-separated ``path``.

    A fresh ``Resource`` is created and attached for every segment except
    the final one; ``resource`` itself is attached under the final segment.

    The final segment is identified by position. Comparing segment values
    instead would misplace ``resource`` whenever an earlier segment has the
    same name as the last one (e.g. ``"a/b/a"``).

    NOTE(review): intermediate segments always get a new ``Resource``, so
    any child already registered under the same name on the parent is
    replaced - confirm that is acceptable for callers registering several
    paths with a shared prefix.

    :param path: text path such as ``"api/v1/metrics"``.
    :param root: parent resource exposing ``putChild``.
    :param resource: resource to mount at the end of ``path``.
    """
    # Handle complex endpoint. Twisted expects byte-type URIs.
    segments = path.encode('utf-8').split(b'/')
    parent = root
    for segment in segments[:-1]:
        intermediate = Resource()
        parent.putChild(segment, intermediate)
        parent = intermediate
    parent.putChild(segments[-1], resource)


# =============================================================================
# Start webserver
# =============================================================================
def main():
    """Parse command-line options and serve metrics over HTTP.

    Options: ``-c/--config`` (default ``config.yml``) and ``-p/--port``
    (default ``9272``). The parsed arguments are handed to
    ``VMWareMetricsResource``, which is mounted at ``/metrics``; the reactor
    then runs until stopped.
    """
    parser = argparse.ArgumentParser(
        description='VMWare metrics exporter for Prometheus')
    parser.add_argument('-c', '--config', dest='config_file',
                        default='config.yml', help="configuration file")
    parser.add_argument('-p', '--port', dest='port', type=int,
                        default=9272, help="HTTP port to expose metrics")
    args = parser.parse_args()

    # Start up the server to expose the metrics.
    root = Resource()
    root.putChild(b'metrics', VMWareMetricsResource(args))
    factory = Site(root)

    print("Starting web server on port {}".format(args.port))
    reactor.listenTCP(args.port, factory)
    reactor.run()
def setUp(self):
    """Start a local HTTP service that returns a body of the requested size.

    The last path segment of the request URI selects the size: ``<n>k`` for
    n * 2**10 characters, ``<n>M`` for n * 2**20, or a bare integer for that
    many characters. Generator-style: yields the parent ``setUp``, the
    descriptor fetch, the port allocation and the listening call.
    """
    yield super(TestBwscan, self).setUp()
    yield setconf_fetch_all_descs(self.tor)

    class DummyResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            size = request.uri.split('/')[-1]
            if 'k' in size:
                size = int(size[:-1])*(2**10)
            elif 'M' in size:
                size = int(size[:-1])*(2**20)
            else:
                # Without a suffix the value used to stay a string, which
                # made the multiplication below raise TypeError.
                size = int(size)
            return 'a'*size

    self.port = yield available_tcp_port(reactor)
    self.test_service = yield reactor.listenTCP(
        self.port, Site(DummyResource()))
def setUp(self):
    """Start a local HTTP service that always returns ``fetch_size`` bytes.

    Also creates the ``StreamBandwidthListener`` under test and the
    ``NotEnoughMeasurements`` instance the tests compare against.
    Generator-style: yields the parent ``setUp``, the listener, the port
    allocation and the listening call.
    """
    yield super(TestStreamBandwidthListener, self).setUp()
    self.fetch_size = 8*2**20  # 8MB
    self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)

    # Captured so the response body cannot drift from self.fetch_size.
    fetch_size = self.fetch_size

    class DummyResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            return 'a'*fetch_size

    self.port = yield available_tcp_port(reactor)
    self.site = Site(DummyResource())
    self.test_service = yield reactor.listenTCP(self.port, self.site)
    self.not_enough_measurements = NotEnoughMeasurements(
        "Not enough measurements to calculate STREAM_BW samples.")
def ResourceScript(path, registry):
    """
    I am a normal py file which must define a 'resource' global, which should
    be an instance of (a subclass of) web.resource.Resource; it will be
    rendered.

    The script at C{path} is executed with C{__file__} (coerced with
    C{_coerceToFilesystemEncoding}), C{resource}, C{registry}, C{cache} and
    C{recache} pre-populated in its namespace. If the script raises
    C{AlreadyCached}, the exception's first argument is returned; otherwise
    the script's C{resource} global is returned, after being cached in
    C{registry} when the scanner asks for it and the script actually
    replaced the placeholder.
    """
    cs = CacheScanner(path, registry)
    namespace = {'__file__': _coerceToFilesystemEncoding("", path),
                 'resource': noRsrc,
                 'registry': registry,
                 'cache': cs.cache,
                 'recache': cs.recache}
    try:
        # SECURITY NOTE: this executes the file at ``path`` as code; only
        # point it at scripts you trust.
        execfile(path, namespace, namespace)
    except AlreadyCached as ac:
        return ac.args[0]
    rsrc = namespace['resource']
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
def test_sessionUIDGeneration(self):
    """
    L{site.getSession} generates L{Session} objects with distinct UIDs from
    a secure source of entropy.
    """
    site = server.Site(resource.Resource())
    # Ensure that we _would_ use the unpredictable random source if the
    # test didn't stub it.
    self.assertIs(site._entropy, os.urandom)

    def predictableEntropy(n):
        # Returns n copies of byte 0x01 on the first call, 0x02 on the
        # second, and so on.
        predictableEntropy.x += 1
        return (unichr(predictableEntropy.x) * n).encode("charmap")
    predictableEntropy.x = 0

    self.patch(site, "_entropy", predictableEntropy)
    a = self.getAutoExpiringSession(site)
    b = self.getAutoExpiringSession(site)
    self.assertEqual(a.uid, b"01" * 0x20)
    self.assertEqual(b.uid, b"02" * 0x20)

    # This functionality is silly (the value is no longer used in session
    # generation), but 'counter' was a public attribute since time
    # immemorial so we should make sure if anyone was using it to get site
    # metrics or something it keeps working.
    self.assertEqual(site.counter, 2)
def test_processingFailedNoTraceback(self):
    """
    L{Request.processingFailed} when the site has C{displayTracebacks} set
    to C{False} does not write out the failure, but gives a generic error
    message.
    """
    d = DummyChannel()
    request = server.Request(d, 1)
    request.site = server.Site(resource.Resource())
    request.site.displayTracebacks = False
    fail = failure.Failure(Exception("Oh no!"))
    request.processingFailed(fail)

    written = request.transport.written.getvalue()
    self.assertNotIn(b"Oh no!", written)
    self.assertIn(b"Processing Failed", written)

    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_processingFailedDisplayTraceback(self):
    """
    L{Request.processingFailed} when the site has C{displayTracebacks} set
    to C{True} writes out the failure.
    """
    d = DummyChannel()
    request = server.Request(d, 1)
    request.site = server.Site(resource.Resource())
    request.site.displayTracebacks = True
    fail = failure.Failure(Exception("Oh no!"))
    request.processingFailed(fail)

    self.assertIn(b"Oh no!", request.transport.written.getvalue())

    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self):
    """
    L{Request.session} and L{Request.secure_session} should be two separate
    sessions with unique ids and different cookies.
    """
    d = DummyChannel()
    d.transport = DummyChannel.SSL()
    request = server.Request(d, 1)
    request.site = server.Site(resource.Resource())
    request.sitepath = []

    # First session: obtained over the SSL transport, so it is the secure one.
    secureSession = request.getSession()
    self.assertIsNotNone(secureSession)
    self.addCleanup(secureSession.expire)
    self.assertEqual(request.cookies[0].split(b"=")[0],
                     b"TWISTED_SECURE_SESSION")

    # Second session: explicitly requested as not secure.
    session = request.getSession(forceNotSecure=True)
    self.assertIsNotNone(session)
    self.assertEqual(request.cookies[1].split(b"=")[0],
                     b"TWISTED_SESSION")
    self.addCleanup(session.expire)

    self.assertNotEqual(session.uid, secureSession.uid)
def test_sessionAttribute(self):
    """
    On a L{Request}, the C{session} attribute retrieves the associated
    L{Session} only if it has been initialized.  If the request is secure,
    it retrieves the secure session.
    """
    site = server.Site(resource.Resource())
    d = DummyChannel()
    d.transport = DummyChannel.SSL()
    request = server.Request(d, 1)
    request.site = site
    request.sitepath = []
    self.assertIs(request.session, None)

    # Creating only the insecure session leaves ``session`` unset on this
    # secure request.
    insecureSession = request.getSession(forceNotSecure=True)
    self.addCleanup(insecureSession.expire)
    self.assertIs(request.session, None)

    secureSession = request.getSession()
    self.addCleanup(secureSession.expire)
    self.assertIsNot(secureSession, None)
    self.assertIsNot(secureSession, insecureSession)
    self.assertIs(request.session, secureSession)
def test_retrieveNonExistentSession(self):
    """
    L{Request.getSession} generates a new session if the relevant cookie is
    set in the incoming request but does not name an existing session.
    """
    site = server.Site(resource.Resource())
    d = DummyChannel()
    request = server.Request(d, 1)
    request.site = site
    request.sitepath = []
    request.received_cookies[b'TWISTED_SESSION'] = b"does-not-exist"

    session = request.getSession()
    self.assertIsNotNone(session)
    self.addCleanup(session.expire)

    self.assertTrue(request.cookies[0].startswith(b'TWISTED_SESSION='))
    # It should be a new session ID.
    self.assertNotIn(b"does-not-exist", request.cookies[0])
def test_getChildWithDefaultAuthorized(self):
    """
    Resource traversal which encounters an L{HTTPAuthSessionWrapper}
    results in an L{IResource} which renders the L{IResource} avatar
    retrieved from the portal when the request has a valid I{Authorization}
    header.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    child = self._authorizedBasicLogin(request)

    # Register the completion callback before rendering.
    d = request.notifyFinish()

    def cbFinished(ignored):
        self.assertEqual(request.written, [self.childContent])
    d.addCallback(cbFinished)

    request.render(child)
    return d
def _logoutTest(self):
    """
    Issue a request for an authentication-protected resource using valid
    credentials and then return the C{DummyRequest} instance which was
    used.

    This is a helper for tests about the behavior of the logout callback.
    """
    self.credentialFactories.append(BasicCredentialFactory('example.com'))

    class SlowerResource(Resource):
        # Never completes on its own, so the request stays open after
        # render() returns.
        def render(self, request):
            return NOT_DONE_YET

    self.avatar.putChild(self.childName, SlowerResource())
    request = self.makeRequest([self.childName])
    child = self._authorizedBasicLogin(request)
    request.render(child)
    self.assertEqual(self.realm.loggedOut, 0)
    return request
def test_anonymousAccess(self):
    """
    Anonymous requests are allowed if a L{Portal} has an anonymous checker
    registered.
    """
    unprotectedContents = b"contents of the unprotected child resource"

    # Give the anonymous avatar a child to serve.
    self.avatars[ANONYMOUS] = Resource()
    self.avatars[ANONYMOUS].putChild(
        self.childName, Data(unprotectedContents, 'text/plain'))
    self.portal.registerChecker(AllowAnonymousAccess())

    self.credentialFactories.append(BasicCredentialFactory('example.com'))
    request = self.makeRequest([self.childName])
    child = getChildForRequest(self.wrapper, request)

    d = request.notifyFinish()

    def cbFinished(ignored):
        self.assertEqual(request.written, [unprotectedContents])
    d.addCallback(cbFinished)

    request.render(child)
    return d
def test_render(self):
    """
    L{ResourceScriptDirectory.getChild} returns a resource which renders a
    response with the content produced by the rpy's C{resource} global.
    """
    tmp = FilePath(self.mktemp())
    tmp.makedirs()
    # The script body needs real newlines and indentation to be valid
    # Python when the directory resource executes it.
    tmp.child("test.rpy").setContent(b"""
from twisted.web.resource import Resource
class TestResource(Resource):
    isLeaf = True
    def render_GET(self, request):
        return b'ok'
resource = TestResource()""")
    resource = ResourceScriptDirectory(tmp._asBytesPath())
    request = DummyRequest([b''])
    child = resource.getChild(b"test.rpy", request)
    d = _render(child, request)

    def cbRendered(ignored):
        self.assertEqual(b"".join(request.written), b"ok")
    d.addCallback(cbRendered)
    return d
def testDistrib(self):
    """
    Fetch C{/here/there} from a subscriber site with an L{client.Agent} and
    expect the body C{b'root'} served by the publisher site through a
    C{distrib.ResourceSubscription}.
    """
    # site1 is the publisher
    r1 = resource.Resource()
    r1.putChild(b"there", static.Data(b"root", "text/plain"))
    site1 = server.Site(r1)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
    self.port1 = reactor.listenTCP(0, self.f1)

    # site2 subscribes to the publisher under the "here" child.
    self.sub = distrib.ResourceSubscription("127.0.0.1",
                                            self.port1.getHost().port)
    r2 = resource.Resource()
    r2.putChild(b"here", self.sub)
    f2 = MySite(r2)
    self.port2 = reactor.listenTCP(0, f2)

    # Agent.request takes the URI as bytes; encoding is a no-op on
    # Python 2, where str is already bytes.
    url = "http://127.0.0.1:%d/here/there" % (self.port2.getHost().port,)
    url = url.encode("ascii")

    agent = client.Agent(reactor)
    d = agent.request(b"GET", url)
    d.addCallback(client.readBody)
    d.addCallback(self.assertEqual, b'root')
    return d
def test_requestHeaders(self):
    """
    The request headers are available on the request object passed to a
    distributed resource's C{render} method.
    """
    requestHeaders = {}

    class ReportRequestHeaders(resource.Resource):
        def render(self, request):
            # Copy what the distributed resource actually received into the
            # enclosing dict so the test can inspect it afterwards.
            requestHeaders.update(dict(
                request.requestHeaders.getAllRawHeaders()))
            return ""

    d = self._requestTest(
        ReportRequestHeaders(), headers=Headers({'foo': ['bar']}))

    def cbRequested(result):
        self.assertEqual(requestHeaders['Foo'], ['bar'])
    d.addCallback(cbRequested)
    return d
def test_requestResponseCode(self):
    """
    The response code can be set by the request object passed to a
    distributed resource's C{render} method.
    """
    class SetResponseCode(resource.Resource):
        def render(self, request):
            request.setResponseCode(200)
            return ""

    d = self._requestAgentTest(SetResponseCode())

    def cbRequested(result):
        # result[0] is read for its body data, result[1] for status code
        # and phrase.
        self.assertEqual(result[0].data, "")
        self.assertEqual(result[1].code, 200)
        self.assertEqual(result[1].phrase, "OK")
    d.addCallback(cbRequested)
    return d
def test_requestResponseCodeMessage(self):
    """
    The response code and message can be set by the request object passed to
    a distributed resource's C{render} method.
    """
    class SetResponseCode(resource.Resource):
        def render(self, request):
            request.setResponseCode(200, "some-message")
            return ""

    d = self._requestAgentTest(SetResponseCode())

    def cbRequested(result):
        # result[0] is read for its body data, result[1] for status code
        # and phrase.
        self.assertEqual(result[0].data, "")
        self.assertEqual(result[1].code, 200)
        self.assertEqual(result[1].phrase, "some-message")
    d.addCallback(cbRequested)
    return d
def test_URLPath_traversedResource(self):
    """
    C{request.URLPath()} called from a resource reached by child traversal
    below a branch route reports the full requested URL.
    """
    app = self.app

    request = requestMock(b'/resource/foo')
    # One-element list so the nested class can hand the value back out.
    request_url = [None]

    class URLPathResource(Resource):
        def render(self, request):
            request_url[0] = request.URLPath()

        # Parameters named in the order traversal passes them
        # (path segment first, then request).
        def getChild(self, path, request):
            return self

    @app.route("/resource/", branch=True)
    def root(request):
        return URLPathResource()

    d = _render(self.kr, request)

    self.assertFired(d)
    self.assertEqual(str(request_url[0]),
                     'http://localhost:8080/resource/foo')
def __init__(self, channel_layer, root_path='', http_timeout=120,
             websocket_timeout=None, ping_interval=20, ping_timeout=30,
             ws_protocols=None, start_scheduler=True,
             use_proxy_headers=False, use_proxy_proto_header=False,
             use_x_sendfile=False):
    """Store the server options and create the channel layer manager.

    :param channel_layer: wrapped in a ``ChannelLayerManager``; its
        ``group_expiry`` attribute, when present, is the fallback websocket
        timeout.
    :param root_path: stored as ``self.root_path``.
    :param http_timeout: stored as ``self.http_timeout``.
    :param websocket_timeout: stored as ``self.websocket_timeout``; any
        falsy value (``None``, ``0``) falls back to
        ``channel_layer.group_expiry`` or ``86400``.
    :param ping_interval: stored as ``self.ping_interval``.
    :param ping_timeout: stored as ``self.ping_timeout``.
    :param ws_protocols: stored as ``self.ws_protocols``.
    :param start_scheduler: forwarded to ``ChannelLayerManager``.
    :param use_proxy_headers: stored as ``self.use_proxy_headers``.
    :param use_proxy_proto_header: stored as ``self.use_proxy_proto_header``.
    :param use_x_sendfile: stored as ``self.use_x_sendfile``.
    """
    # Honour the caller's choice instead of always passing True.
    self.manager = ChannelLayerManager(channel_layer,
                                       start_scheduler=start_scheduler)
    self.root_path = root_path
    self.http_timeout = http_timeout
    self.websocket_timeout = websocket_timeout or getattr(
        channel_layer, "group_expiry", 86400)
    self.ping_interval = ping_interval
    self.ping_timeout = ping_timeout
    self.ws_protocols = ws_protocols
    self.use_proxy_headers = use_proxy_headers
    self.use_proxy_proto_header = use_proxy_proto_header
    self.use_x_sendfile = use_x_sendfile
    resource.Resource.__init__(self)
def _render(resource, request):
    # type: (Resource, Union[DummyRequest, Request]) -> Any
    """
    Simulate rendering of a Twisted resource.

    A ``bytes`` result is written to the request, which is then finished.
    A ``NOT_DONE_YET`` result waits for the request to finish unless it
    already has.

    :param resource: Twisted Resource with render() method.
    :param request: Request (or mock object).
    :return: Deferred that fires once the request is finished.
    :raises ValueError: if ``render()`` returns anything other than bytes
        or ``NOT_DONE_YET``.
    """
    result = resource.render(request)

    if isinstance(result, bytes):
        request.write(result)
        request.finish()
        return succeed(None)

    if result == NOT_DONE_YET:
        if request.finished:
            return succeed(None)
        return request.notifyFinish()

    raise ValueError("Unexpected return value: %r" % (result,))
def getResourceFor(self, request):
    # type: (Request) -> Resource
    """
    Check if a route matches this request. Fall back to Twisted default
    lookup behavior otherwise.

    On a match, ``request.path_args`` is set to the match's named groups
    (a dict) or, when there are none, its positional groups (a tuple).
    ``request.site`` is always set to this site.

    :param request: Twisted request instance
    :return: Resource to handle this request
    """
    request.site = self

    routes = self.routes
    if routes:
        # Decode once for all routes; skipped entirely when no routes are
        # registered, as before.
        path = request.path.decode()
        for route_re, resource in routes.items():
            match = route_re.match(path)
            if match:
                request.path_args = match.groupdict() or match.groups()
                return resource

    return Site.getResourceFor(self, request)