我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用twisted.web.server.start()。
def _cli_parse(args):  # pragma: no coverage
    """Parse command-line arguments with :mod:`argparse`.

    :param args: Sequence of argument strings; handed to
        ``ArgumentParser.parse_args`` unchanged.
    :return: ``(cli_args, parser)`` -- the parsed namespace and the parser
        that produced it.
    """
    from argparse import ArgumentParser

    # argparse expands the usage template with ``usage % {'prog': ...}`` and
    # prepends "usage: " itself, so the template must use ``%(prog)s`` and
    # must not repeat the prefix.
    parser = ArgumentParser(usage="%(prog)s [options] package.module:app")
    opt = parser.add_argument
    opt('app', help='WSGI app entry point.')
    opt("--version", action="store_true", help="show version number.")
    opt("-b", "--bind", metavar="ADDRESS", help="bind socket to ADDRESS.")
    opt("-s", "--server", default='wsgiref', help="use SERVER as backend.")
    opt("-p", "--plugin", action="append", help="install additional plugin/s.")
    opt("-c", "--conf", action="append", metavar="FILE",
        help="load config values from FILE.")
    opt("-C", "--param", action="append", metavar="NAME=VALUE",
        help="override config values.")
    opt("--debug", action="store_true", help="start server in debug mode.")
    opt("--reload", action="store_true", help="auto-reload on file changes.")

    cli_args = parser.parse_args(args)
    return cli_args, parser
def _itertokens(self, rule):
    """Yield ``(name, filter, conf)`` tokens for a route rule.

    Static text is yielded as ``(text, None, None)``. Each wildcard found by
    ``self.rule_syntax`` is yielded as ``(name, filter or 'default',
    conf or None)``. A deprecation notice is issued through ``depr`` whenever
    the third capture group is set (the old ``:name`` syntax).
    """
    cursor = 0
    static = ''
    for found in self.rule_syntax.finditer(rule):
        static += rule[cursor:found.start()]
        groups = found.groups()
        leading = groups[0]
        old_style = groups[2] is not None
        if old_style:
            depr(0, 13, "Use of old route syntax.",
                 "Use <name> instead of :name in routes.")
        if len(leading) % 2 == 1:
            # Escaped wildcard: keep the match (minus the first group's
            # characters) as literal text and keep scanning.
            static += found.group(0)[len(leading):]
            cursor = found.end()
            continue
        if static:
            yield static, None, None
        if old_style:
            name, filtr, conf = groups[1:4]
        else:
            name, filtr, conf = groups[4:7]
        yield name, filtr or 'default', conf or None
        cursor = found.end()
        static = ''
    # cursor never exceeds len(rule), so a trailing static token is always
    # emitted, even when it is the empty string.
    if cursor <= len(rule) or static:
        yield static + rule[cursor:], None, None
def parse_range_header(header, maxlen=0):
    """Yield ``(start, end)`` pairs parsed from an HTTP ``Range`` header.

    Only the ``bytes=`` unit is understood. ``end`` is exclusive. Ranges that
    are malformed or cannot be satisfied within ``maxlen`` are skipped.
    """
    if not header or header[:6] != 'bytes=':
        return
    for spec in header[6:].split(','):
        if '-' not in spec:
            continue
        first, last = spec.split('-', 1)
        try:
            if not first:
                # "bytes=-100": the final 100 bytes
                lo, hi = max(0, maxlen - int(last)), maxlen
            elif not last:
                # "bytes=100-": everything from offset 100 onwards
                lo, hi = int(first), maxlen
            else:
                # "bytes=100-200": offsets 100..200 inclusive
                lo, hi = int(first), min(int(last) + 1, maxlen)
            if 0 <= lo < hi <= maxlen:
                yield lo, hi
        except ValueError:
            # Non-numeric bounds: ignore this range and move on.
            pass


#: Header tokenizer used by _parse_http_header()
def run(self, handler):  # pragma: no cover
    """Serve ``handler`` with ``cherrypy.wsgiserver.CherryPyWSGIServer``.

    ``self.options`` is updated in place with the bind address and the WSGI
    app and is then passed to the server constructor as keyword arguments.
    Truthy ``certfile`` / ``keyfile`` options are removed first and applied
    to the server as attributes instead. The server is always stopped when
    ``start()`` returns or raises.
    """
    from cherrypy import wsgiserver

    self.options.update(bind_addr=(self.host, self.port), wsgi_app=handler)

    # (option key, server attribute) pairs handled outside the constructor.
    ssl_attributes = []
    for option, attribute in (('certfile', 'ssl_certificate'),
                              ('keyfile', 'ssl_private_key')):
        value = self.options.get(option)
        if value:
            del self.options[option]
            ssl_attributes.append((attribute, value))

    server = wsgiserver.CherryPyWSGIServer(**self.options)
    for attribute, value in ssl_attributes:
        setattr(server, attribute, value)

    try:
        server.start()
    finally:
        server.stop()
def translate(self):
    """Translate ``self.source`` into code and return it as one string.

    The source is scanned with ``self.re_split``. Text before each match is
    buffered; a match whose first group is set is an escaped line and is
    buffered as text too; any other match flushes the text buffer and hands
    the remaining source to ``self.read_code``, which returns how many
    characters it consumed.

    :raises RuntimeError: if ``self.offset`` is already non-zero.
    """
    if self.offset:
        raise RuntimeError('Parser is a one time instance.')

    while True:
        found = self.re_split.search(self.source, pos=self.offset)
        if not found:
            break

        self.text_buffer.append(self.source[self.offset:found.start()])
        self.offset = found.end()

        if found.group(1):
            # Escape syntax: keep the text in front of the escape marker,
            # drop the marker itself and emit the rest of the line as text.
            line, sep, _ = self.source[self.offset:].partition('\n')
            lead = self.source[found.start():found.start(1)]
            self.text_buffer.append(lead + found.group(2) + line + sep)
            self.offset += len(line + sep)
            continue

        self.flush_text()
        is_block = bool(found.group(4))
        self.offset += self.read_code(self.source[self.offset:],
                                      multiline=is_block)

    # Whatever follows the last match is plain text.
    self.text_buffer.append(self.source[self.offset:])
    self.flush_text()
    return ''.join(self.code_buffer)
def flush_text(self):
    """Turn the buffered text into one ``_printlist((...))`` code line.

    The text buffer is emptied. Literal text is emitted as ``repr`` strings,
    one per source line, joined by a backslash-newline plus the current
    indent. Each ``self.re_inl`` match is replaced by the result of
    ``self.process_inline``. A final line ending in two backslashes and a
    line break has that ending removed. Nothing is written when the buffer
    is empty.
    """
    text = ''.join(self.text_buffer)
    del self.text_buffer[:]
    if not text:
        return

    joiner = '\\\n' + ' ' * self.indent
    chunks = []
    cursor = 0
    for inline in self.re_inl.finditer(text):
        literal = text[cursor:inline.start()]
        cursor = inline.end()
        if literal:
            rendered = joiner.join(
                repr(piece) for piece in literal.splitlines(True))
            if literal.endswith('\n'):
                rendered += joiner
            chunks.append(rendered)
        chunks.append(self.process_inline(inline.group(1).strip()))

    if cursor < len(text):
        tail_lines = text[cursor:].splitlines(True)
        final = tail_lines[-1]
        # Two backslashes before the line break suppress that line break.
        for marker in ('\\\\\n', '\\\\\r\n'):
            if final.endswith(marker):
                tail_lines[-1] = final[:-len(marker)]
                break
        chunks.append(joiner.join(repr(piece) for piece in tail_lines))

    code = '_printlist((%s,))' % ', '.join(chunks)
    self.lineno += code.count('\n') + 1
    self.write_code(code)
def _itertokens(self, rule):
    """Tokenize a route rule into ``(name, filter, conf)`` tuples.

    Literal stretches come out as ``(text, None, None)``; wildcards matched
    by ``self.rule_syntax`` come out as ``(name, filter or 'default',
    conf or None)``.
    """
    pos = 0
    pending = ''
    for m in self.rule_syntax.finditer(rule):
        pending = pending + rule[pos:m.start()]
        fields = m.groups()
        n_escapes = len(fields[0])
        pos = m.end()
        if n_escapes % 2:
            # Escaped wildcard: it stays literal, minus the first group.
            pending += m.group(0)[n_escapes:]
            continue
        if pending:
            yield pending, None, None
            pending = ''
        # Groups 1-3 belong to one syntax alternative, groups 4-6 to the
        # other; the third group tells which alternative matched.
        if fields[2] is not None:
            name, filtr, conf = fields[1:4]
        else:
            name, filtr, conf = fields[4:7]
        yield name, filtr or 'default', conf or None
    # pos is at most len(rule), so this final token is always produced.
    if pos <= len(rule) or pending:
        yield pending + rule[pos:], None, None
def parse_range_header(header, maxlen=0):
    """Generate ``(start, end)`` byte ranges from an HTTP ``Range`` header.

    ``end`` is non-inclusive. Headers that do not start with ``bytes=``
    produce nothing, and individual ranges that are malformed or fall
    outside ``0..maxlen`` are dropped.
    """
    if header and header[:6] == 'bytes=':
        candidates = [spec.split('-', 1)
                      for spec in header[6:].split(',') if '-' in spec]
        for first, last in candidates:
            try:
                if first and last:
                    # explicit range, upper bound inclusive on the wire
                    begin, stop = int(first), min(int(last) + 1, maxlen)
                elif first:
                    # open-ended range: from ``first`` to the end
                    begin, stop = int(first), maxlen
                else:
                    # suffix range: the last ``last`` bytes
                    begin, stop = max(0, maxlen - int(last)), maxlen
                if 0 <= begin < stop <= maxlen:
                    yield begin, stop
            except ValueError:
                continue
def run(self, handler):  # pragma: no cover
    """Serve ``handler`` through fapws3's ``evwsgi`` loop.

    Starts ``evwsgi`` on ``self.host`` / ``self.port``, registers a wrapper
    around ``handler`` that sets ``wsgi.multiprocess`` to ``False`` in the
    environ, and then runs the loop.
    """
    import fapws._evwsgi as evwsgi
    from fapws import base, config

    # fapws3 silently changed its API in 0.5: the port is passed as a string
    # for versions above 0.4.
    wants_string_port = float(config.SERVER_IDENT[-2:]) > 0.4
    port = str(self.port) if wants_string_port else self.port
    evwsgi.start(self.host, port)

    # fapws3 never releases the GIL. Complain upstream. I tried. No luck.
    if not self.quiet and 'BOTTLE_CHILD' in os.environ:
        _stderr("WARNING: Auto-reloading does not work with Fapws3.\n")
        _stderr(" (Fapws3 breaks python thread support)\n")

    evwsgi.set_base_module(base)

    def single_process_app(environ, start_response):
        environ['wsgi.multiprocess'] = False
        return handler(environ, start_response)

    evwsgi.wsgi_cb(('', single_process_app))
    evwsgi.run()
def translate(self):
    """Translate ``self.source`` into code and return it as one string.

    The unread part of the source is searched with ``self.re_split``. Text
    in front of a match is buffered. What happens next depends on the match:

    * group 1 set (new escape syntax): the rest of the line is buffered as
      text, prefixed with groups 2 and 5;
    * group 5 set (old escape syntax): a deprecation notice is issued via
      ``depr`` and the rest of the line is buffered as text, prefixed with
      group 2;
    * otherwise the text buffer is flushed and ``self.read_code`` is called
      (``multiline`` is true when group 4 matched).

    :raises RuntimeError: if ``self.offset`` is already non-zero.
    """
    if self.offset:
        raise RuntimeError('Parser is a one time instance.')
    while True:
        # Match positions are relative to the slice, hence the offset maths.
        m = self.re_split.search(self.source[self.offset:])
        if not m:
            break
        text = self.source[self.offset:self.offset + m.start()]
        self.text_buffer.append(text)
        self.offset += m.end()
        if m.group(1):  # New escape syntax
            line, sep, _ = self.source[self.offset:].partition('\n')
            self.text_buffer.append(m.group(2) + m.group(5) + line + sep)
            # Advance past exactly the consumed line. Skipping one extra
            # character here would drop the first character of the next
            # line from the output.
            self.offset += len(line + sep)
            continue
        elif m.group(5):  # Old escape syntax
            depr('Escape code lines with a backslash.')  # 0.12
            line, sep, _ = self.source[self.offset:].partition('\n')
            self.text_buffer.append(m.group(2) + line + sep)
            self.offset += len(line + sep)
            continue
        self.flush_text()
        self.read_code(multiline=bool(m.group(4)))
    # Everything after the last match is plain text.
    self.text_buffer.append(self.source[self.offset:])
    self.flush_text()
    return ''.join(self.code_buffer)
def flush_text(self):
    """Emit the buffered text as a single ``_printlist((...))`` statement.

    Clears ``self.text_buffer``. If the joined text is empty nothing is
    written. Otherwise literal text is rendered line by line with ``repr``,
    inline expressions found by ``self.re_inl`` are rendered through
    ``self.process_inline``, ``self.lineno`` is advanced by the number of
    lines in the generated code and the code goes to ``self.write_code``.
    """
    text = ''.join(self.text_buffer)
    del self.text_buffer[:]
    if not text:
        return

    # Generated line continuation: backslash, newline, current indent.
    continuation = '\\\n' + ' ' * self.indent

    def quote(lines):
        return continuation.join(map(repr, lines))

    pieces = []
    consumed = 0
    for hit in self.re_inl.finditer(text):
        before = text[consumed:hit.start()]
        consumed = hit.end()
        if before:
            pieces.append(quote(before.splitlines(True)))
            if before.endswith('\n'):
                pieces[-1] += continuation
        pieces.append(self.process_inline(hit.group(1).strip()))

    if consumed < len(text):
        remainder = text[consumed:].splitlines(True)
        last = remainder[-1]
        # A trailing double backslash swallows the line break after it.
        if last.endswith('\\\\\n'):
            remainder[-1] = last[:-3]
        elif last.endswith('\\\\\r\n'):
            remainder[-1] = last[:-4]
        pieces.append(quote(remainder))

    code = '_printlist((%s,))' % ', '.join(pieces)
    self.lineno += code.count('\n') + 1
    self.write_code(code)
def parse_range_header(header, maxlen=0):
    """Parse an HTTP ``Range`` header into ``(start, end)`` tuples.

    The generator yields one tuple per satisfiable ``bytes=`` range, with
    ``end`` being exclusive. Unparseable or unsatisfiable ranges are
    silently skipped.
    """
    if not header:
        return
    unit, payload = header[:6], header[6:]
    if unit != 'bytes=':
        return
    for item in payload.split(','):
        head, dash, tail = item.partition('-')
        if not dash:
            continue
        try:
            if not head:
                # suffix form ("-N"): the trailing N bytes
                start = max(0, maxlen - int(tail))
                end = maxlen
            elif not tail:
                # open form ("N-"): from offset N to the end
                start = int(head)
                end = maxlen
            else:
                # closed form ("A-B"): B is inclusive in the header
                start = int(head)
                end = min(int(tail) + 1, maxlen)
            if 0 <= start < end <= maxlen:
                yield start, end
        except ValueError:
            pass
def translate(self):
    """Translate ``self.source`` into code and return it as one string.

    Repeatedly searches the unread remainder of the source with
    ``self.re_split``. Text ahead of a match is buffered. A match with
    group 1 set is an escaped line and is buffered as text; any other match
    flushes the text buffer and calls ``self.read_code``.

    :raises RuntimeError: if ``self.offset`` is already non-zero.
    """
    if self.offset:
        raise RuntimeError('Parser is a one time instance.')

    while True:
        base = self.offset
        found = self.re_split.search(self.source[base:])
        if found is None:
            break

        # Match positions are relative to the slice that started at ``base``.
        self.text_buffer.append(self.source[base:base + found.start()])
        self.offset = base + found.end()

        if not found.group(1):
            self.flush_text()
            self.read_code(multiline=bool(found.group(4)))
            continue

        # Escape syntax: keep what precedes the escape marker, drop the
        # marker, and pass the remainder of the line through as text.
        line, sep, _ = self.source[self.offset:].partition('\n')
        lead = self.source[base + found.start():base + found.start(1)]
        self.text_buffer.append(lead + found.group(2) + line + sep)
        self.offset += len(line + sep)

    self.text_buffer.append(self.source[self.offset:])
    self.flush_text()
    return ''.join(self.code_buffer)
def parse_rule(self, rule):
    """Parse a rule into a stream of ``(name, filter, conf)`` tokens.

    A token whose filter is ``None`` carries a static part of the rule in
    ``name``. Wildcards without an explicit filter get
    ``self.default_filter``; an empty ``conf`` is reported as ``None``.
    """
    cursor, literal = 0, ''
    for found in self.rule_syntax.finditer(rule):
        literal += rule[cursor:found.start()]
        groups = found.groups()
        escapes = len(groups[0])
        if escapes % 2:
            # Escaped wildcard: treat the match (minus the first group) as
            # literal text.
            literal += found.group(0)[escapes:]
            cursor = found.end()
            continue
        if literal:
            yield literal, None, None
        if groups[2] is not None:
            name, filtr, conf = groups[1:4]
        else:
            name, filtr, conf = groups[4:7]
        yield name, filtr or self.default_filter, conf or None
        cursor, literal = found.end(), ''
    # cursor cannot exceed len(rule), so a final static token is always
    # produced (possibly with empty text).
    if cursor <= len(rule) or literal:
        yield literal + rule[cursor:], None, None
def _cli_parse(args):
    """Parse command-line arguments with :mod:`optparse`.

    :param args: Full argument list; the first element is skipped before
        parsing.
    :return: ``(opts, args, parser)`` -- parsed options, the remaining
        positional arguments and the parser itself.
    """
    from optparse import OptionParser

    option_specs = (
        (("--version",),
         dict(action="store_true", help="show version number.")),
        (("-b", "--bind"),
         dict(metavar="ADDRESS", help="bind socket to ADDRESS.")),
        (("-s", "--server"),
         dict(default='wsgiref', help="use SERVER as backend.")),
        (("-p", "--plugin"),
         dict(action="append", help="install additional plugin/s.")),
        (("-c", "--conf"),
         dict(action="append", metavar="FILE",
              help="load config values from FILE.")),
        (("-C", "--param"),
         dict(action="append", metavar="NAME=VALUE",
              help="override config values.")),
        (("--debug",),
         dict(action="store_true", help="start server in debug mode.")),
        (("--reload",),
         dict(action="store_true", help="auto-reload on file changes.")),
    )

    parser = OptionParser(usage="usage: %prog [options] package.module:app")
    for flags, settings in option_specs:
        parser.add_option(*flags, **settings)

    opts, remaining = parser.parse_args(args[1:])
    return opts, remaining, parser