Python server 模块,create_app() 实例源码

我们从Python开源项目中,提取了以下16个代码示例,用于说明如何使用server.create_app()

项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        test_user = User()
        test_user.id = 123456
        test_user.username = 'Thinkwhere'

        self.task_stub = Task()
        self.task_stub.id = 1
        self.task_stub.project_id = 1
        self.task_stub.task_status = 0
        self.task_stub.locked_by = 123456
        self.task_stub.lock_holder = test_user

        self.lock_task_dto = LockTaskDTO()
        self.lock_task_dto.user_id = 123456

        self.mapped_task_dto = MappedTaskDTO()
        self.mapped_task_dto.status = TaskStatus.MAPPED.name
        self.mapped_task_dto.user_id = 123456
项目:logistics-wizard-controller    作者:IBM-Cloud    | 项目源码 | 文件源码
def start_app():
    """
    Run in development mode, never used in production.
    """
    port = int(os.getenv("PORT", 5000))
    try:
        app = create_app()
        app.run(host='0.0.0.0', port=port)
    except APIException as e:
        print ("Application failed to start")
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        self.unlock_task_stub = Task()
        self.unlock_task_stub.task_status = TaskStatus.MAPPED.value
        self.unlock_task_stub.lock_holder_id = 123456
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        """
        Setup test context so we can connect to database
        """
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        if self.skip_tests:
            return
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        self.test_user = create_canned_user()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        self.test_project, self.test_user = create_canned_project()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()

        self.test_project, self.test_user = create_canned_project()
项目:tasking-manager    作者:hotosm    | 项目源码 | 文件源码
def setUp(self):
        if self.skip_tests:
            return

        self.app = create_app()
        self.ctx = self.app.app_context()
        self.ctx.push()
项目:ohlife_replacement_server    作者:paulx3    | 项目源码 | 文件源码
def create_app(self):
        """
        create app
        Returns: flask app object

        """
        self.app = create_app("sqlite:///", True)
        self.app.config["TESTING"] = True
        self.app.config['PRESERVE_CONTEXT_ON_EXCEPTION'] = False
        return self.app
项目:xFlow    作者:dsouzajude    | 项目源码 | 文件源码
def main():
    args = _get_args()
    level = args['log_level'].upper()
    level = log_levels.get(level, logging.INFO)
    log = setup_logging(log_level=level)

    import core, utils, server

    config_file = args['CONFIG']
    if not utils.file_exists(config_file):
        log.error('File %s does not exist' % (config_file))
        sys.exit(1)

    log.info('Validating config')
    try:
        core.Engine.validate_config(config_file)
    except core.ConfigValidationError as ex:
        log.error('Invalid config. %s' % (str(ex)))
        sys.exit(1)

    log.info('Initializing xFlow engine')
    engine = core.Engine(config_file)
    log.info('Config is valid')

    # Run as server
    if args['s']:
        logging.info('Configuring xFlow Engine')
        engine.configure()
        app = server.create_app(engine)
        logging.info('Running as server')
        app.run(host='0.0.0.0', port=80, server='waitress', loglevel='warning')

    # Configure the lambdas, streams and subscriptions
    if args['c']:
        logging.info('Configuring xFlow Engine')
        engine.configure()
        logging.info('xFlow Engine configured')

    # Publish json data to stream
    if args['p']:
        stream = args['p'][0]
        data = args['p'][1]
        log.info('\n\n\nPublishing to stream: %s\n\nData: %s' % (stream, data))
        try:
            engine.publish(stream, data)
            log.info('Published')
        except core.KinesisStreamDoesNotExist:
            sys.exit(1)

    # Track a workflow
    if args['t']:
        workflow_id = args['t'][0]
        execution_id = args['t'][1]
        log.info("\n\n\nTracking workflow, workflow_id=%s, execution_id=%s" % (workflow_id, execution_id))
        try:
            tracking_info = engine.track(workflow_id, execution_id)
            print json.dumps(tracking_info, indent=4)
        except (core.CloudWatchStreamDoesNotExist,
                core.WorkflowDoesNotExist,
                core.CloudWatchLogDoesNotExist):
            sys.exit(1)