我们从Python开源项目中，提取了以下16个代码示例，用于说明如何使用server.create_app()。
def setUp(self):
    """Push an application context and build the task, user and DTO stubs.

    All stubs refer to the same user id (123456): the task stub's
    ``locked_by`` carries that id and its ``lock_holder`` is the user object.
    """
    self.app = app = create_app()
    self.ctx = context = app.app_context()
    context.push()

    # User that holds the lock on the stub task.
    lock_holder = User()
    lock_holder.id = 123456
    lock_holder.username = 'Thinkwhere'

    # Task stub locked by the user above.
    task = Task()
    task.id = 1
    task.project_id = 1
    task.task_status = 0
    task.locked_by = 123456
    task.lock_holder = lock_holder
    self.task_stub = task

    # DTO describing a lock request from the same user.
    lock_dto = LockTaskDTO()
    lock_dto.user_id = 123456
    self.lock_task_dto = lock_dto

    # DTO describing the task being reported as mapped by the same user.
    mapped_dto = MappedTaskDTO()
    mapped_dto.status = TaskStatus.MAPPED.name
    mapped_dto.user_id = 123456
    self.mapped_task_dto = mapped_dto
def start_app():
    """Run in development mode, never used in production.

    The port is taken from the ``PORT`` environment variable, falling back
    to 5000, and the app listens on all interfaces (0.0.0.0). If an
    ``APIException`` is raised while creating or running the app, a short
    failure message is printed instead of propagating the error.
    """
    listen_port = int(os.getenv("PORT", 5000))
    try:
        create_app().run(host='0.0.0.0', port=listen_port)
    except APIException:
        print("Application failed to start")
def setUp(self):
    """Create the app and push an application context for the test."""
    self.app = app = create_app()
    self.ctx = context = app.app_context()
    context.push()
def setUp(self):
    """Push an application context and prepare ``unlock_task_stub``.

    The stub is a ``Task`` whose status is ``TaskStatus.MAPPED.value`` and
    whose ``lock_holder_id`` is 123456.
    """
    self.app = app = create_app()
    self.ctx = context = app.app_context()
    context.push()

    stub = Task()
    stub.task_status = TaskStatus.MAPPED.value
    stub.lock_holder_id = 123456
    self.unlock_task_stub = stub
def setUp(self):
    """Set up the test context so we can connect to the database."""
    self.app = app = create_app()
    self.ctx = context = app.app_context()
    context.push()

    # NOTE(review): this guard is the final statement, so it skips nothing;
    # confirm whether it was meant to run before the app is created.
    if self.skip_tests:
        return
def setUp(self):
    """Create the app and push an application context unless tests are skipped.

    When ``self.skip_tests`` is truthy nothing is created and no attribute
    is set on the test case.
    """
    if not self.skip_tests:
        self.app = app = create_app()
        self.ctx = context = app.app_context()
        context.push()
def setUp(self):
    """Push an application context and create the canned test user.

    Does nothing at all when ``self.skip_tests`` is truthy.
    """
    if not self.skip_tests:
        application = create_app()
        self.app = application
        app_ctx = application.app_context()
        self.ctx = app_ctx
        app_ctx.push()

        self.test_user = create_canned_user()
def setUp(self):
    """Push an application context and create the canned project and user.

    Does nothing at all when ``self.skip_tests`` is truthy.
    """
    if not self.skip_tests:
        application = create_app()
        self.app = application
        app_ctx = application.app_context()
        self.ctx = app_ctx
        app_ctx.push()

        # create_canned_project() yields a (project, user) pair.
        canned_project, canned_user = create_canned_project()
        self.test_project = canned_project
        self.test_user = canned_user
def create_app(self):
    """Create the app used by the tests.

    The module-level ``create_app`` is called with ``"sqlite:///"`` and
    ``True``; the resulting app is stored on ``self.app`` with ``TESTING``
    switched on and ``PRESERVE_CONTEXT_ON_EXCEPTION`` switched off.

    Returns:
        flask app object
    """
    self.app = app = create_app("sqlite:///", True)
    settings = app.config
    settings["TESTING"] = True
    settings['PRESERVE_CONTEXT_ON_EXCEPTION'] = False
    return app
def main():
    """Command-line entry point for the xFlow engine.

    Parses the arguments, sets up logging, validates the config file named
    by ``CONFIG`` and then performs every action whose flag is set:

    * ``s`` -- configure the engine and serve it on 0.0.0.0:80.
    * ``c`` -- configure the lambdas, streams and subscriptions.
    * ``p`` -- publish data (second item) to a stream (first item).
    * ``t`` -- track a workflow execution (workflow id, execution id) and
      print the tracking info as indented JSON.

    Exits with status 1 when the config file is missing or invalid, or when
    publishing/tracking raises one of the handled "does not exist" errors.
    """
    args = _get_args()

    # Unknown level names fall back to INFO.
    level = args['log_level'].upper()
    level = log_levels.get(level, logging.INFO)
    log = setup_logging(log_level=level)

    # NOTE(review): presumably imported here so that logging is configured
    # before these modules load -- confirm before moving to file level.
    import core
    import utils
    import server

    config_file = args['CONFIG']
    if not utils.file_exists(config_file):
        log.error('File %s does not exist' % (config_file))
        sys.exit(1)

    log.info('Validating config')
    try:
        core.Engine.validate_config(config_file)
    except core.ConfigValidationError as ex:
        log.error('Invalid config. %s' % (str(ex)))
        sys.exit(1)

    log.info('Initializing xFlow engine')
    engine = core.Engine(config_file)
    log.info('Config is valid')

    # NOTE(review): the branches below mix the module-level ``logging``
    # functions with the ``log`` object returned by setup_logging(); left
    # as-is because setup_logging() is not visible here.

    # Run as server
    if args['s']:
        logging.info('Configuring xFlow Engine')
        engine.configure()
        app = server.create_app(engine)
        logging.info('Running as server')
        app.run(host='0.0.0.0', port=80, server='waitress', loglevel='warning')

    # Configure the lambdas, streams and subscriptions
    if args['c']:
        logging.info('Configuring xFlow Engine')
        engine.configure()
        logging.info('xFlow Engine configured')

    # Publish json data to stream
    if args['p']:
        stream = args['p'][0]
        data = args['p'][1]
        log.info('\n\n\nPublishing to stream: %s\n\nData: %s' % (stream, data))
        try:
            engine.publish(stream, data)
            log.info('Published')
        except core.KinesisStreamDoesNotExist:
            sys.exit(1)

    # Track a workflow
    if args['t']:
        workflow_id = args['t'][0]
        execution_id = args['t'][1]
        log.info("\n\n\nTracking workflow, workflow_id=%s, execution_id=%s" % (workflow_id, execution_id))
        try:
            tracking_info = engine.track(workflow_id, execution_id)
            # Parenthesised single-argument form prints identically under
            # Python 2 and is valid syntax under Python 3.
            print(json.dumps(tracking_info, indent=4))
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist):
            sys.exit(1)