我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用bottle.run()。
def __init__(self, configuration): self.client_queue = multiprocessing.Queue(0) self.apply_patch() self.logger = self.init_logger() if ["debug", "html", "content_type", "notify", "ports"] not in configuration: raise PJFMissingArgument() if configuration.debug: print("[\033[92mINFO\033[0m] Starting HTTP ({0}) and HTTPS ({1}) built-in server...".format( configuration.ports["servers"]["HTTP_PORT"], configuration.ports["servers"]["HTTPS_PORT"] )) if not configuration.content_type: configuration.content_type = False if not configuration.content_type: configuration.content_type = "application/json" self.config = configuration self.json = PJFFactory(configuration) self.https = SSLWSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTPS_PORT"]) self.http = WSGIRefServer(host="0.0.0.0", port=self.config.ports["servers"]["HTTP_PORT"]) self.httpsd = multiprocessing.Process(target=run, kwargs={"server": self.https, "quiet": True}) self.httpd = multiprocessing.Process(target=run, kwargs={"server": self.http, "quiet": True}) if self.config.fuzz_web: self.request_checker = Thread(target=self.request_pool, args=()) self.logger.debug("[{0}] - PJFServer successfully initialized".format(time.strftime("%H:%M:%S")))
def main(): if len(sys.argv) > 1: print "Serves the demo. Needs bottle.py to run. Will serve via bjoern" print "if installed, otherwise via wsgi_ref. Reads its configuration " print "from config.ini." return # load configuration port = int(CONFIG.get('port', 9090)) staticdir = os.path.join(os.path.dirname(__file__), 'static') # start web server print "Starting web server on localhost:%d..." % port app.route('/static/:path#.+#', callback=lambda path: bottle.static_file(path, root=staticdir)) try: import bjoern except ImportError: bjoern = None bottle.run(app, host='localhost', port=port, server='bjoern' if bjoern else 'wsgiref')
def run(self, handler): from wsgiref.simple_server import make_server, WSGIRequestHandler import ssl if self.quiet: class QuietHandler(WSGIRequestHandler): def log_request(*args, **kw): pass self.options['handler_class'] = QuietHandler srv = make_server(self.host, self.port, handler, **self.options) srv.socket = ssl.wrap_socket( srv.socket, keyfile=self.keyfile, certfile=self.certfile, ca_certs=self.cafile, cert_reqs=ssl.CERT_REQUIRED, server_side=True) srv.serve_forever()
def db_create(): "Create database tables" try: print("- DROPPING DATABASE") r.db_drop('census').run(conn()) except: pass print("- CREATING DATABASE") r.db_create('census').run(conn()) print("- CREATING USER TABLE") r.table_create('users', primary_key='identity').run(conn()) print("- CREATING USER INDEXES") r.table('users').index_create("identity_check", [r.row["identity_type"], r.row["identity"]]).run(conn()) r.table('users').index_wait("identity_check").run(conn()) r.table_create('result_cache', primary_key='name').run(conn()) print("- CREATING QUESTION TABLES") for page in config['PAGES']: for question in page.questions: r.table_create("%s_answers" % question.name, primary_key='user_identity').run(conn())
def do_delete(keyword): keyword = keyword.replace("_", " ") new_file = "" with open("keywords.txt", "r") as f: file = f.readlines() f.close() for line in file: line = line.rstrip('\n\t ') line = line.strip('\t ') line = line.strip('\r') if line == keyword: continue else: new_file += line +"\n" new_file = new_file.rstrip('\n') with open("keywords.txt", "w") as f: f.write(new_file) return "Keyword was deleted successfully.\n" # run api
def run(self, handler): class QuietHandler(WSGIRequestHandler): def log_request(*args, **kw): pass def log_error(self, format, *args): pass self.options['handler_class'] = QuietHandler srv = make_server(self.host, self.port, handler, **self.options) srv.serve_forever()
def run(self, handler): class QuietHandler(WSGIRequestHandler): def log_request(*args, **kw): pass def log_error(self, format, *args): pass self.options['handler_class'] = QuietHandler srv = make_server(self.host, self.port, handler, **self.options) srv.socket = ssl.wrap_socket(srv.socket, certfile=CERT_PATH, server_side=True) srv.serve_forever()
def run(self): """ Start the servers """ route("/")(self.serve) if self.config.html: route("/<filepath:path>")(self.custom_html) if self.config.fuzz_web: self.request_checker.start() self.httpd.start() self.httpsd.start()
def __init__(self, host, port): # associate the session to our app local_app = SessionMiddleware(bottle.app()) bottle.run( app=local_app, server='cherrypy', host=host, port=port, reloader=True )
def execute(): app = request.forms['app'] user = request.forms['user'] cid = request.forms['cid'] desc = request.forms['desc'] np = request.forms['np'] appmod = pickle.loads(request.forms['appmod']) # remove the appmod key del request.forms['appmod'] appmod.write_params(request.forms, user) # if preprocess is set run the preprocessor try: if appmod.preprocess: run_params, _, _ = appmod.read_params(user, cid) base_dir = os.path.join(user_dir, user, app) process.preprocess(run_params, appmod.preprocess, base_dir) if appmod.preprocess == "terra.in": appmod.outfn = "out"+run_params['casenum']+".00" except: return template('error', err="There was an error with the preprocessor") # submit job to queue try: priority = db(users.user==user).select(users.priority).first().priority uid = users(user=user).id jid = sched.qsub(app, cid, uid, np, priority, desc) return str(jid) #redirect("http://localhost:"+str(config.port)+"/case?app="+str(app)+"&cid="+str(cid)+"&jid="+str(jid)) except OSError: return "ERROR: a problem occurred"
def main(): sched.poll() run(host='0.0.0.0', port=config.port+1, debug=False)
def run(self, handler): server = wsgiserver.CherryPyWSGIServer((self.host, self.port), handler) server.ssl_adapter = SecuredSSLServer(ssl_cert, ssl_key) try: server.start() finally: server.stop()
def main(): setup.setup_common() model.connect() if FLAGS.run_cron_in_background: _run_cron_in_background() bottle.run( # wsgiref is unstable with reloader. # See: https://github.com/bottlepy/bottle/issues/155 server='paste', port=FLAGS.port, host='0.0.0.0', reloader=True)
def start_listening(): logger.info("Running bottle app: quiet=%s, port=%s, host='0.0.0.0'" % (quiet_bottle, port)) bottle.run(app=app, quiet=quiet_bottle, port=port, host='0.0.0.0')
def web_server_worker(data_q, config): wsapp = WService(queue=data_q, config=config) wrouter = WRouter(wsapp) wrouter.route_server(config.server()) # Start SSL-wrapped bottle logging.info('Starting Webservice server (daemon) ') sslsrv = SSLWSGIRefServer(host=config.server()['web']['host'], port=config.server()['web']['port']) bottle.run(server=sslsrv, debug=config.server()['web']['debug'], quiet=config.server()['web']['quiet']) return
def web_client_worker(data_q, config): wsapp = WService(queue=data_q, config=config) wrouter = WRouter(wsapp) wrouter.route_client(config.client()) # Start SSL-wrapped bottle sslsrv = SSLWSGIRefServer(host=config.client()['web']['host'], port=config.client()['web']['port']) logging.info('Starting Webservice client (daemon) ') bottle.run(server=sslsrv, debug=config.client()['web']['debug'], quiet=config.client()['web']['quiet']) return
def main(): parser = argparse.ArgumentParser(description='') # @TODO fill in # server options parser.add_argument('-host', '--host', default="localhost", help='host for the server') parser.add_argument('-p', '--port', default=8080, help='port for the server') parser.add_argument('--debug', action='store_true', dest="debug", help='Debug mode (autoreloads the server)') parser.add_argument('--server', default="auto", help='Server backend to use (see http://bottlepy.org/docs/dev/deployment.html#switching-the-server-backend)') # elasticsearch options parser.add_argument('--es-host', default="localhost", help='host for the elasticsearch instance') parser.add_argument('--es-port', default=9200, help='port for the elasticsearch instance') parser.add_argument('--es-url-prefix', default='', help='Elasticsearch url prefix') parser.add_argument('--es-use-ssl', action='store_true', help='Use ssl to connect to elasticsearch') parser.add_argument('--es-index', default='charitysearch', help='index used to store charity data') parser.add_argument('--es-type', default='charity', help='type used to store charity data') args = parser.parse_args() app.config["es"] = Elasticsearch( host=args.es_host, port=args.es_port, url_prefix=args.es_url_prefix, use_ssl=args.es_use_ssl ) app.config["es_index"] = args.es_index app.config["es_type"] = args.es_type bottle.debug(args.debug) bottle.run(app, server=args.server, host=args.host, port=args.port, reloader=args.debug)
def local(reload, port): """run local app server, assumes into the account """ import logging from bottle import run from app import controller, app from c7n.resources import load_resources load_resources() print("Loaded resources definitions") logging.basicConfig(level=logging.DEBUG) logging.getLogger('botocore').setLevel(logging.WARNING) if controller.db.provision(): print("Table Created") run(app, reloader=reload, port=port)
def delete(entry_id): entry = GuestbookEntry.get(entry_id) if not entry: return "<h1>Entry not found.</h1>" entry.delete() return redirect("/") # [END delete-route] # [START run]
def do_login(): username = request.forms.username password = request.forms.password user = User.login(username, password) if not user: return {"error": "Invalid credentials."} create_session(user) return redirect("/") # [END login] # [START run]
def run(self): print("shutter dash")
def __init__(self, bot, **kwargs): super().__init__(**kwargs) self.bot = bot self.file = os.path.join('data', 'ricedb.json') datadir = os.path.dirname(self.file) try: with open(self.file, 'r') as fd: self.update(json.load(fd)) except FileNotFoundError: # Database file itself doesn't need to exist on first run, it will be created on first write. if not os.path.exists(datadir): os.mkdir(datadir) self.bot.log.debug('Created {0}/ directory'.format(datadir)) try: self.config = self.bot.config[__name__] if not self.config.get('enable_http_server'): return host, port = self.config['http_host'], int(self.config['http_port']) except KeyError: host, port = '127.0.0.1', 8080 bottle.hook('before_request')(self.__strip_path) bottle.route('/')(self.__http_index) bottle.route('/<user>')(self.__http_index) bottle.route('/<user>/<key>')(self.__http_index) bottle_thread = threading.Thread( target=bottle.run, kwargs={'quiet': True, 'host': host, 'port': port}, name='{0} HTTP server'.format(__name__), daemon=True ) bottle_thread.start() self.bot.log.info('{0} started on http://{1}:{2}'.format(bottle_thread.name, host, port))
def run(): bottle.debug(not PROD) bottle.run( app=app, host='localhost', port='2081', reloader=not PROD )
def start(): #http://stackoverflow.com/questions/13760109/ec2-bottle-py-connection app.run(host='0.0.0.0', port=18080) logging.info('completed %.3f' % time.clock())
def db_upgrade(): "Create non-existent question tables; use if questions are updated" tables = r.table_list().run(conn()) print(tables) for page in config['PAGES']: for question in page.questions: name = "%s_answers" % question.name if name not in tables: print("CREATING TABLE %s" % name) r.table_create(name, primary_key = 'user_identity').run(conn())
def followup(dry_run): if dry_run: print("DRY RUN -- Not sending actual emails") config['DEBUG'] = True "Run after changing the question schema; sends out email updates. Recommended at most once per month." emails = [] usrs = [] for user in r.table('users').run(conn()): if 'subscribed' not in user or user['subscribed'] == False: # Ignore anonymous or unsubscribed users continue # Get email for email/xenforo accounts usrs.append(user) emails.append(user['email'] if 'email' in user else user['identity']) print("Sending emails to the following %s users", (len(emails), emails)) desc = sys.stdin.read() for user in usrs: users.mail(user, 'Questions have been updated', """You are receiving this email because the questions have been updated. Please consider logging in and updating your response.<br> <br> Update notes:<br> %s<br> """ % desc.replace('\n', '<br>'), """You are receiving this email because the questions have been updated. Please conisder logging in and updating your response\n \n Update notes:%s\n""" % desc, use_flash = False)
def runserver(debug): "Run HTTP server" config['DEBUG'] = debug if debug == True: config['SITE_NAME'] = '%s debug' % (config['SITE_NAME']) config['DEBUG'] = True if 'ondebug' in config: config['ondebug'](config) run(app, host='0.0.0.0', port=8080, debug = debug, reloader = True)
def run(self): try: while True: buf = self.converter.stdout.read(512) if buf: self.websocket_server.manager.broadcast(buf, binary=True) elif self.converter.poll() is not None: break finally: self.converter.stdout.close()
def http_server(): bottle.run(app=get(), host='0.0.0.0', port=8000, server="gevent", debug=True)
def run_webserver(): app = bottle.app() app.catchall = False app = Sentry(app, raven.Client(SENTRY_DSN)) bottle.run(app, server='paste', host='0.0.0.0', port=3350)
def w2v(): subWordPython = (request.forms.get('submittedWord')).lower() #get IP address ipAddress = request.environ.get('REMOTE_ADDR') # logger.info('word: %s ip: %s' % (subWordPython, ipAddress)) #add subWordPython + ip address to user_words table every time. conn = sqlite3.connect('/w2v/hp_w2v.db') c = conn.cursor() c.execute("INSERT INTO user_words (submitted_word, ip) VALUES (?, ?)", (subWordPython, ipAddress)) conn.commit() conn.close() #run w2v, if word is not in HP corpus, return keyerror.tpl try: wordResults = w2v_results(subWordPython) except KeyError as err: errorPage = template('/w2v/keyerror.tpl', word = subWordPython) return (errorPage) #return results to user output = template('/w2v/results.tpl', rows = wordResults, word = subWordPython) return (output) #call BottleDaemon script and launch a daemon in the background
def theidfobject(idfindex, keyindex, objindex): idf, edges = eppystuff.an_idfedges(idfindex) objkeys = idf_helpers.idfobjectkeys(idf) objkey = objkeys[keyindex] idfobjects = idf.idfobjects[objkey] idfobject = idfobjects[objindex] fields = idfobject.objls values = idfobject.obj valuesfields = [(value, field) for value, field in zip(values, fields)] urls = ["%s/%s" % (objindex, field) for field in fields] linktags = ['<a href=%s>%s %s %s</a>' % (url, i, abullet, value,) for i, (url, value) in enumerate(zip(urls, values))] iddinfos = ['<a href=%s/iddinfo>%s</a>' % (url, '?') for url in urls] lines = ["%s %s %s %s %s" % (linktag, aspace, field, abullet, iddinfo) for linktag, field, iddinfo in zip(linktags, fields, iddinfos)] # --- lines.pop(0) url = 'showlinks/%s' % (objindex, ) showlinkslink = '<a href=%s>show links to other objects</a>' % (url, ) url = 'nodementions/%s' % (objindex, ) showmentionslink = '<a href=%s>show node connections</a> <hr>' % (url, ) url = 'objfunctions/%s' % (objindex, ) objfunctionslink = '<hr> <a href=%s>run functions of this object</a>' % (url, ) lines.append(objfunctionslink) url = 'refferingobjs/%s' % (objindex, ) refferingobjslink = '<a href=%s>Show objects that refer to this object</a> ->this runs slow :-(' % (url, ) lines.append(refferingobjslink) url = 'mentioningobjs/%s' % (objindex, ) mentioningobjslink = '<a href=%s>Show objects that mention this object</a>' % (url, ) lines.append(mentioningobjslink) # - hvac links url = 'hvacprevnext/%s' % (objindex, ) hvacprevnextlink = 'HVAC object in loop -> <a href=%s>prev objects & next objects</a>' % (url, ) lines.append(hvacprevnextlink) # - heading = '%s <a href=%s/key/iddinfo> %s</a>' % (objkey, objindex, '?') headingdoc = '<a href="%s" target="_blank">docs</a>' % (getdoclink(objkey.upper())) headingwithdoc = '%s (%s)' % (heading, headingdoc) lineswithtitle = [headingwithdoc, "=" * len(objkey)] + lines lineswithtitle.insert(0, showmentionslink) lineswithtitle.insert(0, showlinkslink) lineswithtitle = putfilenameontop(idf, lineswithtitle) html = '<br>'.join(lineswithtitle) return html
def _start_server(self): class BottleServerAdapter(bottle.ServerAdapter): proxy = self def close_session(self): self.proxy.ctx.model.log._session.remove() def run(self, app): class Server(wsgiref.simple_server.WSGIServer): allow_reuse_address = True bottle_server = self def handle_error(self, request, client_address): pass def serve_forever(self, poll_interval=0.5): try: wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval) finally: # Once shutdown is called, we need to close the session. # If the session is not closed properly, it might raise warnings, # or even lock the database. self.bottle_server.close_session() class Handler(wsgiref.simple_server.WSGIRequestHandler): def address_string(self): return self.client_address[0] def log_request(*args, **kwargs): # pylint: disable=no-method-argument if not self.quiet: return wsgiref.simple_server.WSGIRequestHandler.log_request(*args, **kwargs) server = wsgiref.simple_server.make_server( host=self.host, port=self.port, app=app, server_class=Server, handler_class=Handler) self.proxy.server = server self.proxy._started.put(True) server.serve_forever(poll_interval=0.1) def serve(): # Since task is a thread_local object, we need to patch it inside the server thread. self._ctx_patcher(self.ctx) bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) bottle.run( app=bottle_app, host='localhost', port=self.port, quiet=True, server=BottleServerAdapter) thread = threading.Thread(target=serve) thread.daemon = True thread.start() return thread
def daemon_run(host="localhost", port="8080", pidfile=None, logfile=None, keyfile='priv.key', certfile='pub.crt', cafile='ca.crt', action="start"): """ Get the bottle 'run' function running in the background as a daemonized process. :host: The host interface to listen for connections on. Enter 0.0.0.0 to listen on all interfaces. Defaults to localhost. :port: The host port to listen on. Defaults to 8080. :pidfile: The file to use as the process id file. Defaults to "bottle.pid" :logfile: The file to log stdout and stderr from bottle to. Defaults to "bottle.log" """ if pidfile is None: pidfile = os.path.join( os.getcwd(), "bottle.pid" ) if logfile is None: logfile = os.path.join( os.getcwd(), "bottle.log" ) if action == "start": log = open(logfile, "w+") context = daemon.DaemonContext( pidfile=__locked_pidfile(pidfile), stdout=log, stderr=log ) with context: # bottle.run(host=host, port=port) srv = SSLWSGIRefServer(host=host, port=port, keyfile=keyfile, certfile=certfile, cafile=cafile) bottle.run(server=srv) else: with open(pidfile, "r") as p: pid = int(p.read()) os.kill(pid, signal.SIGTERM)
def main(): starttime = time.time() #parser = argparse.ArgumentParser(description='') #parser.add_argument("action", choices=["start", "stop"]) #parser.add_argument("--basemodel", dest="model", help="base model path") #parser.add_argument("-t", "--test", action="store_true", # help="run a test instead of the server") #parser.add_argument("--log", action="store", dest="loglevel", default="WARNING", help="Log level") #options = parser.parse_args() numeric_level = getattr(logging, "DEBUG", None) #if not isinstance(numeric_level, int): # raise ValueError('Invalid log level: %s' % options.loglevel) #while len(logging.root.handlers) > 0: # logging.root.removeHandler(logging.root.handlers[-1]) logging_format = '%(asctime)s %(levelname)s %(filename)s:%(lineno)s:%(funcName)s %(message)s' logging.basicConfig(level=numeric_level, format=logging_format) logging.getLogger().setLevel(numeric_level) logging.debug("Initializing the server...") server = IBENT(entities=[("mirtex_train_mirna_sner", "stanfordner", "mirna"), ("chemdner_train_all", "stanfordner", "chemical"), ("banner", "banner", "gene"), ("genia_sample_gene", "stanfordner", "gene")], relations=[("all_ddi_train_slk", "jsre", "ddi"), ("mil_classifier4k", "smil", "mirna-gene")]) logging.debug("done.") # Test server bottle.route("/ibent/status")(server.hello) # Fetch an existing document bottle.route("/ibent/<doctag>")(server.get_document) # Create a new document bottle.route("/ibent/<doctag>", method='POST')(server.new_document) # Get new entity annotations i.e. run a classifier again bottle.route("/ibent/entities/<doctag>/<annotator>", method='POST')(server.run_entity_annotator) # Get entity annotations i.e. fetch from the database bottle.route("/ibent/entities/<doctag>/<annotator>")(server.get_annotations) # Get new entity annotations i.e. run a classifier again bottle.route("/ibent/relations/<doctag>/<annotator>", method='POST')(server.run_relation_annotator) # Get new entity annotations i.e. run a classifier again bottle.route("/ibent/relations/<doctag>/<annotator>")(server.get_relations) # Get entity annotations i.e. fetch from the database #bottle.route("/ibent/entities/<doctag>/<annotator>")(server.get_annotations) #bottle.route("/iice/chemical/<text>/<modeltype>", method='POST')(server.process) #bottle.route("/ibent/interactions", method='POST')(server.get_relations) #daemon_run(host='10.10.4.63', port=8080, logfile="server.log", pidfile="server.pid") bottle.run(host=config.host_ip, port=8080, DEBUG=True)