我们从Python开源项目中,提取了以下35个代码示例,用于说明如何使用app.app()。
def __init__(self):
    """Open the MariaDB connection and keep a cursor on the instance.

    The user name, password and database name are read from ``Globals.DB``
    under the keys ``username``, ``password`` and ``dbname``.
    """
    settings = Globals.DB
    self.db_conn = mariadb.connect(
        user=settings['username'],
        password=settings['password'],
        database=settings['dbname'],
    )
    self.CURSOR = self.db_conn.cursor()
def viewLog(mode, data=None):
    """Write one info-level line to ``app.logger`` for the given event mode.

    The original docstring was mis-encoded; this description is derived
    from the code.

    Args:
        mode: Event name. Recognised values are "message", "keyboard",
            "add", "block", "exit" and "fail". Any other value logs nothing.
        data: Event payload. "message" reads the keys ``user_key``,
            ``type`` and ``content``; "add" reads ``user_key``; "block" and
            "exit" format ``data`` itself; "keyboard" and "fail" ignore it.
    """
    # Compare by value. ``is`` tests object identity and only matched string
    # literals thanks to interning, so a mode string built at runtime could
    # silently fall through every branch.
    if mode == "message":
        app.logger.info("[message] user_key : {}, type : {}, content : {}".format(
            data["user_key"], data["type"], data["content"]))
    elif mode == "keyboard":
        app.logger.info("[keyboard] call home keyboard")
    elif mode == "add":
        app.logger.info("[join] user_key : {}".format(data["user_key"]))
    elif mode == "block":
        app.logger.info("[block] user_key : {}".format(data))
    elif mode == "exit":
        app.logger.info("[exit] user_key : {}".format(data))
    elif mode == "fail":
        app.logger.info("[fail] request process fail")
def token_check(func):
    """Decorator that runs ``func`` only when the request carries a usable token.

    The token is taken from the ``Authorization`` request header and decoded
    with ``util.parser_token``. A ``base_result(failed, ...)`` response is
    returned instead of calling ``func`` when:

    * the header is missing or empty ('no token'),
    * the token decodes to ``None`` ('no token'),
    * the decoded payload is otherwise falsy ('illegal token'),
    * the payload's ``timestamp`` is more than ``configs.app.exprire``
      older than the current time ('expire token').
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Read the header once instead of ``.get`` followed by indexing.
        token = request.headers.get('Authorization')
        if not token:
            return base_result(failed, msg='no token',
                               error_code=error_code.token_no_exist_error)

        payload = util.parser_token(token)
        if payload is None:
            return base_result(failed, msg='no token',
                               error_code=error_code.token_no_exist_error)
        # Validate the payload BEFORE reading its attributes; the original
        # performed this check only after ``payload.identify`` etc. had
        # already been accessed.
        if not payload:
            return base_result(failed, msg='illegal token',
                               error_code=error_code.token_illegal_error)

        print('[' + payload.identify + ']' + payload.username + ' auth in '
              + str(datetime.fromtimestamp(payload.timestamp)))

        nowtime = int(datetime.now().timestamp())
        if nowtime - payload.timestamp > configs.app.exprire:
            return base_result(failed, msg='expire token',
                               error_code=error_code.token_expire_error)
        return func(*args, **kwargs)

    return wrapper
def list_routes():
    """Print every URL rule registered on ``app.url_map``, sorted, one per line.

    Each line is tab-separated: the rule's methods (comma-joined, padded to
    20 columns), the rule pattern, and the endpoint name (padded to 50
    columns). The line is URL-unquoted before printing.
    """
    # ``unquote`` lives in ``urllib.parse`` on Python 3 and in ``urllib`` on
    # Python 2; the original ``urllib.unquote`` / ``print line`` only ran on 2.
    try:
        from urllib.parse import unquote
    except ImportError:
        from urllib import unquote

    lines = sorted(
        unquote("{:20s}\t{}\t{:50s}".format(
            ','.join(rule.methods), rule.rule, rule.endpoint))
        for rule in app.url_map.iter_rules()
    )
    for line in lines:
        # Single-argument call form prints identically on Python 2 and 3.
        print(line)
def make_shell_context():
    """Return the names exposed to the interactive shell (only ``app``)."""
    context = {'app': app}
    return context
def __init__(self):
    """Bind a ``MySQL`` extension to ``app`` and keep a connection and cursor.

    The connection is stored as ``self.conn`` and a cursor created from it
    as ``self.cursor``.
    """
    extension = MySQL()
    extension.init_app(app)
    connection = extension.connect()
    self.conn = connection
    self.cursor = connection.cursor()
def make_shell_context():
    """Return the shell namespace: ``app``, ``db`` and the schedule models.

    The models are imported inside the function, as in the original, so
    importing this module does not pull in ``app.schedule.models``.
    """
    from app.schedule.models import (
        CrontabSchedule,
        IntervalSchedule,
        ScheduleTask,
        ScheduleMeta,
        ScheduleInfo,
    )

    return {
        'app': app,
        'db': db,
        'CrontabSchedule': CrontabSchedule,
        'IntervalSchedule': IntervalSchedule,
        'ScheduleTask': ScheduleTask,
        'ScheduleMeta': ScheduleMeta,
        'ScheduleInfo': ScheduleInfo,
    }
def make_shell_context():
    """Return the shell namespace containing ``db``, ``PasteFile`` and ``app``."""
    return dict(db=db, PasteFile=PasteFile, app=app)
def main():
    """Parse ``-p/--port`` (int, default 9000) and serve ``app`` forever.

    The server is a ``WSGIServer`` bound to all interfaces (empty host) on
    the chosen port.
    """
    arg_parser = argparse.ArgumentParser()
    arg_parser.add_argument(
        '-p', '--port',
        help='server port',
        type=int,
        default=9000,
    )
    port = arg_parser.parse_args().port

    listen_address = ('', port)
    WSGIServer(listen_address, app).serve_forever()
def main():
    """Serve ``app`` through an ``HTTPServer`` on the ``--port`` option.

    The ``port`` option (int, default 9000) is registered with ``define``
    and populated by ``parse_command_line``; the call then blocks in the
    IOLoop.
    """
    define('port', default=9000, type=int, help='Port on which to listen.')
    parse_command_line()

    container = WSGIContainer(app)
    server = HTTPServer(container)
    server.listen(options.port)

    loop = IOLoop.instance()
    loop.start()
def create_app(self):
    """Load the testing configuration into ``app`` and return it.

    :return: the module-level ``app`` after
        ``app.config.TestingConfig`` has been applied.
    """
    testing_config = 'app.config.TestingConfig'
    app.config.from_object(testing_config)
    return app
def main():
    """Announce the configured port, then serve ``app`` forever.

    The port is read from ``server_config['port']`` and the server is a
    ``WSGIServer`` bound to all interfaces (empty host).
    """
    port = server_config['port']
    # The original ``print 'text', value`` statement is a SyntaxError on
    # Python 3. A single pre-formatted argument prints the same text on both
    # 2 and 3; the two spaces after the colon reproduce the statement's
    # output (trailing space in the literal plus the separator space).
    print('Starting TRex HTTP proxy on port:  {}'.format(port))
    http_server = WSGIServer(('', port), app)
    http_server.serve_forever()  # Start web server
def handle(event, context):
    """Rewrite proxy headers on ``event`` and dispatch it to ``tiler``.

    ``event['headers']`` is modified in place before the event is handed to
    ``awsgi.response``.
    """
    server_name = os.environ['SERVER_NAME']
    headers = event['headers']

    # Cloudfront isn't configured to pass Host headers, so the Host header
    # that arrives is the API Gateway hostname; replace it.
    headers['Host'] = server_name

    # Cloudfront drops X-Forwarded-Proto, so the incoming value comes from
    # API Gateway; overwrite it.
    headers['X-Forwarded-Proto'] = 'http'

    return awsgi.response(tiler, event, context)
def __init__(self, url):
    """Prepare (but do not start) a listener for ``url``.

    Args:
        url: String that splits on ``':'`` into exactly two parts, a host
            and an integer port; anything else raises ``ValueError``.
    """
    pieces = url.split(':')
    self.host, port_text = pieces
    self.port = int(port_text)
    self.server = None

    # Thread object only; it is created here and not started.
    listener_thread = threading.Thread(target=self._start_listening_blocking)
    self.thread = listener_thread

    # Wrap the application with socketio's middleware.
    self.app = socketio.Middleware(sio, app)
def _start_listening_blocking(self):
    """Listen on ``(self.host, self.port)`` and serve ``self.app`` over WSGI.

    The result of ``wsgi.server`` is stored in ``self.server``.
    """
    address = (self.host, self.port)
    sock = eventlet.listen(address)
    # Deploy as an eventlet WSGI server with request logging and debug off.
    self.server = wsgi.server(
        sock,
        self.app,
        log_output=False,
        debug=False,
    )
def setLogger(app, level):
    """Attach the module-level ``handler`` to ``app.logger`` and set its level.

    Args:
        app: Object exposing a ``logger`` attribute.
        level: Level passed to ``logger.setLevel``.
    """
    log = app.logger
    log.addHandler(handler)
    log.setLevel(level)
def customLog(msg):
    """Log ``msg`` at info level through ``app.logger``."""
    log = app.logger
    log.info(msg)
def managerLog(mode, user_key):
    """Log at info level that ``mode`` processing finished for ``user_key``.

    ``mode`` appears twice in the message: once as the bracketed tag and
    once in the sentence itself.
    """
    template = "[{}] {} {} processing completed"
    message = template.format(mode, user_key, mode)
    app.logger.info(message)
def lambda_handler(event, context=None):
    """Dispatch a Lambda invocation according to the shape of ``event``.

    * ``detail-type == 'Scheduled Event'``: call ``debug`` and return
      ``app.on_timer(event)``.
    * non-empty ``Records`` whose first record has
      ``EventSource == 'aws:sns'``: return ``app.on_config_message(records)``.
    * any other ``Records`` payload, or an event without ``path``: return
      ``debug(event, context)``.
    * otherwise: treat the event as an API Gateway request and return
      ``wsgigw.invoke(...)`` on ``app.app``, wrapped in Sentry when
      ``app.config`` has a ``sentry-dsn`` entry.
    """
    # Periodic
    if event.get('detail-type') == 'Scheduled Event':
        debug(event, context)
        return app.on_timer(event)

    # SNS / Dynamodb / Kinesis
    records = event.get('Records')
    if records:
        # Only SNS records carry the capitalised ``EventSource`` key;
        # DynamoDB and Kinesis records use ``eventSource``. Indexing the key
        # directly raised KeyError for those and never reached the debug
        # fallback, so look it up with ``.get``.
        if records[0].get('EventSource') == 'aws:sns':
            return app.on_config_message(records)
        return debug(event, context)

    if not event.get('path'):
        return debug(event, context)

    # API Gateway
    if app.config.get('sentry-dsn'):
        from raven import Client
        from raven.contrib.bottle import Sentry
        client = Client(app.config['sentry-dsn'])
        # Disable catchall on the wrapped app before handing it to Sentry.
        app.app.catchall = False
        wrapped_app = Sentry(app.app, client)
    else:
        wrapped_app = app.app
    return wsgigw.invoke(wrapped_app, event)
def local(reload, port):
    """Run the app on a local bottle development server.

    Loads the c7n resource definitions, sets root logging to DEBUG (with
    ``botocore`` limited to WARNING), calls ``controller.db.provision()``
    and reports when it returns a truthy value, then starts ``bottle.run``.

    Args:
        reload: Passed to ``bottle.run`` as ``reloader``.
        port: Port passed to ``bottle.run``.
    """
    import logging
    from bottle import run
    from app import controller, app
    from c7n.resources import load_resources

    load_resources()
    print("Loaded resources definitions")

    logging.basicConfig(level=logging.DEBUG)
    botocore_log = logging.getLogger('botocore')
    botocore_log.setLevel(logging.WARNING)

    provisioned = controller.db.provision()
    if provisioned:
        print("Table Created")

    run(app, reloader=reload, port=port)
def list_routes():
    """Print every URL rule registered on ``app.url_map``, sorted, one per line.

    Each line holds the endpoint name (padded to 50 columns), the
    comma-joined methods (padded to 20 columns) and the rule pattern,
    separated by single spaces. The line is URL-unquoted before printing.
    """
    # ``unquote`` lives in ``urllib.parse`` on Python 3 and in ``urllib`` on
    # Python 2; the original ``urllib.unquote`` / ``print line`` only ran on 2.
    try:
        from urllib.parse import unquote
    except ImportError:
        from urllib import unquote

    output = []
    for rule in app.url_map.iter_rules():
        methods = ','.join(rule.methods)
        line = '{:50s} {:20s} {}'.format(rule.endpoint, methods, rule.rule)
        output.append(unquote(line))

    for line in sorted(output):
        # Single-argument call form prints identically on Python 2 and 3.
        print(line)
def _make_context():
    """Return the shell namespace containing ``app``, ``db`` and ``models``."""
    return {
        'app': app,
        'db': db,
        'models': models,
    }
def __init__(self):
    """Initialise logging for ``app`` with a date-prefixed log name.

    The log name is today's date as ``YYYYMMDD``, an underscore, and
    ``self.logPrefix``. The original docstring was mis-encoded; this text
    is derived from the code.
    """
    date_stamp = datetime.datetime.today().strftime("%Y%m%d")
    log_name = '_'.join((date_stamp, self.logPrefix))
    logs.init_app(app, log_name)
def info(self, msg):
    """Log ``msg`` at info level on ``app.logger`` and echo it to stdout."""
    log = app.logger
    log.info(msg)
    print(msg)
def error(self, msg):
    """Log ``msg`` at error level, echo it to stdout, then exit the process.

    NOTE(review): ``sys.exit()`` is called without an argument, so the
    process exits with status 0 even though an error was reported. Confirm
    whether callers rely on that before changing it.
    """
    log = app.logger
    log.error(msg)
    print(msg)
    sys.exit()
def app():
    """Import and return the ``app`` object from the ``app`` package.

    The import happens inside the function, so it runs on each call rather
    than when this module is loaded.
    """
    from app import app as application
    return application
def get_apps(app_type, page):
    """GET one page of apps of ``app_type`` and return the response.

    The current time is appended as the ``t`` query parameter. The response
    must pass ``response_normal_check`` and its JSON body must contain a
    non-null ``datas`` entry.
    """
    url = '/apps/{}/page/{}?t={}'.format(app_type, page, time.time())
    _, response = app.test_client.get(url)
    response_normal_check(response)

    payload = ujson.loads(response.text)
    assert payload.get('datas') is not None
    return response
def upload():
    """POST the sample APK to ``/upload/app`` and return the response.

    The form carries the file as field ``package`` and the text
    ``'test upload'`` as field ``msg``. The response must pass
    ``response_normal_check``.
    """
    # Open the package in a ``with`` block so the handle is closed once the
    # request has completed; the original left the file open.
    with open('tests/QXmokuai_3.apk', 'rb') as package:
        data = FormData()
        data.add_field('package', package, filename='QXmokuai_3.apk')
        data.add_field('msg', 'test upload')
        _, response = app.test_client.post('/upload/app', data=data)

    response_normal_check(response)
    return response
def get_app_detail(app_id):
    """GET ``/apps/<app_id>``, validate the response and return it."""
    url = '/apps/{}'.format(app_id)
    result = app.test_client.get(url)
    response = result[1]
    response_normal_check(response)
    return response
def get_app_versions(app_id, page):
    """GET one page of versions for ``app_id`` and return the response.

    The current time is appended as the ``t`` query parameter, and the
    response must pass ``response_normal_check``.
    """
    url = '/apps/{}/versions/page/{}?t={}'.format(app_id, page, time.time())
    _, response = app.test_client.get(url)
    response_normal_check(response)
    return response
def put_app_detail(app_id, name, short_chain, detail):
    """PUT new ``name``, ``short_chain`` and ``detail`` to ``/apps/<app_id>``.

    The three fields are sent as a JSON-encoded body. The response must
    pass ``response_normal_check`` and is returned.
    """
    fields = {
        'name': name,
        'short_chain': short_chain,
        'detail': detail,
    }
    body = ujson.dumps(fields)
    _, response = app.test_client.put('/apps/{}'.format(app_id), data=body)
    response_normal_check(response)
    return response
def del_app_version(app_id, package_id):
    """DELETE version ``package_id`` of ``app_id`` and return the response.

    The response must pass ``response_normal_check``.
    """
    url = '/apps/{}/versions/{}'.format(app_id, package_id)
    _, response = app.test_client.delete(url)
    response_normal_check(response)
    return response
def make_shell_context():
    """Return the shell namespace, which exposes only ``app``."""
    return {'app': app}
def client():
    """Return a ``testing.TestClient`` wrapping ``app.app``."""
    wrapped = app.app
    test_client = testing.TestClient(wrapped)
    return test_client