我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 twisted.web.server.Site()。
def setUp(self):
    """
    Build a Site around a SimpleResource, connect its protocol to an
    in-memory transport and feed it the first lines of a GET request.
    """
    self.resrc = SimpleResource()
    self.resrc.putChild('', self.resrc)
    self.site = server.Site(self.resrc)
    self.site.logFile = log.logfile
    # HELLLLLLLLLLP!  This harness is Very Ugly.
    self.channel = self.site.buildProtocol(None)
    self.transport = http.StringTransport()
    # Stub out the transport methods with fixed-value callables.
    self.transport.close = lambda *a, **kw: None
    self.transport.disconnecting = lambda *a, **kw: 0
    self.transport.getPeer = lambda *a, **kw: "peer"
    self.transport.getHost = lambda *a, **kw: "host"
    self.channel.makeConnection(self.transport)
    for line in ["GET / HTTP/1.1", "Accept: text/html"]:
        self.channel.lineReceived(line)
def testDistrib(self):
    """
    Publish a resource tree through a PB server factory, mount a
    subscription to it under a second site, and check that fetching
    /here/there through the second site yields 'root'.
    """
    # site1 is the publisher
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    publisherSite = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(publisherSite))
    self.port1 = reactor.listenTCP(0, self.f1)

    # the second site reaches the publisher through a subscription
    self.sub = distrib.ResourceSubscription(
        "127.0.0.1", self.port1.getHost().port)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    subscriberSite = MySite(subscriberRoot)
    self.port2 = reactor.listenTCP(0, subscriberSite)

    url = "http://127.0.0.1:%d/here/there" % self.port2.getHost().port
    fetched = client.getPage(url)
    fetched.addCallback(self.failUnlessEqual, 'root')
    return fetched
def setUp(self):
    """
    Create a per-test directory holding a small fixture file, serve it as
    a static.File with several child resources attached, and start
    listening on it.

    The listening port is stored on self.port and its number on
    self.portno.
    """
    name = str(id(self)) + "_webclient"
    if not os.path.exists(name):
        os.mkdir(name)
        # The context manager closes the handle even if the write raises.
        with open(os.path.join(name, "file"), "wb") as f:
            f.write("0123456789")
    r = static.File(name)
    r.putChild("redirect", util.Redirect("/file"))
    r.putChild("wait", LongTimeTakingResource())
    r.putChild("error", ErrorResource())
    r.putChild("nolength", NoLengthResource())
    r.putChild("host", HostHeaderResource())
    r.putChild("payload", PayloadResource())
    r.putChild("broken", BrokenDownloadResource())
    site = server.Site(r, timeout=None)
    self.port = self._listen(site)
    self.portno = self.port.getHost().port
def setUp(self):
    """
    Start one TLS and one plain TCP site on 127.0.0.1 and wire up a chain
    of redirects that alternates between them: one -> two -> three -> four,
    where 'four' finally serves 'FOUND IT!'.
    """
    plainRoot = static.Data('not me', 'text/plain')
    tlsRoot = static.Data('me neither', 'text/plain')
    plainSite = server.Site(plainRoot, timeout=None)
    tlsSite = server.Site(tlsRoot, timeout=None)

    from twisted import test
    # the same PEM file is passed for both factory arguments
    pemPath = sibpath(test.__file__, 'server.pem')
    contextFactory = ssl.DefaultOpenSSLContextFactory(pemPath, pemPath)
    self.tlsPort = reactor.listenSSL(
        0, tlsSite, contextFactory=contextFactory, interface="127.0.0.1")
    self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

    self.plainPortno = self.plainPort.getHost().port
    self.tlsPortno = self.tlsPort.getHost().port

    # each hop redirects to the other scheme
    plainRoot.putChild('one', util.Redirect(self.getHTTPS('two')))
    tlsRoot.putChild('two', util.Redirect(self.getHTTP('three')))
    plainRoot.putChild('three', util.Redirect(self.getHTTPS('four')))
    tlsRoot.putChild('four', static.Data('FOUND IT!', 'text/plain'))
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
    """
    Generator that builds an instance of cls with a started node, a P2P
    node pointed at the given local peer ports, a worker bridge and a web
    listener on an ephemeral port.  The instance is handed back through
    defer.returnValue.

    NOTE(review): the yield/returnValue usage suggests classmethod and
    inlineCallbacks decorators outside this block -- confirm.
    """
    instance = cls()

    instance.n = node.Node(factory, bitcoind, [], [], net)
    yield instance.n.start()

    instance.n.p2p_node = node.P2PNode(
        instance.n,
        port=0,
        max_incoming_conns=1000000,
        addr_store={},
        connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports],
    )
    instance.n.p2p_node.start()

    bridge = work.WorkerBridge(
        node=instance.n,
        my_pubkey_hash=random.randrange(2**160),
        donation_percentage=random.uniform(0, 10),
        merged_urls=merged_urls,
        worker_fee=3,
        args=math.Object(
            donation_percentage=random.uniform(0, 10),
            address='foo',
            worker_fee=3,
            timeaddresses=1000,
        ),
        pubkeys=main.keypool(),
        bitcoind=bitcoind,
    )
    instance.wb = bridge

    web_root = resource.Resource()
    worker_interface.WorkerInterface(bridge).attach_to(web_root)
    instance.web_port = reactor.listenTCP(0, server.Site(web_root))

    defer.returnValue(instance)
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
    """
    Generator that builds an instance of cls with a started node, a P2P
    node pointed at the given local peer ports, a worker bridge and a web
    listener on an ephemeral port.  The instance is handed back through
    defer.returnValue.

    NOTE(review): the yield/returnValue usage suggests classmethod and
    inlineCallbacks decorators outside this block -- confirm.
    """
    instance = cls()

    instance.n = node.Node(factory, bitcoind, [], [], net)
    yield instance.n.start()

    instance.n.p2p_node = node.P2PNode(
        instance.n,
        port=0,
        max_incoming_conns=1000000,
        addr_store={},
        connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports],
    )
    instance.n.p2p_node.start()

    bridge = work.WorkerBridge(
        node=instance.n,
        my_pubkey_hash=random.randrange(2**160),
        donation_percentage=random.uniform(0, 10),
        merged_urls=merged_urls,
        worker_fee=3,
    )
    instance.wb = bridge

    web_root = resource.Resource()
    worker_interface.WorkerInterface(bridge).attach_to(web_root)
    instance.web_port = reactor.listenTCP(0, server.Site(web_root))

    defer.returnValue(instance)
def run(self):
    """
    Look up the active Vault node, build the redirector site, start the
    HTTP(S) listener, schedule the periodic
    :py:meth:`~.update_active_node` call and run the Twisted reactor.

    :raises SystemExit: with code 3 when no active node could be found.
    """
    # the active node must be known before anything else is started
    self.active_node_ip_port = self.get_active_node()
    if self.active_node_ip_port is None:
        logger.critical("ERROR: Could not get active vault node from "
                        "Consul. Exiting.")
        raise SystemExit(3)
    logger.warning("Initial Vault active node: %s",
                   self.active_node_ip_port)

    site = Site(VaultRedirectorSite(self))

    # pick the TLS listener only when a TLS factory is configured
    start_listener = (
        self.listentcp if self.tls_factory is None else self.listentls
    )
    start_listener(site)

    # periodic refresh of the active node
    self.add_update_loop()

    logger.warning('Starting Twisted reactor (event loop)')
    self.run_reactor()
def main():
    """
    Register the LevelDB blockchain, start block persistence and the node
    leader, launch the custom background thread, expose the Klein app on
    localhost and run the reactor until it stops.
    """
    # Setup the blockchain
    blockchain = LevelDBBlockchain(settings.LEVELDB_PATH)
    Blockchain.RegisterBlockchain(blockchain)
    dbloop = task.LoopingCall(Blockchain.Default().PersistBlocks)
    dbloop.start(.1)
    NodeLeader.Instance().Start()

    # Disable smart contract events for external smart contracts
    settings.set_log_smart_contract_events(False)

    # Start a thread with custom code
    d = threading.Thread(target=custom_background_code)
    # Daemonizing the thread will kill it when the main thread is quit.
    # Set the attribute directly: Thread.setDaemon() is deprecated.
    d.daemon = True
    d.start()

    # Hook up Klein API to Twisted reactor
    endpoint_description = "tcp:port=%s:interface=localhost" % API_PORT
    endpoint = endpoints.serverFromString(reactor, endpoint_description)
    endpoint.listen(Site(app.resource()))

    # Run all the things (blocking call)
    logger.info("Everything setup and running. Waiting for events...")
    reactor.run()
    logger.info("Shutting down.")
def main():
    """
    Import the resource class named by the dotted-path command line
    argument, instantiate it, serve it on PORT and run the reactor.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument('resource')
    options = arg_parser.parse_args()

    module_name, name = options.resource.rsplit('.', 1)
    # make modules in the current directory importable
    sys.path.append('.')
    resource_factory = getattr(import_module(module_name), name)
    resource = resource_factory()

    http_port = reactor.listenTCP(PORT, Site(resource))

    def print_listening():
        # report the address the listener is bound to
        address = http_port.getHost()
        message = 'Mock server {} running at http://{}:{}'.format(
            resource, address.host, address.port)
        print(message)

    reactor.callWhenRunning(print_listening)
    reactor.run()
def main():
    """
    Parse the command line, mount the metrics resource under /metrics and
    serve it with the Twisted reactor on the requested port.
    """
    cli = argparse.ArgumentParser(
        description='VMWare metrics exporter for Prometheus')
    cli.add_argument(
        '-c', '--config',
        dest='config_file',
        default='config.yml',
        help="configuration file",
    )
    cli.add_argument(
        '-p', '--port',
        dest='port',
        type=int,
        default=9272,
        help="HTTP port to expose metrics",
    )
    args = cli.parse_args()

    # Build the resource tree that exposes the metrics.
    root = Resource()
    root.putChild(b'metrics', VMWareMetricsResource(args))
    factory = Site(root)

    print("Starting web server on port {}".format(args.port))
    reactor.listenTCP(args.port, factory)
    reactor.run()
def setUp(self):
    """
    Run the parent setUp, apply setconf_fetch_all_descs to self.tor, and
    start a local HTTP service whose responses are sized by the last URI
    segment.

    NOTE(review): this is a generator, so it presumably relies on an
    inlineCallbacks-style decorator outside this block -- confirm.
    """
    yield super(TestBwscan, self).setUp()
    yield setconf_fetch_all_descs(self.tor)

    class DummyResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            # The last path segment gives the body size: 'k' and 'M'
            # suffixes scale by 2**10 and 2**20, a bare number is a
            # plain byte count.
            size = request.uri.split('/')[-1]
            if 'k' in size:
                size = int(size[:-1])*(2**10)
            elif 'M' in size:
                size = int(size[:-1])*(2**20)
            else:
                # Without this branch size stays a string and the
                # multiplication below raises TypeError.
                size = int(size)
            return 'a'*size

    self.port = yield available_tcp_port(reactor)
    self.test_service = yield reactor.listenTCP(
        self.port, Site(DummyResource()))
def setUp(self):
    """
    Run the parent setUp, create the stream bandwidth listener for
    self.tor, and start a local HTTP service that always answers with a
    fixed-size body.

    NOTE(review): this is a generator, so it presumably relies on an
    inlineCallbacks-style decorator outside this block -- confirm.
    """
    yield super(TestStreamBandwidthListener, self).setUp()

    self.fetch_size = 8*2**20  # 8MB
    self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)

    class DummyResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            # fixed-size payload, independent of the request
            return 'a'*8*2**20

    self.port = yield available_tcp_port(reactor)
    self.site = Site(DummyResource())
    self.test_service = yield reactor.listenTCP(self.port, self.site)

    self.not_enough_measurements = NotEnoughMeasurements(
        "Not enough measurements to calculate STREAM_BW samples.")
def startService(self):
    """
    Bind the HTTP listener configured in ``self.settings.kotori`` and
    serve ``self.root`` through a ``LocalSite`` factory.

    Does nothing when the service is already marked as running.
    """
    # Guard against a second start
    if self.running == 1:
        return
    self.running = 1

    # Read the listen address from the settings
    kotori_settings = self.settings.kotori
    http_listen = kotori_settings.http_listen
    http_port = int(kotori_settings.http_port)

    log.info('Starting HTTP service on {http_listen}:{http_port}',
             http_listen=http_listen, http_port=http_port)

    # The root Site object is created and bound exactly once here --
    # the same port can't be bound multiple times.
    reactor.listenTCP(http_port, LocalSite(self.root), interface=http_listen)
def boot_frontend(config, debug=False):
    """
    Load the Pyramid WSGI application and serve it through a Twisted
    ``WSGIResource`` on the port given by ``config-web/http_port``.

    ``debug`` is accepted but not used by this function.
    """
    http_port = int(config.get('config-web', 'http_port'))
    # read from the configuration but not used further in this function
    websocket_uri = unicode(config.get('wamp', 'listen'))

    # https://stackoverflow.com/questions/13122519/serving-pyramid-application-using-twistd/13138610#13138610
    ini_path = resource_filename('kotori.frontend', 'development.ini')
    application = get_app(ini_path, 'main')

    # https://twistedmatrix.com/documents/13.1.0/web/howto/web-in-60/wsgi.html
    wsgi_resource = WSGIResource(
        reactor, reactor.getThreadPool(), application)
    reactor.listenTCP(http_port, Site(wsgi_resource))
def test_sessionUIDGeneration(self):
    """
    Sessions created by a L{server.Site} get distinct UIDs derived from
    the site's entropy source, which is C{os.urandom} unless patched.
    """
    site = server.Site(resource.Resource())

    # Check the real entropy source first: the stub installed below
    # would otherwise hide which one the site uses.
    self.assertIdentical(site._entropy, os.urandom)

    def predictableEntropy(n):
        # each call yields n copies of the next code point
        predictableEntropy.x += 1
        repeated = unichr(predictableEntropy.x) * n
        return repeated.encode("charmap")
    predictableEntropy.x = 0

    self.patch(site, "_entropy", predictableEntropy)

    firstSession = self.getAutoExpiringSession(site)
    secondSession = self.getAutoExpiringSession(site)
    self.assertEqual(firstSession.uid, b"01" * 0x20)
    self.assertEqual(secondSession.uid, b"02" * 0x20)

    # 'counter' no longer feeds session generation, but it has long been
    # a public attribute, so keep checking that it still counts sessions
    # for anyone reading it (site metrics and the like).
    self.assertEqual(site.counter, 2)
def setUp(self):
    """
    Build and start a Site around a SimpleResource tree and connect its
    protocol to an in-memory transport.
    """
    root = SimpleResource()
    self.resrc = root
    root.putChild(b'', root)
    root.putChild(b'with-content-type', SimpleResource(b'image/jpeg'))

    self.site = server.Site(root)
    self.site.startFactory()
    self.addCleanup(self.site.stopFactory)

    # HELLLLLLLLLLP!  This harness is Very Ugly.
    self.channel = self.site.buildProtocol(None)
    self.transport = transport = http.StringTransport()
    # stub out the transport methods with fixed-value callables
    transport.close = lambda *a, **kw: None
    transport.disconnecting = lambda *a, **kw: 0
    transport.getPeer = lambda *a, **kw: "peer"
    transport.getHost = lambda *a, **kw: "host"
    self.channel.makeConnection(transport)
def test_processingFailedNoTraceback(self):
    """
    With C{displayTracebacks} set to C{False} on the site,
    L{Request.processingFailed} writes a generic error message and leaves
    the failure text out of the response.
    """
    channel = DummyChannel()
    request = server.Request(channel, 1)
    site = server.Site(resource.Resource())
    request.site = site
    site.displayTracebacks = False

    request.processingFailed(failure.Failure(Exception("Oh no!")))

    written = request.transport.written.getvalue()
    self.assertNotIn(b"Oh no!", written)
    self.assertIn(b"Processing Failed", written)

    # The exception was never "handled", so flush it from the log to keep
    # it from failing the test.
    loggedErrors = self.flushLoggedErrors()
    self.assertEqual(1, len(loggedErrors))
def test_processingFailedDisplayTracebackHandlesUnicode(self):
    """
    L{Request.processingFailed} when the site has C{displayTracebacks} set
    to C{True} writes out the failure, making UTF-8 items into HTML
    entities.
    """
    d = DummyChannel()
    request = server.Request(d, 1)
    request.site = server.Site(resource.Resource())
    request.site.displayTracebacks = True
    fail = failure.Failure(Exception(u"\u2603"))
    request.processingFailed(fail)

    # The snowman must show up as its numeric HTML entity; a raw non-ASCII
    # character is not even valid inside a bytes literal.
    self.assertIn(b"&#9731;", request.transport.written.getvalue())

    # On some platforms, we get a UnicodeError when trying to
    # display the Failure with twisted.python.log because
    # the default encoding cannot display u"\u2603".  Windows for example
    # uses a default encoding of cp437 which does not support u"\u2603".
    self.flushLoggedErrors(UnicodeError)

    # Since we didn't "handle" the exception, flush it to prevent a test
    # failure
    self.assertEqual(1, len(self.flushLoggedErrors()))
def test_sessionDifferentFromSecureSession(self):
    """
    On an SSL transport, the default session and the one requested with
    C{forceNotSecure=True} are distinct sessions with different UIDs and
    differently named cookies.
    """
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    request = server.Request(channel, 1)
    request.site = server.Site(resource.Resource())
    request.sitepath = []

    # the default session comes first and sets the secure cookie
    secureSession = request.getSession()
    self.assertIsNotNone(secureSession)
    self.addCleanup(secureSession.expire)
    secureCookieName = request.cookies[0].split(b"=")[0]
    self.assertEqual(secureCookieName, b"TWISTED_SECURE_SESSION")

    # then the explicitly non-secure session and its own cookie
    session = request.getSession(forceNotSecure=True)
    self.assertIsNotNone(session)
    plainCookieName = request.cookies[1].split(b"=")[0]
    self.assertEqual(plainCookieName, b"TWISTED_SESSION")
    self.addCleanup(session.expire)

    self.assertNotEqual(session.uid, secureSession.uid)
def test_sessionAttribute(self):
    """
    C{Request.session} stays C{None} until the matching session has been
    created; on an SSL transport it then refers to the secure session, not
    the one obtained with C{forceNotSecure=True}.
    """
    site = server.Site(resource.Resource())
    channel = DummyChannel()
    channel.transport = DummyChannel.SSL()
    request = server.Request(channel, 1)
    request.site = site
    request.sitepath = []

    # nothing has been initialized yet
    self.assertIs(request.session, None)

    # creating the non-secure session does not populate the attribute
    insecureSession = request.getSession(forceNotSecure=True)
    self.addCleanup(insecureSession.expire)
    self.assertIs(request.session, None)

    # creating the secure session does
    secureSession = request.getSession()
    self.addCleanup(secureSession.expire)
    self.assertIsNot(secureSession, None)
    self.assertIsNot(secureSession, insecureSession)
    self.assertIs(request.session, secureSession)
def testDistrib(self):
    """
    Publish a resource tree through a PB server factory, mount a
    subscription to it under a second site, and check with an Agent that
    the body fetched from /here/there on the second site is 'root'.
    """
    # site1 is the publisher
    publishedRoot = resource.Resource()
    publishedRoot.putChild("there", static.Data("root", "text/plain"))
    publisherSite = server.Site(publishedRoot)
    self.f1 = PBServerFactory(distrib.ResourcePublisher(publisherSite))
    self.port1 = reactor.listenTCP(0, self.f1)

    # the second site reaches the publisher through a subscription
    self.sub = distrib.ResourceSubscription(
        "127.0.0.1", self.port1.getHost().port)
    subscriberRoot = resource.Resource()
    subscriberRoot.putChild("here", self.sub)
    subscriberSite = MySite(subscriberRoot)
    self.port2 = reactor.listenTCP(0, subscriberSite)

    agent = client.Agent(reactor)
    url = "http://127.0.0.1:%d/here/there" % (self.port2.getHost().port,)
    response = agent.request(b"GET", url)
    response.addCallback(client.readBody)
    response.addCallback(self.assertEqual, 'root')
    return response
def setUp(self):
    """
    Start one TLS and one plain TCP site on 127.0.0.1 and wire up a chain
    of redirects that alternates between them: one -> two -> three -> four,
    where 'four' finally serves b'FOUND IT!'.
    """
    plainRoot = Data(b'not me', 'text/plain')
    tlsRoot = Data(b'me neither', 'text/plain')
    plainSite = server.Site(plainRoot, timeout=None)
    tlsSite = server.Site(tlsRoot, timeout=None)

    # the same PEM path is passed for both factory arguments
    contextFactory = ssl.DefaultOpenSSLContextFactory(
        serverPEMPath, serverPEMPath)
    self.tlsPort = reactor.listenSSL(
        0, tlsSite, contextFactory=contextFactory, interface="127.0.0.1")
    self.plainPort = reactor.listenTCP(0, plainSite, interface="127.0.0.1")

    self.plainPortno = self.plainPort.getHost().port
    self.tlsPortno = self.tlsPort.getHost().port

    # each hop redirects to the other scheme
    plainRoot.putChild(b'one', Redirect(self.getHTTPS('two')))
    tlsRoot.putChild(b'two', Redirect(self.getHTTP('three')))
    plainRoot.putChild(b'three', Redirect(self.getHTTPS('four')))
    tlsRoot.putChild(b'four', Data(b'FOUND IT!', 'text/plain'))
def test_getPageDeprecated(self):
    """
    Calling L{client.getPage} emits exactly one L{DeprecationWarning} with
    the expected message.
    """
    site = server.Site(Data(b'', 'text/plain'))
    port = reactor.listenTCP(0, site, interface="127.0.0.1")
    portno = port.getHost().port
    self.addCleanup(port.stopListening)

    url = networkString("http://127.0.0.1:%d" % (portno,))
    d = client.getPage(url)

    warningInfo = self.flushWarnings([self.test_getPageDeprecated])
    self.assertEqual(len(warningInfo), 1)
    warning = warningInfo[0]
    self.assertEqual(warning['category'], DeprecationWarning)
    expectedMessage = (
        "twisted.web.client.getPage was deprecated in "
        "Twisted 16.7.0; please use https://pypi.org/project/treq/ or twisted.web.client.Agent instead")
    self.assertEqual(warning['message'], expectedMessage)

    # any failure of the request itself is ignored
    return d.addErrback(lambda _: None)
def test_downloadPageDeprecated(self):
    """
    Calling L{client.downloadPage} emits exactly one
    L{DeprecationWarning} with the expected message.
    """
    site = server.Site(Data(b'', 'text/plain'))
    port = reactor.listenTCP(0, site, interface="127.0.0.1")
    portno = port.getHost().port
    self.addCleanup(port.stopListening)

    url = networkString("http://127.0.0.1:%d" % (portno,))
    path = FilePath(self.mktemp())
    d = client.downloadPage(url, path.path)

    warningInfo = self.flushWarnings([self.test_downloadPageDeprecated])
    self.assertEqual(len(warningInfo), 1)
    warning = warningInfo[0]
    self.assertEqual(warning['category'], DeprecationWarning)
    expectedMessage = (
        "twisted.web.client.downloadPage was deprecated in "
        "Twisted 16.7.0; please use https://pypi.org/project/treq/ or twisted.web.client.Agent instead")
    self.assertEqual(warning['message'], expectedMessage)

    # any failure of the download itself is ignored
    return d.addErrback(lambda _: None)
def start(web_cfg):
    """
    Build the web service's Site from ``web_cfg`` and return it.

    The site wraps a ``Root`` resource that is given both the
    configuration and a ``Static`` object built from the same
    configuration.
    """
    static_resource = Static(web_cfg)
    return Site(Root(web_cfg, static_resource))
def run(self, handler):
    """
    Serve the WSGI callable ``handler`` with Twisted on
    ``self.host``:``self.port``.

    A dedicated thread pool is started for the WSGI resource and stopped
    after reactor shutdown.  The reactor is started only when it is not
    already running.
    """
    from twisted.web import server, wsgi
    from twisted.python.threadpool import ThreadPool
    from twisted.internet import reactor

    pool = ThreadPool()
    pool.start()
    # stop the pool once the reactor has shut down
    reactor.addSystemEventTrigger('after', 'shutdown', pool.stop)

    wsgi_resource = wsgi.WSGIResource(reactor, pool, handler)
    site = server.Site(wsgi_resource)
    reactor.listenTCP(self.port, site, interface=self.host)

    if reactor.running:
        return
    reactor.run()
def listen(self, reactor, endpoint_description):
    """
    Start serving the application's resource on the described endpoint.

    :param reactor: The ``IReactorTCP`` to use.
    :param endpoint_description: The Twisted description for the endpoint
        to listen on.
    :return: A deferred that returns an object that provides
        ``IListeningPort``.
    """
    server_endpoint = serverFromString(reactor, endpoint_description)
    site = Site(self.app.resource())
    return server_endpoint.listen(site)
def start_twisted_server(self):
    """
    Start a Twisted site on an ephemeral 127.0.0.1 port that answers GET
    requests with a fixed greeting; the port number is stored on
    ``self.twisted_port``.
    """
    class HelloResource(Resource):
        isLeaf = True

        def render_GET(self, request):
            return "Hello from twisted!"

    listening_port = self.reactor.listenTCP(
        0, Site(HelloResource()), interface='127.0.0.1')
    self.twisted_port = listening_port.getHost().port
def setUp(self):
    """
    Run the base setUp, then serve a TwistedGateway (with
    ``expose_request=False``) as the '' child of a root resource on an
    ephemeral 127.0.0.1 port.
    """
    BaseTestCase.setUp(self)

    self.gw = twisted.TwistedGateway(expose_request=False)
    root = resource.Resource()
    root.putChild('', self.gw)

    site = server.Site(root)
    self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
    self.port = self.p.getHost().port
def testSimplestSite(self):
    """
    A request for the empty path segment resolves to the child registered
    under '' on the site's root resource.
    """
    parent = SimpleResource()
    child = SimpleResource()
    parent.putChild("", child)
    site = server.Site(parent)
    found = site.getResourceFor(DummyRequest(['']))
    assert found is child, "Got the wrong resource."
def setUp(self):
    """
    Create the Site under test around a fresh SimpleResource.
    """
    root = SimpleResource()
    self.site = server.Site(root)
def createServer(self, r):
    """
    Return a DummyChannel with a TCP dummy transport whose site serves the
    resource ``r``.
    """
    channel = DummyChannel()
    channel.transport = DummyChannel.TCP()
    channel.site = server.Site(r)
    return channel
def setUp(self):
    """
    Serve a Test resource on an ephemeral 127.0.0.1 port and remember the
    listening port object and its number.
    """
    site = server.Site(Test())
    self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
    self.port = self.p.getHost().port
def setUp(self):
    """
    Serve a Test resource created with ``allowNone=True`` on an ephemeral
    127.0.0.1 port and remember the listening port object and its number.
    """
    site = server.Site(Test(allowNone=True))
    self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
    self.port = self.p.getHost().port
def setUp(self):
    """
    Serve a TestAuthHeader resource on an ephemeral 127.0.0.1 port and
    remember the listening port object and its number.
    """
    site = server.Site(TestAuthHeader())
    self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
    self.port = self.p.getHost().port
def setUp(self):
    """
    Serve a Test resource with introspection added on an ephemeral
    127.0.0.1 port and remember the listening port object and its number.
    """
    introspected = Test()
    addIntrospection(introspected)
    site = server.Site(introspected)
    self.p = reactor.listenTCP(0, site, interface="127.0.0.1")
    self.port = self.p.getHost().port
def setUp(self):
    """
    Serve this source file as a leaf static.File resource on an ephemeral
    127.0.0.1 port.
    """
    fileResource = static.File(__file__)
    self.resource = fileResource
    fileResource.isLeaf = True
    site = server.Site(fileResource)
    self.port = reactor.listenTCP(0, site, interface='127.0.0.1')
def setUp(self):
    """
    Serve a static.Data root with the two cookie-mirror children attached
    and remember the listening port object and its number.
    """
    root = static.Data('El toro!', 'text/plain')
    children = (
        ("cookiemirror", CookieMirrorResource),
        ("rawcookiemirror", RawCookieMirrorResource),
    )
    for path, resourceClass in children:
        root.putChild(path, resourceClass())
    self.port = self._listen(server.Site(root, timeout=None))
    self.portno = self.port.getHost().port
def getResourceFor(self, req):
    """
    Resolve ``req`` with ``server.Site.getResourceFor`` and keep the
    result on ``self.caughtRes`` before returning it.
    """
    found = server.Site.getResourceFor(self, req)
    self.caughtRes = found
    return found