Python peewee 模块,DoesNotExist() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用peewee.DoesNotExist()

项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def get_player(self, login=None, pk=None, lock=True):
        """
        Get player by login or primary key.

        :param login: Login.
        :param pk: Primary Key identifier.
        :param lock: Lock for a sec when receiving.
        :return: Player or exception if not found
        :rtype: pyplanet.apps.core.maniaplanet.models.Player
        """
        try:
            if login:
                return await Player.get_by_login(login)
            elif pk:
                return await Player.get(pk=pk)
            else:
                raise PlayerNotFound('Player not found.')
        except DoesNotExist:
            if lock:
                await asyncio.sleep(4)
                return await self.get_player(login=login, pk=pk, lock=False)
            else:
                raise PlayerNotFound('Player not found.')
项目:sentinel-old    作者:monacocoin-net    | 项目源码 | 文件源码
def check_db_schema_version():
    """ Ensure DB schema is correct version. Drop tables if not. """
    db_schema_version = None

    try:
        db_schema_version = Setting.get(Setting.name == 'DB_SCHEMA_VERSION').value
    except (peewee.OperationalError, peewee.DoesNotExist, peewee.ProgrammingError) as e:
        printdbg("[info]: Can't get DB_SCHEMA_VERSION...")

    printdbg("[info]: SCHEMA_VERSION (code) = [%s]" % SCHEMA_VERSION)
    printdbg("[info]: DB_SCHEMA_VERSION = [%s]" % db_schema_version)
    if (SCHEMA_VERSION != db_schema_version):
        printdbg("[info]: Schema version mis-match. Syncing tables.")
        try:
            existing_table_names = db.get_tables()
            existing_models = [m for m in db_models() if m._meta.db_table in existing_table_names]
            if (existing_models):
                printdbg("[info]: Dropping tables...")
                db.drop_tables(existing_models, safe=False, cascade=False)
        except (peewee.InternalError, peewee.OperationalError, peewee.ProgrammingError) as e:
            print("[error] Could not drop tables: %s" % e)
项目:sentinel    作者:dashpay    | 项目源码 | 文件源码
def check_db_schema_version():
    """ Ensure DB schema is correct version. Drop tables if not. """
    db_schema_version = None

    try:
        db_schema_version = Setting.get(Setting.name == 'DB_SCHEMA_VERSION').value
    except (peewee.OperationalError, peewee.DoesNotExist, peewee.ProgrammingError) as e:
        printdbg("[info]: Can't get DB_SCHEMA_VERSION...")

    printdbg("[info]: SCHEMA_VERSION (code) = [%s]" % SCHEMA_VERSION)
    printdbg("[info]: DB_SCHEMA_VERSION = [%s]" % db_schema_version)
    if (SCHEMA_VERSION != db_schema_version):
        printdbg("[info]: Schema version mis-match. Syncing tables.")
        try:
            existing_table_names = db.get_tables()
            existing_models = [m for m in db_models() if m._meta.db_table in existing_table_names]
            if (existing_models):
                printdbg("[info]: Dropping tables...")
                db.drop_tables(existing_models, safe=False, cascade=False)
        except (peewee.InternalError, peewee.OperationalError, peewee.ProgrammingError) as e:
            print("[error] Could not drop tables: %s" % e)
项目:aryas    作者:lattkkthxbbye    | 项目源码 | 文件源码
def users(self, ctx: commands.Context, count=10):
        """
        Show users with the most messages
        """
        count = count
        # Show at least 1 user and 20 at most
        count = max(1, count)
        count = min(20, count)

        try:
            server = self.orm.Server.get(discord_id=ctx.message.server.id)
        except DoesNotExist as e:
            # If there's no server in the db an exception will be raised
            self.config.logger.error(e)
        else:
            users = await self.orm.query.user_top_list(count, server)
            embed = discord.Embed(color=discord.Color(self.config.constants.embed_color),
                                  timestamp=datetime.datetime.now())
            for user in users:
                # the user might not have a name if s/he hasn't sent a message already
                # so in that case use the id instead
                name = user.name if user.name != '' else user.discord_id
                embed.add_field(name=name, value='Total messages: {}'.format(user.count), inline=False)

            await self.bot.say(content='Top active users:', embed=embed)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_obfuscated_by_lower(obfuscated_lower, **kwargs):
    """Get an identifier row from the lowercase obfuscated value.

    Parameters
    ----------
    obfuscated_lower: str

    Returns
    -------
    identifier_row
    """
    try:
        identifier_row = get_row_by_fields(Identifier,
                                           obfuscated_lower=obfuscated_lower)
        return identifier_row.name
    except DoesNotExist:
        return None
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_obfuscated_name(identifier_name, **kwargs):
    """Get obfuscated name of an identifier.

    Parameters
    ----------
    identifier_name : str

    Returns
    -------
    obfuscated_name : str
    """
    try:
        identifier_row = get_identifier_by_name(identifier_name)
        return identifier_row.obfuscated_name
    except DoesNotExist:
        return identifier_name
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def add_attribs_reserveds_list(attrib_list):
    """Add attributes that follow a reserved name to reserveds list."""
    if len(attrib_list) > 1:  # A single item does not change
        is_reserved = False
        reserved_set = set()
        for name in attrib_list:
            if not is_reserved:
                try:
                    get_reserved_by_name(name)
                    is_reserved = True
                    package_name = name
                    continue  # Don't add already reserved name
                except DoesNotExist:
                    continue
            if is_reserved:
                reserved_set.add(name)
        if reserved_set:
            add_reserveds(package_name, reserved_set)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_logic_reserved_delete():
    from logic.reserved import delete_reserved, get_reserved, \
        save_reserved

    reserved_row = get_reserved(None)
    assert save_reserved(
        reserved_row,
        name=u'Reserved Two')

    assert get_reserved(2).id == 2, 'Record not present or wrong id'

    # Delete an existing reserved
    row = get_reserved(2)
    assert delete_reserved(row) == 1

    with pytest.raises(DoesNotExist):
        assert get_reserved(2).id == 2, 'Record not found'

    # Fail to delete a non-existing reserved
    with pytest.raises(DoesNotExist):
        assert delete_reserved(row)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_logic_identifier_delete():
    from logic.identifier import delete_identifier, get_identifier, \
        save_identifier

    identifier_row = get_identifier(None)
    assert save_identifier(
        identifier_row,
        name=u'Identifier Two')

    assert get_identifier(2).id == 2, 'Record not present or wrong id'

    # Delete an existing identifier
    row = get_identifier(2)
    assert delete_identifier(row) == 1

    with pytest.raises(DoesNotExist):
        assert get_identifier(2).id == 2, 'Record not found'

    # Fail to delete a non-existing identifier
    with pytest.raises(DoesNotExist):
        assert delete_identifier(row)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_create_decision():
    from logic.identifier import Identifier
    from logic.identifier import get_identifier

    time_before = datetime.datetime.now()
    identifier_row = get_identifier(None)
    time_after = datetime.datetime.now()

    assert not identifier_row.id
    assert identifier_row.name == Identifier.name.default
    assert identifier_row.obfuscated_name == Identifier.obfuscated_name.default
    assert time_before <= identifier_row.created_timestamp <= time_after

    # Fail at bad parameter
    with pytest.raises(DoesNotExist):
        get_identifier(999)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_add_from_import():
    bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)

    # Reserve imported modules if first from module is reserved
    bnf_parser.from_import.parseString(
        "   from some_module.reserved_one import is_reserved, also_is")
    assert get_reserved_by_name('is_reserved')
    assert get_reserved_by_name('also_is')

    # Reserve imported modules if first from module is reserved
    bnf_parser.from_import.parseString(
        "   from reserved_one.some_module import is_reserved, also_reserved")
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    assert get_reserved_by_name('also_reserved').primary_package == \
        'reserved_one'

    # Import (without a from) should take no action
    bnf_parser.from_import.parseString(
        "   import reserved_one, not_reserved_1, also_not_1")
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('not_reserved_1')
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('also_not_1')
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_add_except_error():
    bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)

    # Reserve exception names
    bnf_parser.except_error.parseString(
        "   except (FloatingPointError, SomeOtherError) as exc_err")
    assert get_reserved_by_name('FloatingPointError')
    assert get_reserved_by_name('SomeOtherError')
    with pytest.raises(DoesNotExist):
        assert get_reserved_by_name('exc_err')

    # Reserve exception names with double tab
    bnf_parser.except_error.parseString(
        "       except (FloatingPointError, SomeOtherError) as exc_err")
    assert get_reserved_by_name('FloatingPointError')
    assert get_reserved_by_name('SomeOtherError')
    with pytest.raises(DoesNotExist):
        assert get_reserved_by_name('exc_err')
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_add_kivy_import():
    bnf_parser = ObfuscateKivyBNF(get_obfuscated_name)

    # Reserve imported modules from first reserved module
    bnf_parser.kivy_import.parseString(
        "#: import is_reserved some_module.reserved_one")
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('some_module')

    # Reserve imported modules if first from module is reserved
    bnf_parser.kivy_import.parseString(
        "#: import is_reserved reserved_one.some_module")
    assert get_reserved_by_name('is_reserved').primary_package == \
        'reserved_one'
    assert get_reserved_by_name('some_module').primary_package == \
        'reserved_one'

    # Import without directive should take no action
    bnf_parser.kivy_import.parseString(
        "import not_reserved_1 reserved_one")
    with pytest.raises(DoesNotExist):
        get_reserved_by_name('not_reserved_1')
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_row(model, row_id, **kwargs):
    """Get a database row by its id.

    Parameters
    ----------
    model : BaseModel
    row_id : int or None

    Returns
    -------
    row
    """
    try:
        if row_id:
            try:
                row = model.get(model.id == row_id)
            except DoesNotExist:
                raise
        else:
            row = model()
    except ImportError:
        raise

    return row
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_row_by_name(model, row_name, FK_name=None, FK_id=None, **kwargs):
    """Get a unique row by its name or by name plus FK if a FK is needed.

    Parameters
    ----------
    FK_id : int
    FK_name : str
    model : BaseModel
    row_name : str

    Returns
    -------
    row
    """
    try:
        if FK_name:
            row = model.get(model.name == row_name,
                            getattr(model, FK_name) == FK_id)
        else:
            row = model.get(model.name == row_name)
    except DoesNotExist:
        raise
    else:
        return row
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def _process_download_command(data):
        matches = re.search('download (\d{1,})', data)
        if matches is not None:
            episode_id = int(matches.group(1))
            try:
                episode = Episode.get(Episode.id == episode_id)
                torrentclient = Transmission()

                torrentclient.download_episode(episode)
                episode.is_downloaded = True
                episode.save()
                return True
            except DoesNotExist:
                error = 'Tried to download non-existing episode with ID: %d'
                logging.error(error % episode_id)
                return False
            except TorrentClientException as e:
                logging.error('TorrentClientException occured: %s' % str(e))
                return False
        else:
            logging.error('Incorrect download command received: %s' % data)
            return False
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def get_virtual_card():
    username = request.args.get("username")
    try:
        virtual_card = virtual_card_service.get_virtual_card(
            card_no=username
        )
        virtual_card = model_to_dict(virtual_card, recurse=False)
        return jsonify({'response': virtual_card}), 200

    except DoesNotExist as e:
        return jsonify({
            'response': {
                'error': e.args,
                'message': '????????'
            }
        }), 400


# ????
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def get_battery(serial_number):
    """
    get battery rent information

    :param serial_number: battery serial_number
    :return: information
    """
    try:
        battery = battery_rent_service.get_battery(serial_number)
        return jsonify({'response': model_to_dict(battery)}), 200
    except DoesNotExist as e:
        return jsonify({
            'response': {
                "error": '%s: %s' % (str(DoesNotExist), e.args),
                "message": '%s' % e.args
            }
        }), 400


# 2. ?????? ????
项目:peewee-validates    作者:timster    | 项目源码 | 文件源码
def validate(self, name, data):
        """
        If there is a problem with the data, raise ValidationError.

        :param name: The name of this field.
        :param data: Dictionary of data for all fields.
        :raises: ValidationError
        """
        super().validate(name, data)
        if self.value is not None:
            try:
                # self.query could be a query like "User.select()" or a model like "User"
                # so ".select().where()" handles both cases.
                self.value = [self.query.select().where(self.lookup_field == v).get() for v in self.value if v]
            except (AttributeError, ValueError, peewee.DoesNotExist):
                raise ValidationError('related', field=self.lookup_field.name, values=self.value)
项目:swefreq    作者:NBISweden    | 项目源码 | 文件源码
def add_image(dataset_pk, image):
    try:
        dataset = db.Dataset.get(db.Dataset.dataset == dataset_pk)
    except peewee.DoesNotExist:
        print("Can't find any dataset with id {}".format(dataset_pk))
        sys.exit(1)

    with open(image, 'rb') as f:
        data = f.read()

    try:
        mimetype = infer_mimetype( image )
    except NoMimetypeException:
        print("Can't find mime type for <{}>".format(image))
        sys.exit(2)

    print(len(data))
    with db.database.atomic():
        db.DatasetLogo.create(
            dataset  = dataset,
            mimetype = mimetype,
            data     = data
        )
项目:swefreq    作者:NBISweden    | 项目源码 | 文件源码
def get_current_user(self):
        email = self.get_secure_cookie('email')
        name  = self.get_secure_cookie('user')

        # Fix ridiculous bug with quotation marks showing on the web
        if name and (name[0] == '"') and (name[-1] == '"'):
            name = user[1:-1]

        if email:
            try:
                return db.User.select().where( db.User.email == email ).get()
            except peewee.DoesNotExist:
                ## Not saved in the database yet
                return db.User(email = email.decode('utf-8'),
                               name  = name.decode('utf-8'))
        else:
            return None
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def get_map(self, uid=None):
        """
        Get map instance by uid.

        :param uid: By uid (pk).
        :return: Player or exception if not found
        """
        try:
            return await Map.get_by_uid(uid)
        except DoesNotExist:
            raise MapNotFound('Map not found.')
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def get_map_by_index(self, index):
        """
        Get map instance by index id (primary key).

        :param index: Primary key index.
        :return: Map instance or raise exception.
        """
        try:
            return await Map.get(id=index)
        except DoesNotExist:
            raise MapNotFound('Map not found.')
项目:PyPlanet    作者:PyPlanet    | 项目源码 | 文件源码
def register(self, name, description='', app=None, min_level=1, namespace=None):
        """
        Register a new permission.

        :param name: Name of permission
        :param description: Description in english.
        :param app: App instance to retrieve the label.
        :param min_level: Minimum level required.
        :param namespace: Namespace, only for core usage!
        :return: Permission instance.
        """
        if not namespace and app:
            namespace = app.label
        if not namespace:
            raise Exception('Namespace is required. You should give your app instance with app=app instead!')

        try:
            perm = await self.get_perm(namespace=namespace, name=name)

            # TODO: Implement overrides on min_level here.
            if perm.min_level != min_level:
                perm.min_level = min_level
                await perm.save()

        except DoesNotExist:
            perm = Permission(namespace=namespace, name=name, description=description, min_level=min_level)
            await perm.save()
        return perm
项目:sentinel-old    作者:monacocoin-net    | 项目源码 | 文件源码
def get(self, name):
        setting_name = "__transient_%s" % (name)

        try:
            the_setting = Setting.get(Setting.name == setting_name)
            t = Transient.from_setting(the_setting)
        except Setting.DoesNotExist as e:
            return False

        if t.is_expired():
            the_setting.delete_instance()
            return False
        else:
            return t.value
项目:sentinel-old    作者:monacocoin-net    | 项目源码 | 文件源码
def delete(self, name):
        setting_name = "__transient_%s" % (name)
        try:
            s = Setting.get(Setting.name == setting_name)
        except Setting.DoesNotExist as e:
            return False
        return s.delete_instance()

# === /models ===
项目:sentinel    作者:dashpay    | 项目源码 | 文件源码
def get(self, name):
        setting_name = "__transient_%s" % (name)

        try:
            the_setting = Setting.get(Setting.name == setting_name)
            t = Transient.from_setting(the_setting)
        except Setting.DoesNotExist as e:
            return False

        if t.is_expired():
            the_setting.delete_instance()
            return False
        else:
            return t.value
项目:sentinel    作者:dashpay    | 项目源码 | 文件源码
def delete(self, name):
        setting_name = "__transient_%s" % (name)
        try:
            s = Setting.get(Setting.name == setting_name)
        except Setting.DoesNotExist as e:
            return False
        return s.delete_instance()

# === /models ===
项目:aryas    作者:lattkkthxbbye    | 项目源码 | 文件源码
def total(self, ctx: commands.Context):
        """
        Show total amount of messages on server
        """
        try:
            server = self.orm.Server.get(discord_id=ctx.message.server.id)
        except DoesNotExist as e:
            # If there's no server in the db an exception will be raised
            self.config.logger.error(e)
        else:
            total_messages = await self.orm.query.server_total_messages(server)
            await self.bot.say('Total messages: {}'.format(total_messages))
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def add_identifiers(identifier_list=None, do_obfuscate=True):
    """Add identifier and obfuscated names to Identifiers table.

    Parameters
    ----------
    do_obfuscate : bool
    identifier_list : list
    """
    for identifier_name in identifier_list:
        # Skip identifiers in reserved
        try:
            get_reserved_by_name(identifier_name)
        except DoesNotExist:
            pass
        else:
            continue

        if not do_obfuscate \
                or identifier_name[0:2] == '__' \
                or identifier_name == '__init__.py' \
                or identifier_name.startswith('test_') \
                or identifier_name.endswith('_test.py'):
            obfuscated_name = identifier_name
        else:
            obfuscated_name = ''

        identifier_row = get_identifier(None)
        try:
            save_identifier(identifier_row,
                            name=identifier_name,
                            obfuscated_name=obfuscated_name)
        except IntegrityError as e:
            if 'unique' not in e.message.lower():
                raise
            # If should not be obfuscated, replace obfuscated, o/w pass
            if not do_obfuscate:
                identifier_row = get_identifier_by_name(identifier_name)
                if identifier_row.obfuscated_name != identifier_name:
                    save_identifier(identifier_row,
                                    obfuscated_name=identifier_name)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def validate_obfuscated_name(self):
        """Make sure the obfuscated name is unique."""
        if self.name != self.obfuscated_name:
            is_unique = False
            while not is_unique:
                if not self.obfuscated_name:
                    random_num = base_random_number(5)
                    self.obfuscated_name = base_alphabet_encode(random_num, 5)
                try:
                    get_reserved_by_name(self.obfuscated_name)
                except pwe.DoesNotExist:
                    is_unique = True
                else:
                    self.obfuscated_name = None
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def save_identifier(identifier_row, **kwargs):
    """Save and identifier row.

    Parameters
    ----------
    identifier_row

    Returns
    -------
    identifier_id : int
    """
    try:
        for name, value in kwargs.iteritems():
            getattr(identifier_row, name)  # Make sure column exists
            setattr(identifier_row, name, value)

    except AttributeError:
        raise

    with obfuscatedb.atomic():
        try:
            identifier_id = save_row(identifier_row, **kwargs)
        except IntegrityError as e:
            # Resave with different obfuscated_name if already exists
            if 'unique' in e.message.lower() \
                    and not kwargs.get('obfuscated_name', None) \
                    and 'obfuscated_name' in e.message:
                identifier_row.obfuscated_name = None
                identifier_id = save_identifier(identifier_row)
            else:
                raise
        except DoesNotExist:
            raise

        return identifier_id
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def add_from_import(self, from_import_list):
        """Add imported modules from reserved modules to reserved.

        Parameters
        ----------
        from_import_list : list
        """
        if not from_import_list or \
                from_import_list[0] != 'from' or \
                'import' not in from_import_list[:]:
            return

        reserved_list = set()
        import_index = from_import_list[:].index('import')
        package_name = ''
        is_reserved = False
        for reserve_name in from_import_list[1:import_index]:
            # Start with first reserved directory in tree (if one exists)
            if not is_reserved:
                try:
                    get_reserved_by_name(reserve_name)
                    is_reserved = True
                    package_name = reserve_name
                except DoesNotExist:
                    continue
            if is_reserved:
                if reserve_name[0].isalpha() or reserve_name[0] == '_':
                    reserved_list.add(reserve_name)

        if is_reserved:
            # Get imported items
            for reserve_name in from_import_list[import_index+1:]:
                if reserve_name[0].isalpha() or reserve_name[0] == '_':
                    reserved_list.add(reserve_name)
            add_reserveds(package_name, reserved_list)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def add_kivy_import(self, kivy_import_list):
        """Add imported modules from reserved modules to reserved.

        Parameters
        ----------
        kivy_import_list : list
            Kivy import statement.
        """
        if not kivy_import_list or \
                kivy_import_list[0].strip() != '#:' or \
                kivy_import_list[1] != 'import':
            return

        reserved_list = set()
        package_name = ''
        is_reserved = False
        for reserve_name in kivy_import_list[3:]:
            # Start with first reserved directory in tree (if one exists)
            if not is_reserved:
                try:
                    get_reserved_by_name(reserve_name)
                    is_reserved = True
                    package_name = reserve_name
                except DoesNotExist:
                    continue
            if is_reserved:
                if reserve_name[0].isalpha() or reserve_name[0] == '_':
                    reserved_list.add(reserve_name)
        if is_reserved:
            reserved_list.add(kivy_import_list[2])
            add_reserveds(package_name, reserved_list)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_get_reserved():

    # Get good reserved_id
    reserved_row = get_reserved(1)
    assert reserved_row.id == 1
    assert reserved_row.name == u'reserved_one'

    # Fail at getting bad reserved_id
    with pytest.raises(DoesNotExist):
        get_reserved(999)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_create_decision():
    from logic.reserved import Reserved
    from logic.reserved import get_reserved

    time_before = datetime.datetime.now()
    reserved_row = get_reserved(None)
    time_after = datetime.datetime.now()

    assert not reserved_row.id
    assert reserved_row.name == Reserved.name.default
    assert time_before <= reserved_row.created_timestamp <= time_after

    # Fail at bad parameter
    with pytest.raises(DoesNotExist):
        get_reserved(999)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_get_identifier():

    # Get good identifier_id
    identifier_row = get_identifier(1)
    assert identifier_row.id == 1
    assert identifier_row.name == u'identifier_one'

    # Fail at getting bad identifier_id
    with pytest.raises(DoesNotExist):
        get_identifier(999)
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_get_identifier_by_name():

    # Get good identifier_id
    identifier_row = get_identifier_by_name('identifier_one')
    assert identifier_row.id == 1
    assert identifier_row.name == u'identifier_one'

    # Fail at getting bad identifier_id
    with pytest.raises(DoesNotExist):
        get_identifier_by_name('junk')
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_add_reserveds():
    # Define values to be updated, and their new values
    names_to_update = {
        "variable": "v001",
        "variable_row": "v002",
        "scenario": "s001",
        "decision": "d001",
        "decision_row": "d002"
        }

    for ident_name, obfuscated_name in names_to_update.iteritems():
        identifier_row = get_identifier(None)
        save_identifier(
            identifier_row,
            name=ident_name,
            obfuscated_name=obfuscated_name
            )

    bnf_parser = ObfuscatePythonBNF(get_obfuscated_name)

    # Identifiers after leading reserved attributes should not be obfuscated
    # They should be added to reserved and removed from identifiers
    bnf_parser.attribs.parseString(
        "decision.scenario(variable) = reserved_one.variable "
        "+ view.reserved_one")

    assert bnf_parser.statement.transformString(
        "decision.scenario(variable) = reserved_one.variable") == \
        "d001.s001(variable)=reserved_one.variable"

    assert get_reserved_by_name('variable').primary_package == 'reserved_one'
    assert get_identifier_by_name('variable').obfuscated_name == 'variable'
    with pytest.raises(DoesNotExist):
        assert get_identifier_by_name('view').obfuscated_name != 'view'
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def test_add_reserveds():
    # Define values to be updated, and their new values
    names_to_update = {
        "variable": "v001",
        "variable_row": "v002",
        "scenario": "s001",
        "decision": "d001",
        "decision_row": "d002"
        }

    for ident_name, obfuscated_name in names_to_update.iteritems():
        identifier_row = get_identifier(None)
        save_identifier(
            identifier_row,
            name=ident_name,
            obfuscated_name=obfuscated_name
            )

    bnf_parser = ObfuscateKivyBNF(get_obfuscated_name)

    # Identifiers after leading reserved attributes should not be obfuscated
    # They should be added to reserved and removed from identifiers
    bnf_parser.attribs.parseString(
        "decision.scenario(variable) = reserved_one.variable "
        "+ view.reserved_one")

    assert bnf_parser.statement.transformString(
        "decision.scenario(variable) = reserved_one.variable") == \
        "d001.s001(variable)=reserved_one.variable"

    assert get_reserved_by_name('variable').primary_package == 'reserved_one'
    assert get_identifier_by_name('variable').obfuscated_name == 'variable'
    with pytest.raises(DoesNotExist):
        assert get_identifier_by_name('view').obfuscated_name != 'view'
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_row_by_FK(model, FK_name, FK_id, is_required=False, **kwargs):
    """Get a unique row a FK id or return an empty row.

    Parameters
    ----------
    FK_id : int
    FK_name : str
    is_required : bool
    model : BaseModel

    Returns
    -------
    row
    """
    if FK_id:
        try:
            row = model.get(getattr(model, FK_name) == FK_id)
        except DoesNotExist:
            if is_required:
                raise
            else:
                row = model()
    else:
        row = model()

    return row
项目:pymixup    作者:rdevost    | 项目源码 | 文件源码
def get_row_by_fields(model, **kwargs):
    """Get a unique row by arbitrary fields.

    Parameters
    ----------
    kwargs : dict of field names and values
    model : BaseModel

    Returns
    -------
    row
    """
    field_name = []
    field_value = []
    try:
        for name, value in kwargs.iteritems():
            field_name.append(getattr(model, name))  # Make sure column exists
            field_value.append(value)
    except AssertionError as e:
        raise

    try:
        if len(field_name) == 1:
            row = model.get(field_name[0] == field_value[0])
        elif len(field_name) == 2:
            row = model.get(field_name[1] == field_value[1])
        elif len(field_name) == 3:
            row = model.get(field_name[2] == field_value[2])
        elif len(field_name) == 4:
            row = model.get(field_name[3] == field_value[3])
        elif len(field_name) == 5:
            row = model.get(field_name[4] == field_value[4])
        else:
            raise ValueError('Too many arguments for row lookup')
    except DoesNotExist:
        raise
    else:
        return row
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def _get_existing_episode_from_database(episode):
        """Retrieves an existing episode from the database.
        An episode is equal if it has the same show, season, episode
        and quality. We store multiple show+season+episode Episodes if
        the quality is different so later on we can decide which
        we want to download and perhaps wait for a better quality episode."""
        try:
            return Episode.select() \
                .where(Episode.show == episode.show) \
                .where(Episode.season == episode.season) \
                .where(Episode.episode == episode.episode) \
                .where(Episode.quality == episode.quality) \
                .get()
        except DoesNotExist:
            return None
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def _is_episode_downloaded(episode):
        """Checks if this item has already been downloaded."""
        try:
            Episode.select() \
                .where(Episode.show == episode.show) \
                .where(Episode.season == episode.season) \
                .where(Episode.episode == episode.episode) \
                .where(Episode.is_downloaded == 1) \
                .get()
            return True
        except DoesNotExist:
            return False
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def get(show_id):
        """Handles GET requests. Returns a single show."""
        try:
            show = Show.get(Show.id == show_id)
            return show.to_dict()
        except DoesNotExist:
            abort(404)
项目:argosd    作者:danielkoster    | 项目源码 | 文件源码
def delete(show_id):
        """Handles DELETE requests. Deletes a show."""
        try:
            show = Show.get(Show.id == show_id)
            show.delete_instance()
            return '', 204
        except DoesNotExist:
            abort(404)
项目:autobahn-sync    作者:Scille    | 项目源码 | 文件源码
def startup_library_service():
    wamp = autobahn_sync.AutobahnSync()
    wamp.run()
    db = pw.SqliteDatabase('books.db')

    class Book(pw.Model):
        title = pw.CharField()
        author = pw.CharField()

        class Meta:
            database = db

    try:
        db.create_table(Book)
    except pw.OperationalError:
        pass

    @wamp.register('com.library.get_book')
    def get_book(id):
        try:
            b = Book.get(id=id)
        except pw.DoesNotExist:
            return {'_error': "Doesn't exist"}
        return {'id': id, 'title': b.title, 'author': b.author}

    @wamp.register('com.library.new_book')
    def new_book(title, author):
        book = Book(title=title, author=author)
        book.save()
        wamp.session.publish('com.library.book_created', book.id)
        return {'id': book.id}
项目:mmpdb    作者:rdkit    | 项目源码 | 文件源码
def get_object_or_404(query_or_model, *query):
    if not isinstance(query_or_model, SelectQuery):
        query_or_model = query_or_model.select()
    try:
        return query_or_model.where(*query).get()
    except DoesNotExist:
        abort(404)
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def get_user_battery():
    username = get_jwt_identity()
    # username = request.args.get("username")
    try:
        battery, battery_record = battery_rent_service.get_user_battery(
            username=username
        )
        # battery = model_to_dict(battery)
        # battery_record = model_to_dict(battery_record)
        battery = {
            "serial_number": battery.serial_number,
            "rent_date": battery_record.rent_date
        }
        return jsonify({
            'response': {
                "battery": battery,
                # "battery_record": battery_record
            }
        }), 200
    except DoesNotExist as e:
        return jsonify({
            'response': {
                "battery": [],
            }
        }), 200
        # return jsonify({
        #     'response': {
        #         'error': e.args,
        #         'message': '?????'
        #     }
        # }), 400


# 3. ????