我们从 Python 开源项目中提取了以下 13 个代码示例,用于说明如何使用 cherrypy.lib 模块。
def default(self, *vpath, **params):
    """Dispatch an XML-RPC call to the handler named in the request body.

    The dotted method name returned by ``_xmlrpc.process_body()`` is
    resolved one attribute at a time, starting from ``self``. The resolved
    handler is called with ``vpath`` followed by the RPC parameters (plus
    ``params`` as keyword arguments) and its result is handed to
    ``_xmlrpc.respond()``.

    Returns:
        The body of ``cherrypy.serving.response``.

    Raises:
        Exception: if the resolved handler is falsy or has no true
            ``exposed`` attribute.
    """
    rpc_args, method_name = _xmlrpc.process_body()

    # Walk the dotted name; getattr's None default marks a missing step.
    handler = self
    for segment in str(method_name).split('.'):
        handler = getattr(handler, segment, None)

    if not (handler and getattr(handler, 'exposed', False)):
        # https://github.com/cherrypy/cherrypy/issues/533
        # An unknown method should surface as an xmlrpclib.Fault; raising
        # here achieves that (see cherrypy.lib.xmlrpcutil.on_error).
        # The message names the last segment of the dotted method name.
        raise Exception('method "%s" is not supported' % segment)

    body = handler(*(vpath + rpc_args), **params)

    tool_conf = cherrypy.serving.request.toolmaps['tools'].get('xmlrpc', {})
    encoding = tool_conf.get('encoding', 'utf-8')
    allow_none = tool_conf.get('allow_none', 0)
    _xmlrpc.respond(body, encoding, allow_none)
    return cherrypy.serving.response.body
def default(self, *vpath, **params):
    """Route an XML-RPC request to the exposed method it names.

    ``_xmlrpc.process_body()`` supplies the RPC parameters and the dotted
    method name. The name is looked up attribute by attribute from
    ``self``; the target is invoked with ``vpath`` plus the RPC parameters
    and ``params``, and the result is serialized via ``_xmlrpc.respond()``
    using the ``xmlrpc`` tool settings of the current request.

    Returns:
        ``cherrypy.serving.response.body``.

    Raises:
        Exception: when the lookup ends on a falsy object or on one whose
            ``exposed`` attribute is missing or false.
    """
    call_args, dotted_name = _xmlrpc.process_body()

    target = self
    for part in str(dotted_name).split('.'):
        # A missing attribute resolves to None instead of raising.
        target = getattr(target, part, None)

    callable_and_exposed = target and getattr(target, "exposed", False)
    if not callable_and_exposed:
        # https://github.com/cherrypy/cherrypy/issues/533
        # Per that issue a missing method must come back as an
        # xmlrpclib.Fault; the exception raised here is converted by
        # cherrypy.lib.xmlrpcutil.on_error. ``part`` is the final segment
        # of the requested name.
        raise Exception('method "%s" is not supported' % part)

    result = target(*(vpath + call_args), **params)

    xmlrpc_conf = cherrypy.serving.request.toolmaps['tools'].get("xmlrpc", {})
    _xmlrpc.respond(
        result,
        xmlrpc_conf.get('encoding', 'utf-8'),
        xmlrpc_conf.get('allow_none', 0),
    )
    return cherrypy.serving.response.body
def XMLRPCDispatcher(next_dispatcher=Dispatcher()):
    """Build a dispatcher that patches the path before delegating.

    Args:
        next_dispatcher: callable invoked with the patched path info. The
            default ``Dispatcher()`` instance is created once, when this
            function is defined.

    Returns:
        A callable taking ``path_info`` that passes it through
        ``xmlrpcutil.patched_path()`` and returns whatever
        ``next_dispatcher`` returns for the patched value.
    """
    # Imported at call time, as in the original implementation.
    from cherrypy.lib import xmlrpcutil

    def xmlrpc_dispatch(path_info):
        return next_dispatcher(xmlrpcutil.patched_path(path_info))

    return xmlrpc_dispatch
def hooks_namespace(k, v):
    """Register a hook declared in config on the current request.

    Args:
        k: config key. Only the text before the first "." is used as the
            hook point name, so several entries can target the same hook
            point per path (upstream example: "hooks.before_handler.1").
        v: the hook. A text/bytes value is resolved through
            ``cherrypy.lib.attributes()``; anything that is not already a
            ``Hook`` is wrapped in one.
    """
    hookpoint, _, _ = k.partition('.')

    hook = v
    if isinstance(hook, text_or_bytes):
        hook = cherrypy.lib.attributes(hook)
    if not isinstance(hook, Hook):
        hook = Hook(hook)

    cherrypy.serving.request.hooks[hookpoint].append(hook)
def hooks_namespace(k, v):
    """Append a config-declared hook to the serving request's hook list.

    The hook point is whatever precedes the first "." in ``k``; any suffix
    only serves to keep config keys distinct, which is what allows
    multiple hooks for one hook point per path (upstream example:
    "hooks.before_handler.1").

    ``v`` may be a text/bytes reference (resolved with
    ``cherrypy.lib.attributes()``), a bare object (wrapped in ``Hook``),
    or a ready-made ``Hook`` (used as is).
    """
    hookpoint = k.partition(".")[0]
    callback = cherrypy.lib.attributes(v) if isinstance(v, text_or_bytes) else v
    hook = callback if isinstance(callback, Hook) else Hook(callback)
    cherrypy.serving.request.hooks[hookpoint].append(hook)
def setup_server():
    """Mount a test app with a public root and a digest-protected page.

    ``Root`` serves the index without any tool configuration, while the
    ``/digest`` path (served by ``DigestProtected``) has the
    ``auth_digest`` tool enabled for the single plain-text account
    ``test`` / ``test``.
    """

    class Root:

        @cherrypy.expose
        def index(self):
            return "This is public."

    class DigestProtected:

        @cherrypy.expose
        def index(self):
            login = cherrypy.request.login
            return "Hello %s, you've been authorized." % login

    # Username -> plain-text password map backing the HA1 lookup.
    users = {'test': 'test'}
    get_ha1 = cherrypy.lib.auth_digest.get_ha1_dict_plain(users)

    digest_settings = {
        'tools.auth_digest.on': True,
        'tools.auth_digest.realm': 'localhost',
        'tools.auth_digest.get_ha1': get_ha1,
        'tools.auth_digest.key': 'a565c27146791cfb',
        # Note: this is the string 'True', not the boolean.
        'tools.auth_digest.debug': 'True',
    }

    app_root = Root()
    app_root.digest = DigestProtected()
    cherrypy.tree.mount(app_root, config={'/digest': digest_settings})
def test_1_Ram_Concurrency(self):
    # Request /set_session_cls/<dotted class name> (presumably selecting
    # the session class on the server side), then run the shared
    # concurrency scenario.
    session_cls = 'cherrypy.lib.sessions.RamSession'
    self.getPage('/set_session_cls/' + session_cls)
    self._test_Concurrency()
def test_2_File_Concurrency(self):
    # Request /set_session_cls/<dotted class name> (presumably selecting
    # the session class on the server side), then run the shared
    # concurrency scenario.
    session_cls = 'cherrypy.lib.sessions.FileSession'
    self.getPage('/set_session_cls/' + session_cls)
    self._test_Concurrency()
def finalize(self):
    """Turn status, headers and cookies into their output forms. (Core)

    Sets ``self.status``, ``self.output_status`` and ``self.header_list``.
    Depending on the status code and ``self.stream`` it may also drop or
    set the Content-Length header and blank ``self.body``.

    Raises:
        cherrypy.HTTPError: (500) when ``httputil.valid_status`` raises
            ValueError for ``self.status``.
    """
    try:
        code, reason, _ = httputil.valid_status(self.status)
    except ValueError:
        err = sys.exc_info()[1]
        raise cherrypy.HTTPError(500, err.args[0])

    hdrs = self.headers
    self.status = '%s %s' % (code, reason)
    status_prefix = ntob(str(code), 'ascii') + ntob(' ')
    self.output_status = status_prefix + hdrs.encode(reason)

    # dict's own methods are called directly on the header map below;
    # presumably this bypasses overrides on the header class - TODO confirm.
    if self.stream:
        # The upshot: wsgiserver will chunk the response if Content-Length
        # is popped (or explicitly set to None). Note that lib.static
        # sets Content-Length to the file's st_size.
        if dict.get(hdrs, 'Content-Length') is None:
            dict.pop(hdrs, 'Content-Length', None)
    elif code < 200 or code in (204, 205, 304):
        # 1xx, 204 (no content) and 304 (not modified) responses must not
        # include a message body; 205 is handled the same way here.
        dict.pop(hdrs, 'Content-Length', None)
        self.body = ntob('')
    elif dict.get(hdrs, 'Content-Length') is None:
        # Non-streamed responses get a Content-Length unless user code
        # already provided one.
        collapsed = self.collapse_body()
        dict.__setitem__(hdrs, 'Content-Length', len(collapsed))

    # Flatten the header map into a list of (name, value) tuples.
    self.header_list = header_list = hdrs.output()

    cookie_text = self.cookie.output()
    if not cookie_text:
        return

    for cookie_line in cookie_text.split('\r\n'):
        name, value = cookie_line.split(': ', 1)
        if isinstance(name, six.text_type):
            name = name.encode('ISO-8859-1')
        if isinstance(value, six.text_type):
            value = hdrs.encode(value)
        header_list.append((name, value))
def finalize(self):
    """Produce the final status line, header list and cookie headers. (Core)

    Writes ``self.status``, ``self.output_status`` and
    ``self.header_list``; Content-Length and ``self.body`` are adjusted
    according to the status code and ``self.stream``.

    Raises:
        cherrypy.HTTPError: (500) when ``httputil.valid_status`` raises
            ValueError for ``self.status``.
    """
    try:
        code, reason, _ = httputil.valid_status(self.status)
    except ValueError:
        problem = sys.exc_info()[1]
        raise cherrypy.HTTPError(500, problem.args[0])

    header_map = self.headers
    self.status = "%s %s" % (code, reason)
    encoded_code = ntob(str(code), 'ascii')
    self.output_status = encoded_code + ntob(" ") + header_map.encode(reason)

    if self.stream:
        # The upshot: wsgiserver will chunk the response when
        # Content-Length is absent (or explicitly None). Note that
        # lib.static sets Content-Length to the file's st_size.
        if dict.get(header_map, 'Content-Length') is None:
            dict.pop(header_map, 'Content-Length', None)
    elif code < 200 or code in (204, 205, 304):
        # 1xx, 204 and 304 responses must not carry a message body; the
        # code applies the same treatment to 205.
        dict.pop(header_map, 'Content-Length', None)
        self.body = ntob("")
    elif dict.get(header_map, 'Content-Length') is None:
        # Non-streamed responses should have a Content-Length, but a value
        # set by user code is left alone.
        flat_body = self.collapse_body()
        dict.__setitem__(header_map, 'Content-Length', len(flat_body))

    # Convert the header map into a list of (name, value) tuples.
    self.header_list = pairs = header_map.output()

    rendered = self.cookie.output()
    if not rendered:
        return

    for raw_line in rendered.split("\n"):
        # Python 2.4 emits cookies joined by LF but 2.5+ by CRLF, so a
        # trailing CR is trimmed when present.
        if raw_line.endswith("\r"):
            raw_line = raw_line[:-1]
        name, value = raw_line.split(": ", 1)
        if isinstance(name, six.text_type):
            name = name.encode("ISO-8859-1")
        if isinstance(value, six.text_type):
            value = header_map.encode(value)
        pairs.append((name, value))
def _setup_server(cls, supervisor, conf):
    """Log the test environment and build the base server config.

    Args:
        supervisor: object providing ``scheme``, ``protocol``, ``host``,
            ``port``, ``using_apache`` and ``using_wsgi``.
        conf: a config file path (text/bytes), from which only the
            'global' section is used, or a dict-like config, or a falsy
            value meaning "no extra config".

    Returns:
        A copy of the resolved config updated with the supervisor's
        socket host/port, protocol version and the "test_suite"
        environment, plus the SSL certificate/key for https.

    Side effects:
        Sets ``cherrypy.server.using_apache`` / ``using_wsgi``, may update
        ``cherrypy.config``, and for https replaces
        ``webtest.WebCase.HTTP_CONN`` with ``HTTPSConnection``.
    """
    is_https = supervisor.scheme == "https"
    ssl_suffix = " (ssl)" if is_https else ""

    py_version = sys.version.split()[0]
    log.info("Python version used to run this test script: %s" % py_version)
    log.info("CherryPy version: %s" % cherrypy.__version__)
    log.info("HTTP server version: %s%s" % (supervisor.protocol, ssl_suffix))
    log.info("PID: %s" % os.getpid())

    cherrypy.server.using_apache = supervisor.using_apache
    cherrypy.server.using_wsgi = supervisor.using_wsgi

    if sys.platform.startswith('java'):
        cherrypy.config.update({'server.nodelay': False})

    if isinstance(conf, text_or_bytes):
        file_conf = cherrypy.lib.reprconf.Parser().dict_from_file(conf)
        conf = file_conf.get('global', {})
    elif not conf:
        conf = {}

    overrides = {
        'server.socket_host': supervisor.host,
        'server.socket_port': supervisor.port,
        'server.protocol_version': supervisor.protocol,
        'environment': "test_suite",
    }
    baseconf = conf.copy()
    baseconf.update(overrides)

    if is_https:
        # server.ssl_module is not overridden here; the same PEM file is
        # used as both certificate and private key.
        baseconf['server.ssl_certificate'] = serverpem
        baseconf['server.ssl_private_key'] = serverpem
        # NOTE(review): a historical comment at this point discussed
        # lazily importing cherrypy.test.helper; no such import exists in
        # this function, only the connection-class override below.
        webtest.WebCase.HTTP_CONN = HTTPSConnection

    return baseconf
def test_7_session_cookies(self):
    # Checks three things about /session_cookie: the cookie carries no
    # 'expires' attribute, the session id is kept while the client keeps
    # sending the cookie, and a new id is issued once it stops. Finally
    # both sessions must be gone from RamSession.cache after the wait.
    expected_keys = set(['temp', 'Path'])

    def parse_cookie():
        # Split self.cookies[0][1] ("k=v; k2=v2") into a {key: value} dict.
        return dict([part.strip().split('=')
                     for part in self.cookies[0][1].split(";")])

    self.getPage('/set_session_cls/cherrypy.lib.sessions.RamSession')
    self.getPage('/clear')
    self.getPage('/session_cookie')

    # Grab the cookie ID.
    cookie_parts = parse_cookie()
    # Assert there is no 'expires' param.
    self.assertEqual(set(cookie_parts.keys()), expected_keys)
    id1 = cookie_parts['temp']
    self.assertEqual(copykeys(sessions.RamSession.cache), [id1])

    # Send another request in the same "browser session".
    self.getPage('/session_cookie', self.cookies)
    cookie_parts = parse_cookie()
    # Assert there is no 'expires' param.
    self.assertEqual(set(cookie_parts.keys()), expected_keys)
    self.assertBody(id1)
    self.assertEqual(copykeys(sessions.RamSession.cache), [id1])

    # Simulate a browser close by just not sending the cookies.
    self.getPage('/session_cookie')
    # Grab the cookie ID.
    cookie_parts = parse_cookie()
    # Assert there is no 'expires' param.
    self.assertEqual(set(cookie_parts.keys()), expected_keys)
    # Assert a new id has been generated...
    id2 = cookie_parts['temp']
    self.assertNotEqual(id1, id2)
    self.assertEqual(set(sessions.RamSession.cache.keys()),
                     set([id1, id2]))

    # Wait for the session.timeout on both sessions.
    time.sleep(2.5)
    cache = copykeys(sessions.RamSession.cache)
    if cache:
        if cache == [id2]:
            self.fail("The second session did not time out.")
        else:
            # TestCase.fail() takes a single message argument, so the
            # cache contents are interpolated before the call; passing
            # them as a second argument raises TypeError instead of
            # reporting the failure.
            self.fail("Unknown session id in cache: %r" % (cache,))