Python twisted.web.resource 模块,Resource() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.web.resource.Resource()

项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def setUp(self):
        """Create a :class:`CaptchaFetchResource`."""
        secretKey, publicKey = crypto.getRSAKey('captcha.key', bits=1024)

        # Set up our resources to fake a minimal HTTP(S) server:
        self.pagename = b'fetch'
        self.root = Resource()

        shutil.copytree('../captchas', os.path.sep.join([os.getcwd(), 'captchas']))

        self.captchaDir = os.path.sep.join([os.getcwd(), 'captchas'])
        self.captchaResource = server.CaptchaFetchResource(
            secretKey=secretKey,
            publicKey=publicKey,
            hmacKey='abcdefghijklmnopqrstuvwxyz012345',
            captchaDir=self.captchaDir,
            useForwardedHeader=True)

        self.root.putChild(self.pagename, self.captchaResource)

        # Set up the basic parts of our faked request:
        self.request = DummyRequest([self.pagename])
项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def setUp(self):
        """Create a :class:`CaptchaFetchResource`.
        """
        secretKey, publicKey = crypto.getRSAKey('captcha.key', bits=1024)

        # Set up our resources to fake a minimal HTTP(S) server:
        self.pagename = b'check'
        self.root = Resource()

        shutil.copytree('../captchas', os.path.sep.join([os.getcwd(), 'captchas']))

        self.captchaDir = os.path.sep.join([os.getcwd(), 'captchas'])
        self.captchaResource = server.CaptchaCheckResource(
            secretKey=secretKey,
            publicKey=publicKey,
            hmacKey='abcdefghijklmnopqrstuvwxyz012345',
            useForwardedHeader=True)

        self.root.putChild(self.pagename, self.captchaResource)

        # Set up the basic parts of our faked request:
        self.request = DummyRequest([self.pagename])
项目:farfetchd    作者:isislovecruft    | 项目源码 | 文件源码
def getClientIP(request, useForwardedHeader=False):
    """Get the client's IP address from the ``'X-Forwarded-For:'``
    header, or from the :api:`request <twisted.web.server.Request>`.

    :type request: :api:`twisted.web.http.Request`
    :param request: A ``Request`` for a :api:`twisted.web.resource.Resource`.
    :param bool useForwardedHeader: If ``True``, attempt to get the client's
        IP address from the ``'X-Forwarded-For:'`` header.
    :rtype: ``None`` or :any:`str`
    :returns: The client's IP address, if it was obtainable.
    """
    ip = None

    if useForwardedHeader:
        header = request.getHeader("X-Forwarded-For")
        if header:
            ip = header.split(",")[-1].strip()
            if not isIPAddress(ip):
                log.msg("Got weird X-Forwarded-For value %r" % header)
                ip = None
    else:
        ip = request.getClientIP()

    return ip
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, m, inputhandlers=None, view=None, controllers=None, templateDirectory = None):
        #self.start = now()
        resource.Resource.__init__(self)
        self.model = m
        # It's the responsibility of the calling code to make sure setView is
        # called on this controller before it's rendered.
        self.view = None
        self.subcontrollers = []
        if self.setupStacks:
            self.setupControllerStack()
        if inputhandlers is None and controllers is None:
            self._inputhandlers = []
        elif inputhandlers:
            print "The inputhandlers arg is deprecated, please use controllers instead"
            self._inputhandlers = inputhandlers
        else:
            self._inputhandlers = controllers
        if templateDirectory is not None:
            self.templateDirectory = templateDirectory
        self._valid = {}
        self._invalid = {}
        self._process = {}
        self._parent = None
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def ResourceScript(path, registry):
    """
    I am a normal py file which must define a 'resource' global, which should
    be an instance of (a subclass of) web.resource.Resource; it will be
    renderred.
    """
    cs = CacheScanner(path, registry)
    glob = {'__file__': path,
            'resource': noRsrc,
            'registry': registry,
            'cache': cs.cache,
            'recache': cs.recache}
    try:
        execfile(path, glob, glob)
    except AlreadyCached, ac:
        return ac.args[0]
    rsrc = glob['resource']
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        d = client.getPage("http://127.0.0.1:%d/here/there" % \
                           self.port2.getHost().port)
        d.addCallback(self.failUnlessEqual, 'root')
        return d
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
        """Create a file with the given path.
        """
        resource.Resource.__init__(self)
        filepath.FilePath.__init__(self, path)
        # Remove the dots from the path to split
        self.defaultType = defaultType
        if ignoredExts in (0, 1) or allowExt:
            warnings.warn("ignoredExts should receive a list, not a boolean")
            if ignoredExts or allowExt:
                self.ignoredExts = ['*']
            else:
                self.ignoredExts = []
        else:
            self.ignoredExts = list(ignoredExts)
        self.registry = registry or Registry()
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def alias(aliasPath, sourcePath):
    """
    I am not a very good aliaser. But I'm the best I can be. If I'm
    aliasing to a Resource that generates links, and it uses any parts
    of request.prepath to do so, the links will not be relative to the
    aliased path, but rather to the aliased-to path. That I can't
    alias static.File directory listings that nicely. However, I can
    still be useful, as many resources will play nice.
    """
    sourcePath = sourcePath.split('/')
    aliasPath = aliasPath.split('/')
    def rewriter(request):
        if request.postpath[:len(aliasPath)] == aliasPath:
            after = request.postpath[len(aliasPath):]
            request.postpath = sourcePath + after
            request.path = '/'+'/'.join(request.prepath+request.postpath)
    return rewriter
项目:p2pool-bch    作者:amarian12    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-bch    作者:amarian12    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:p2pool-unitus    作者:amarian12    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-unitus    作者:amarian12    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:flowder    作者:amir-khakshour    | 项目源码 | 文件源码
def __init__(self, app, config):
        resource.Resource.__init__(self)
        self.app = app
        self.IApp = IServiceCollection(app, app)
        self.debug = config.getboolean('debug', False)
        self.nodename = config.get('node_name', socket.gethostname())
        static_serve_path = config.get('static_serve_path', 'files')
        storage_path = config.get('storage_path')

        self.putChild('', Home(self))
        self.putChild('jobs', Jobs(self))
        self.putChild(static_serve_path, File(storage_path))

        services = config.items('services', ())
        for servName, servClsName in services:
            servCls = load_object(servClsName)
            self.putChild(servName, servCls(self))
项目:p2pool-dgb-sha256    作者:ilsawa    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-dgb-sha256    作者:ilsawa    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:wslink    作者:Kitware    | 项目源码 | 文件源码
def handle_complex_resource_path(path, root, resource):
    # Handle complex endpoint. Twisted expects byte-type URIs.
    fullpath = path.encode('utf-8').split(b'/')
    parent_path_item_resource = root
    for path_item in fullpath:
        if path_item == fullpath[-1]:
            parent_path_item_resource.putChild(path_item, resource)
        else:
            new_resource = Resource()
            parent_path_item_resource.putChild(path_item, new_resource)
            parent_path_item_resource = new_resource


# =============================================================================
# Start webserver
# =============================================================================
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, m, inputhandlers=None, view=None, controllers=None, templateDirectory = None):
        #self.start = now()
        resource.Resource.__init__(self)
        self.model = m
        # It's the responsibility of the calling code to make sure setView is
        # called on this controller before it's rendered.
        self.view = None
        self.subcontrollers = []
        if self.setupStacks:
            self.setupControllerStack()
        if inputhandlers is None and controllers is None:
            self._inputhandlers = []
        elif inputhandlers:
            print "The inputhandlers arg is deprecated, please use controllers instead"
            self._inputhandlers = inputhandlers
        else:
            self._inputhandlers = controllers
        if templateDirectory is not None:
            self.templateDirectory = templateDirectory
        self._valid = {}
        self._invalid = {}
        self._process = {}
        self._parent = None
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def ResourceScript(path, registry):
    """
    I am a normal py file which must define a 'resource' global, which should
    be an instance of (a subclass of) web.resource.Resource; it will be
    renderred.
    """
    cs = CacheScanner(path, registry)
    glob = {'__file__': path,
            'resource': noRsrc,
            'registry': registry,
            'cache': cs.cache,
            'recache': cs.recache}
    try:
        execfile(path, glob, glob)
    except AlreadyCached, ac:
        return ac.args[0]
    rsrc = glob['resource']
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        d = client.getPage("http://127.0.0.1:%d/here/there" % \
                           self.port2.getHost().port)
        d.addCallback(self.failUnlessEqual, 'root')
        return d
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def __init__(self, path, defaultType="text/html", ignoredExts=(), registry=None, allowExt=0):
        """Create a file with the given path.
        """
        resource.Resource.__init__(self)
        filepath.FilePath.__init__(self, path)
        # Remove the dots from the path to split
        self.defaultType = defaultType
        if ignoredExts in (0, 1) or allowExt:
            warnings.warn("ignoredExts should receive a list, not a boolean")
            if ignoredExts or allowExt:
                self.ignoredExts = ['*']
            else:
                self.ignoredExts = []
        else:
            self.ignoredExts = list(ignoredExts)
        self.registry = registry or Registry()
项目:sslstrip-hsts-openwrt    作者:adde88    | 项目源码 | 文件源码
def alias(aliasPath, sourcePath):
    """
    I am not a very good aliaser. But I'm the best I can be. If I'm
    aliasing to a Resource that generates links, and it uses any parts
    of request.prepath to do so, the links will not be relative to the
    aliased path, but rather to the aliased-to path. That I can't
    alias static.File directory listings that nicely. However, I can
    still be useful, as many resources will play nice.
    """
    sourcePath = sourcePath.split('/')
    aliasPath = aliasPath.split('/')
    def rewriter(request):
        if request.postpath[:len(aliasPath)] == aliasPath:
            after = request.postpath[len(aliasPath):]
            request.postpath = sourcePath + after
            request.path = '/'+'/'.join(request.prepath+request.postpath)
    return rewriter
项目:p2pool-ltc    作者:ilsawa    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3, args=math.Object(donation_percentage=random.uniform(0, 10), address='foo', worker_fee=3, timeaddresses=1000), pubkeys=main.keypool(), bitcoind=bitcoind)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-ltc    作者:ilsawa    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:p2pool-bsty    作者:amarian12    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-bsty    作者:amarian12    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:p2pool-cann    作者:ilsawa    | 项目源码 | 文件源码
def start(cls, net, factory, bitcoind, peer_ports, merged_urls):
        self = cls()

        self.n = node.Node(factory, bitcoind, [], [], net)
        yield self.n.start()

        self.n.p2p_node = node.P2PNode(self.n, port=0, max_incoming_conns=1000000, addr_store={}, connect_addrs=[('127.0.0.1', peer_port) for peer_port in peer_ports])
        self.n.p2p_node.start()

        wb = work.WorkerBridge(node=self.n, my_pubkey_hash=random.randrange(2**160), donation_percentage=random.uniform(0, 10), merged_urls=merged_urls, worker_fee=3)
        self.wb = wb
        web_root = resource.Resource()
        worker_interface.WorkerInterface(wb).attach_to(web_root)
        self.web_port = reactor.listenTCP(0, server.Site(web_root))

        defer.returnValue(self)
项目:p2pool-cann    作者:ilsawa    | 项目源码 | 文件源码
def render(self, request):
        def finish(x):
            if request.channel is None: # disconnected
                return
            if x is not None:
                request.write(x)
            request.finish()

        def finish_error(fail):
            if request.channel is None: # disconnected
                return
            request.setResponseCode(500) # won't do anything if already written to
            request.write('---ERROR---')
            request.finish()
            log.err(fail, "Error in DeferredResource handler:")

        defer.maybeDeferred(resource.Resource.render, self, request).addCallbacks(finish, finish_error)
        return server.NOT_DONE_YET
项目:vmware_exporter    作者:rverchere    | 项目源码 | 文件源码
def main():
    parser = argparse.ArgumentParser(description='VMWare metrics exporter for Prometheus')
    parser.add_argument('-c', '--config', dest='config_file',
                        default='config.yml', help="configuration file")
    parser.add_argument('-p', '--port', dest='port', type=int,
                        default=9272, help="HTTP port to expose metrics")

    args = parser.parse_args()

    # Start up the server to expose the metrics.
    root = Resource()
    root.putChild(b'metrics', VMWareMetricsResource(args))

    factory = Site(root)
    print("Starting web server on port {}".format(args.port))
    reactor.listenTCP(args.port, factory)
    reactor.run()
项目:bwscanner    作者:TheTorProject    | 项目源码 | 文件源码
def setUp(self):
        yield super(TestBwscan, self).setUp()
        yield setconf_fetch_all_descs(self.tor)

        class DummyResource(Resource):
            isLeaf = True
            def render_GET(self, request):
                size = request.uri.split('/')[-1]
                if 'k' in size:
                    size = int(size[:-1])*(2**10)
                elif 'M' in size:
                    size = int(size[:-1])*(2**20)
                return 'a'*size

        self.port = yield available_tcp_port(reactor)
        self.test_service = yield reactor.listenTCP(
            self.port, Site(DummyResource()))
项目:bwscanner    作者:TheTorProject    | 项目源码 | 文件源码
def setUp(self):
        yield super(TestStreamBandwidthListener, self).setUp()
        self.fetch_size = 8*2**20 # 8MB
        self.stream_bandwidth_listener = yield StreamBandwidthListener(self.tor)

        class DummyResource(Resource):
            isLeaf = True
            def render_GET(self, request):
                return 'a'*8*2**20

        self.port = yield available_tcp_port(reactor)
        self.site = Site(DummyResource())
        self.test_service = yield reactor.listenTCP(self.port, self.site)

        self.not_enough_measurements = NotEnoughMeasurements(
            "Not enough measurements to calculate STREAM_BW samples.")
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def ResourceScript(path, registry):
    """
    I am a normal py file which must define a 'resource' global, which should
    be an instance of (a subclass of) web.resource.Resource; it will be
    renderred.
    """
    cs = CacheScanner(path, registry)
    glob = {'__file__': _coerceToFilesystemEncoding("", path),
            'resource': noRsrc,
            'registry': registry,
            'cache': cs.cache,
            'recache': cs.recache}
    try:
        execfile(path, glob, glob)
    except AlreadyCached as ac:
        return ac.args[0]
    rsrc = glob['resource']
    if cs.doCache and rsrc is not noRsrc:
        registry.cachePath(path, rsrc)
    return rsrc
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_sessionUIDGeneration(self):
        """
        L{site.getSession} generates L{Session} objects with distinct UIDs from
        a secure source of entropy.
        """
        site = server.Site(resource.Resource())
        # Ensure that we _would_ use the unpredictable random source if the
        # test didn't stub it.
        self.assertIdentical(site._entropy, os.urandom)

        def predictableEntropy(n):
            predictableEntropy.x += 1
            return (unichr(predictableEntropy.x) * n).encode("charmap")
        predictableEntropy.x = 0
        self.patch(site, "_entropy", predictableEntropy)
        a = self.getAutoExpiringSession(site)
        b = self.getAutoExpiringSession(site)
        self.assertEqual(a.uid, b"01" * 0x20)
        self.assertEqual(b.uid, b"02" * 0x20)
        # This functionality is silly (the value is no longer used in session
        # generation), but 'counter' was a public attribute since time
        # immemorial so we should make sure if anyone was using it to get site
        # metrics or something it keeps working.
        self.assertEqual(site.counter, 2)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_processingFailedNoTraceback(self):
        """
        L{Request.processingFailed} when the site has C{displayTracebacks} set
        to C{False} does not write out the failure, but give a generic error
        message.
        """
        d = DummyChannel()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.site.displayTracebacks = False
        fail = failure.Failure(Exception("Oh no!"))
        request.processingFailed(fail)

        self.assertNotIn(b"Oh no!", request.transport.written.getvalue())
        self.assertIn(
            b"Processing Failed", request.transport.written.getvalue()
        )

        # Since we didn't "handle" the exception, flush it to prevent a test
        # failure
        self.assertEqual(1, len(self.flushLoggedErrors()))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_processingFailedDisplayTraceback(self):
        """
        L{Request.processingFailed} when the site has C{displayTracebacks} set
        to C{True} writes out the failure.
        """
        d = DummyChannel()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.site.displayTracebacks = True
        fail = failure.Failure(Exception("Oh no!"))
        request.processingFailed(fail)

        self.assertIn(b"Oh no!", request.transport.written.getvalue())

        # Since we didn't "handle" the exception, flush it to prevent a test
        # failure
        self.assertEqual(1, len(self.flushLoggedErrors()))
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_sessionDifferentFromSecureSession(self):
        """
        L{Request.session} and L{Request.secure_session} should be two separate
        sessions with unique ids and different cookies.
        """
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        request = server.Request(d, 1)
        request.site = server.Site(resource.Resource())
        request.sitepath = []
        secureSession = request.getSession()
        self.assertIsNotNone(secureSession)
        self.addCleanup(secureSession.expire)
        self.assertEqual(request.cookies[0].split(b"=")[0],
                         b"TWISTED_SECURE_SESSION")
        session = request.getSession(forceNotSecure=True)
        self.assertIsNotNone(session)
        self.assertEqual(request.cookies[1].split(b"=")[0], b"TWISTED_SESSION")
        self.addCleanup(session.expire)
        self.assertNotEqual(session.uid, secureSession.uid)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_sessionAttribute(self):
        """
        On a L{Request}, the C{session} attribute retrieves the associated
        L{Session} only if it has been initialized.  If the request is secure,
        it retrieves the secure session.
        """
        site = server.Site(resource.Resource())
        d = DummyChannel()
        d.transport = DummyChannel.SSL()
        request = server.Request(d, 1)
        request.site = site
        request.sitepath = []
        self.assertIs(request.session, None)
        insecureSession = request.getSession(forceNotSecure=True)
        self.addCleanup(insecureSession.expire)
        self.assertIs(request.session, None)
        secureSession = request.getSession()
        self.addCleanup(secureSession.expire)
        self.assertIsNot(secureSession, None)
        self.assertIsNot(secureSession, insecureSession)
        self.assertIs(request.session, secureSession)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_retrieveNonExistentSession(self):
        """
        L{Request.getSession} generates a new session if the relevant cookie is
        set in the incoming request.
        """
        site = server.Site(resource.Resource())
        d = DummyChannel()
        request = server.Request(d, 1)
        request.site = site
        request.sitepath = []
        request.received_cookies[b'TWISTED_SESSION'] = b"does-not-exist"
        session = request.getSession()
        self.assertIsNotNone(session)
        self.addCleanup(session.expire)
        self.assertTrue(request.cookies[0].startswith(b'TWISTED_SESSION='))
        # It should be a new session ID.
        self.assertNotIn(b"does-not-exist", request.cookies[0])
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_getChildWithDefaultAuthorized(self):
        """
        Resource traversal which encounters an L{HTTPAuthSessionWrapper}
        results in an L{IResource} which renders the L{IResource} avatar
        retrieved from the portal when the request has a valid I{Authorization}
        header.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(request.written, [self.childContent])
        d.addCallback(cbFinished)
        request.render(child)
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def _logoutTest(self):
        """
        Issue a request for an authentication-protected resource using valid
        credentials and then return the C{DummyRequest} instance which was
        used.

        This is a helper for tests about the behavior of the logout
        callback.
        """
        self.credentialFactories.append(BasicCredentialFactory('example.com'))

        class SlowerResource(Resource):
            def render(self, request):
                return NOT_DONE_YET

        self.avatar.putChild(self.childName, SlowerResource())
        request = self.makeRequest([self.childName])
        child = self._authorizedBasicLogin(request)
        request.render(child)
        self.assertEqual(self.realm.loggedOut, 0)
        return request
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_anonymousAccess(self):
        """
        Anonymous requests are allowed if a L{Portal} has an anonymous checker
        registered.
        """
        unprotectedContents = b"contents of the unprotected child resource"

        self.avatars[ANONYMOUS] = Resource()
        self.avatars[ANONYMOUS].putChild(
            self.childName, Data(unprotectedContents, 'text/plain'))
        self.portal.registerChecker(AllowAnonymousAccess())

        self.credentialFactories.append(BasicCredentialFactory('example.com'))
        request = self.makeRequest([self.childName])
        child = getChildForRequest(self.wrapper, request)
        d = request.notifyFinish()
        def cbFinished(ignored):
            self.assertEqual(request.written, [unprotectedContents])
        d.addCallback(cbFinished)
        request.render(child)
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_render(self):
        """
        L{ResourceScriptDirectory.getChild} returns a resource which renders a
        response with the HTTP 200 status code and the content of the rpy's
        C{request} global.
        """
        tmp = FilePath(self.mktemp())
        tmp.makedirs()
        tmp.child("test.rpy").setContent(b"""
from twisted.web.resource import Resource
class TestResource(Resource):
    isLeaf = True
    def render_GET(self, request):
        return b'ok'
resource = TestResource()""")
        resource = ResourceScriptDirectory(tmp._asBytesPath())
        request = DummyRequest([b''])
        child = resource.getChild(b"test.rpy", request)
        d = _render(child, request)
        def cbRendered(ignored):
            self.assertEqual(b"".join(request.written), b"ok")
        d.addCallback(cbRendered)
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def testDistrib(self):
        # site1 is the publisher
        r1 = resource.Resource()
        r1.putChild("there", static.Data("root", "text/plain"))
        site1 = server.Site(r1)
        self.f1 = PBServerFactory(distrib.ResourcePublisher(site1))
        self.port1 = reactor.listenTCP(0, self.f1)
        self.sub = distrib.ResourceSubscription("127.0.0.1",
                                                self.port1.getHost().port)
        r2 = resource.Resource()
        r2.putChild("here", self.sub)
        f2 = MySite(r2)
        self.port2 = reactor.listenTCP(0, f2)
        agent = client.Agent(reactor)
        d = agent.request(b"GET", "http://127.0.0.1:%d/here/there" % \
                          (self.port2.getHost().port,))
        d.addCallback(client.readBody)
        d.addCallback(self.assertEqual, 'root')
        return d
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_requestHeaders(self):
        """
        The request headers are available on the request object passed to a
        distributed resource's C{render} method.
        """
        requestHeaders = {}

        class ReportRequestHeaders(resource.Resource):
            def render(self, request):
                requestHeaders.update(dict(
                    request.requestHeaders.getAllRawHeaders()))
                return ""

        request = self._requestTest(
            ReportRequestHeaders(), headers=Headers({'foo': ['bar']}))
        def cbRequested(result):
            self.assertEqual(requestHeaders['Foo'], ['bar'])
        request.addCallback(cbRequested)
        return request
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_requestResponseCode(self):
        """
        The response code can be set by the request object passed to a
        distributed resource's C{render} method.
        """
        class SetResponseCode(resource.Resource):
            def render(self, request):
                request.setResponseCode(200)
                return ""

        request = self._requestAgentTest(SetResponseCode())
        def cbRequested(result):
            self.assertEqual(result[0].data, "")
            self.assertEqual(result[1].code, 200)
            self.assertEqual(result[1].phrase, "OK")
        request.addCallback(cbRequested)
        return request
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_requestResponseCodeMessage(self):
        """
        The response code and message can be set by the request object passed to
        a distributed resource's C{render} method.
        """
        class SetResponseCode(resource.Resource):
            def render(self, request):
                request.setResponseCode(200, "some-message")
                return ""

        request = self._requestAgentTest(SetResponseCode())
        def cbRequested(result):
            self.assertEqual(result[0].data, "")
            self.assertEqual(result[1].code, 200)
            self.assertEqual(result[1].phrase, "some-message")
        request.addCallback(cbRequested)
        return request
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def alias(aliasPath, sourcePath):
    """
    I am not a very good aliaser. But I'm the best I can be. If I'm
    aliasing to a Resource that generates links, and it uses any parts
    of request.prepath to do so, the links will not be relative to the
    aliased path, but rather to the aliased-to path. That I can't
    alias static.File directory listings that nicely. However, I can
    still be useful, as many resources will play nice.
    """
    sourcePath = sourcePath.split('/')
    aliasPath = aliasPath.split('/')
    def rewriter(request):
        if request.postpath[:len(aliasPath)] == aliasPath:
            after = request.postpath[len(aliasPath):]
            request.postpath = sourcePath + after
            request.path = '/'+'/'.join(request.prepath+request.postpath)
    return rewriter
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def test_URLPath_traversedResource(self):
        app = self.app
        request = requestMock(b'/resource/foo')

        request_url = [None]

        class URLPathResource(Resource):
            def render(self, request):
                request_url[0] = request.URLPath()

            def getChild(self, request, segment):
                return self

        @app.route("/resource/", branch=True)
        def root(request):
            return URLPathResource()

        d = _render(self.kr, request)

        self.assertFired(d)
        self.assertEqual(str(request_url[0]),
            'http://localhost:8080/resource/foo')
项目:txasgiresource    作者:JohnDoee    | 项目源码 | 文件源码
def __init__(self,
                 channel_layer,
                 root_path='',
                 http_timeout=120,
                 websocket_timeout=None,
                 ping_interval=20,
                 ping_timeout=30,
                 ws_protocols=None,
                 start_scheduler=True,
                 use_proxy_headers=False,
                 use_proxy_proto_header=False,
                 use_x_sendfile=False):
        self.manager = ChannelLayerManager(channel_layer, start_scheduler=True)
        self.root_path = root_path

        self.http_timeout = http_timeout
        self.websocket_timeout = websocket_timeout or getattr(channel_layer, "group_expiry", 86400)
        self.ping_interval = ping_interval
        self.ping_timeout = ping_timeout
        self.ws_protocols = ws_protocols
        self.use_proxy_headers = use_proxy_headers
        self.use_proxy_proto_header = use_proxy_proto_header
        self.use_x_sendfile = use_x_sendfile

        resource.Resource.__init__(self)
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def _render(resource, request):
    # type: (Resource, Union[DummyRequest, Request]) -> Any
    """
    Simulate rendering of a Twisted resource.
    :param resource: Twisted Resource with render() method.
    :param request: Request (or mock object).
    :return: Deferred 
    """
    result = resource.render(request)
    if isinstance(result, bytes):
        request.write(result)
        request.finish()
        return succeed(None)
    elif result == NOT_DONE_YET:
        if request.finished:
            return succeed(None)
        else:
            return request.notifyFinish()
    else:
        raise ValueError("Unexpected return value: %r" % (result,))
项目:retwist    作者:trustyou    | 项目源码 | 文件源码
def getResourceFor(self, request):
        # type: (Request) -> Resource
        """
        Check if a route matches this request. Fall back to Twisted default lookup behavior otherwise.
        :param request: Twisted request instance
        :return: Resource to handle this request
        """
        request.site = self

        for route_re, resource in self.routes.items():
            path = request.path.decode()
            match = route_re.match(path)
            if match:
                request.path_args = match.groupdict() or match.groups()
                return resource

        return Site.getResourceFor(self, request)