我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用peewee.DoesNotExist()。
def get_player(self, login=None, pk=None, lock=True): """ Get player by login or primary key. :param login: Login. :param pk: Primary Key identifier. :param lock: Lock for a sec when receiving. :return: Player or exception if not found :rtype: pyplanet.apps.core.maniaplanet.models.Player """ try: if login: return await Player.get_by_login(login) elif pk: return await Player.get(pk=pk) else: raise PlayerNotFound('Player not found.') except DoesNotExist: if lock: await asyncio.sleep(4) return await self.get_player(login=login, pk=pk, lock=False) else: raise PlayerNotFound('Player not found.')
def check_db_schema_version(): """ Ensure DB schema is correct version. Drop tables if not. """ db_schema_version = None try: db_schema_version = Setting.get(Setting.name == 'DB_SCHEMA_VERSION').value except (peewee.OperationalError, peewee.DoesNotExist, peewee.ProgrammingError) as e: printdbg("[info]: Can't get DB_SCHEMA_VERSION...") printdbg("[info]: SCHEMA_VERSION (code) = [%s]" % SCHEMA_VERSION) printdbg("[info]: DB_SCHEMA_VERSION = [%s]" % db_schema_version) if (SCHEMA_VERSION != db_schema_version): printdbg("[info]: Schema version mis-match. Syncing tables.") try: existing_table_names = db.get_tables() existing_models = [m for m in db_models() if m._meta.db_table in existing_table_names] if (existing_models): printdbg("[info]: Dropping tables...") db.drop_tables(existing_models, safe=False, cascade=False) except (peewee.InternalError, peewee.OperationalError, peewee.ProgrammingError) as e: print("[error] Could not drop tables: %s" % e)
def users(self, ctx: commands.Context, count=10): """ Show users with the most messages """ count = count # Show at least 1 user and 20 at most count = max(1, count) count = min(20, count) try: server = self.orm.Server.get(discord_id=ctx.message.server.id) except DoesNotExist as e: # If there's no server in the db an exception will be raised self.config.logger.error(e) else: users = await self.orm.query.user_top_list(count, server) embed = discord.Embed(color=discord.Color(self.config.constants.embed_color), timestamp=datetime.datetime.now()) for user in users: # the user might not have a name if s/he hasn't sent a message already # so in that case use the id instead name = user.name if user.name != '' else user.discord_id embed.add_field(name=name, value='Total messages: {}'.format(user.count), inline=False) await self.bot.say(content='Top active users:', embed=embed)
def get_obfuscated_by_lower(obfuscated_lower, **kwargs): """Get an identifier row from the lowercase obfuscated value. Parameters ---------- obfuscated_lower: str Returns ------- identifier_row """ try: identifier_row = get_row_by_fields(Identifier, obfuscated_lower=obfuscated_lower) return identifier_row.name except DoesNotExist: return None
def get_obfuscated_name(identifier_name, **kwargs): """Get obfuscated name of an identifier. Parameters ---------- identifier_name : str Returns ------- obfuscated_name : str """ try: identifier_row = get_identifier_by_name(identifier_name) return identifier_row.obfuscated_name except DoesNotExist: return identifier_name
def add_attribs_reserveds_list(attrib_list): """Add attributes that follow a reserved name to reserveds list.""" if len(attrib_list) > 1: # A single item does not change is_reserved = False reserved_set = set() for name in attrib_list: if not is_reserved: try: get_reserved_by_name(name) is_reserved = True package_name = name continue # Don't add already reserved name except DoesNotExist: continue if is_reserved: reserved_set.add(name) if reserved_set: add_reserveds(package_name, reserved_set)
def test_logic_reserved_delete(): from logic.reserved import delete_reserved, get_reserved, \ save_reserved reserved_row = get_reserved(None) assert save_reserved( reserved_row, name=u'Reserved Two') assert get_reserved(2).id == 2, 'Record not present or wrong id' # Delete an existing reserved row = get_reserved(2) assert delete_reserved(row) == 1 with pytest.raises(DoesNotExist): assert get_reserved(2).id == 2, 'Record not found' # Fail to delete a non-existing reserved with pytest.raises(DoesNotExist): assert delete_reserved(row)
def test_logic_identifier_delete(): from logic.identifier import delete_identifier, get_identifier, \ save_identifier identifier_row = get_identifier(None) assert save_identifier( identifier_row, name=u'Identifier Two') assert get_identifier(2).id == 2, 'Record not present or wrong id' # Delete an existing identifier row = get_identifier(2) assert delete_identifier(row) == 1 with pytest.raises(DoesNotExist): assert get_identifier(2).id == 2, 'Record not found' # Fail to delete a non-existing identifier with pytest.raises(DoesNotExist): assert delete_identifier(row)
def test_create_decision(): from logic.identifier import Identifier from logic.identifier import get_identifier time_before = datetime.datetime.now() identifier_row = get_identifier(None) time_after = datetime.datetime.now() assert not identifier_row.id assert identifier_row.name == Identifier.name.default assert identifier_row.obfuscated_name == Identifier.obfuscated_name.default assert time_before <= identifier_row.created_timestamp <= time_after # Fail at bad parameter with pytest.raises(DoesNotExist): get_identifier(999)
def test_add_from_import(): bnf_parser = ObfuscatePythonBNF(get_obfuscated_name) # Reserve imported modules if first from module is reserved bnf_parser.from_import.parseString( " from some_module.reserved_one import is_reserved, also_is") assert get_reserved_by_name('is_reserved') assert get_reserved_by_name('also_is') # Reserve imported modules if first from module is reserved bnf_parser.from_import.parseString( " from reserved_one.some_module import is_reserved, also_reserved") assert get_reserved_by_name('is_reserved').primary_package == \ 'reserved_one' assert get_reserved_by_name('also_reserved').primary_package == \ 'reserved_one' # Import (without a from) should take no action bnf_parser.from_import.parseString( " import reserved_one, not_reserved_1, also_not_1") with pytest.raises(DoesNotExist): get_reserved_by_name('not_reserved_1') with pytest.raises(DoesNotExist): get_reserved_by_name('also_not_1')
def test_add_except_error(): bnf_parser = ObfuscatePythonBNF(get_obfuscated_name) # Reserve exception names bnf_parser.except_error.parseString( " except (FloatingPointError, SomeOtherError) as exc_err") assert get_reserved_by_name('FloatingPointError') assert get_reserved_by_name('SomeOtherError') with pytest.raises(DoesNotExist): assert get_reserved_by_name('exc_err') # Reserve exception names with double tab bnf_parser.except_error.parseString( " except (FloatingPointError, SomeOtherError) as exc_err") assert get_reserved_by_name('FloatingPointError') assert get_reserved_by_name('SomeOtherError') with pytest.raises(DoesNotExist): assert get_reserved_by_name('exc_err')
def test_add_kivy_import(): bnf_parser = ObfuscateKivyBNF(get_obfuscated_name) # Reserve imported modules from first reserved module bnf_parser.kivy_import.parseString( "#: import is_reserved some_module.reserved_one") assert get_reserved_by_name('is_reserved').primary_package == \ 'reserved_one' with pytest.raises(DoesNotExist): get_reserved_by_name('some_module') # Reserve imported modules if first from module is reserved bnf_parser.kivy_import.parseString( "#: import is_reserved reserved_one.some_module") assert get_reserved_by_name('is_reserved').primary_package == \ 'reserved_one' assert get_reserved_by_name('some_module').primary_package == \ 'reserved_one' # Import without directive should take no action bnf_parser.kivy_import.parseString( "import not_reserved_1 reserved_one") with pytest.raises(DoesNotExist): get_reserved_by_name('not_reserved_1')
def get_row(model, row_id, **kwargs): """Get a database row by its id. Parameters ---------- model : BaseModel row_id : int or None Returns ------- row """ try: if row_id: try: row = model.get(model.id == row_id) except DoesNotExist: raise else: row = model() except ImportError: raise return row
def get_row_by_name(model, row_name, FK_name=None, FK_id=None, **kwargs): """Get a unique row by its name or by name plus FK if a FK is needed. Parameters ---------- FK_id : int FK_name : str model : BaseModel row_name : str Returns ------- row """ try: if FK_name: row = model.get(model.name == row_name, getattr(model, FK_name) == FK_id) else: row = model.get(model.name == row_name) except DoesNotExist: raise else: return row
def _process_download_command(data): matches = re.search('download (\d{1,})', data) if matches is not None: episode_id = int(matches.group(1)) try: episode = Episode.get(Episode.id == episode_id) torrentclient = Transmission() torrentclient.download_episode(episode) episode.is_downloaded = True episode.save() return True except DoesNotExist: error = 'Tried to download non-existing episode with ID: %d' logging.error(error % episode_id) return False except TorrentClientException as e: logging.error('TorrentClientException occured: %s' % str(e)) return False else: logging.error('Incorrect download command received: %s' % data) return False
def get_virtual_card(): username = request.args.get("username") try: virtual_card = virtual_card_service.get_virtual_card( card_no=username ) virtual_card = model_to_dict(virtual_card, recurse=False) return jsonify({'response': virtual_card}), 200 except DoesNotExist as e: return jsonify({ 'response': { 'error': e.args, 'message': '????????' } }), 400 # ????
def get_battery(serial_number): """ get battery rent information :param serial_number: battery serial_number :return: information """ try: battery = battery_rent_service.get_battery(serial_number) return jsonify({'response': model_to_dict(battery)}), 200 except DoesNotExist as e: return jsonify({ 'response': { "error": '%s: %s' % (str(DoesNotExist), e.args), "message": '%s' % e.args } }), 400 # 2. ?????? ????
def validate(self, name, data): """ If there is a problem with the data, raise ValidationError. :param name: The name of this field. :param data: Dictionary of data for all fields. :raises: ValidationError """ super().validate(name, data) if self.value is not None: try: # self.query could be a query like "User.select()" or a model like "User" # so ".select().where()" handles both cases. self.value = [self.query.select().where(self.lookup_field == v).get() for v in self.value if v] except (AttributeError, ValueError, peewee.DoesNotExist): raise ValidationError('related', field=self.lookup_field.name, values=self.value)
def add_image(dataset_pk, image): try: dataset = db.Dataset.get(db.Dataset.dataset == dataset_pk) except peewee.DoesNotExist: print("Can't find any dataset with id {}".format(dataset_pk)) sys.exit(1) with open(image, 'rb') as f: data = f.read() try: mimetype = infer_mimetype( image ) except NoMimetypeException: print("Can't find mime type for <{}>".format(image)) sys.exit(2) print(len(data)) with db.database.atomic(): db.DatasetLogo.create( dataset = dataset, mimetype = mimetype, data = data )
def get_current_user(self): email = self.get_secure_cookie('email') name = self.get_secure_cookie('user') # Fix ridiculous bug with quotation marks showing on the web if name and (name[0] == '"') and (name[-1] == '"'): name = user[1:-1] if email: try: return db.User.select().where( db.User.email == email ).get() except peewee.DoesNotExist: ## Not saved in the database yet return db.User(email = email.decode('utf-8'), name = name.decode('utf-8')) else: return None
def get_map(self, uid=None): """ Get map instance by uid. :param uid: By uid (pk). :return: Player or exception if not found """ try: return await Map.get_by_uid(uid) except DoesNotExist: raise MapNotFound('Map not found.')
def get_map_by_index(self, index): """ Get map instance by index id (primary key). :param index: Primary key index. :return: Map instance or raise exception. """ try: return await Map.get(id=index) except DoesNotExist: raise MapNotFound('Map not found.')
def register(self, name, description='', app=None, min_level=1, namespace=None): """ Register a new permission. :param name: Name of permission :param description: Description in english. :param app: App instance to retrieve the label. :param min_level: Minimum level required. :param namespace: Namespace, only for core usage! :return: Permission instance. """ if not namespace and app: namespace = app.label if not namespace: raise Exception('Namespace is required. You should give your app instance with app=app instead!') try: perm = await self.get_perm(namespace=namespace, name=name) # TODO: Implement overrides on min_level here. if perm.min_level != min_level: perm.min_level = min_level await perm.save() except DoesNotExist: perm = Permission(namespace=namespace, name=name, description=description, min_level=min_level) await perm.save() return perm
def get(self, name): setting_name = "__transient_%s" % (name) try: the_setting = Setting.get(Setting.name == setting_name) t = Transient.from_setting(the_setting) except Setting.DoesNotExist as e: return False if t.is_expired(): the_setting.delete_instance() return False else: return t.value
def delete(self, name): setting_name = "__transient_%s" % (name) try: s = Setting.get(Setting.name == setting_name) except Setting.DoesNotExist as e: return False return s.delete_instance() # === /models ===
def total(self, ctx: commands.Context): """ Show total amount of messages on server """ try: server = self.orm.Server.get(discord_id=ctx.message.server.id) except DoesNotExist as e: # If there's no server in the db an exception will be raised self.config.logger.error(e) else: total_messages = await self.orm.query.server_total_messages(server) await self.bot.say('Total messages: {}'.format(total_messages))
def add_identifiers(identifier_list=None, do_obfuscate=True): """Add identifier and obfuscated names to Identifiers table. Parameters ---------- do_obfuscate : bool identifier_list : list """ for identifier_name in identifier_list: # Skip identifiers in reserved try: get_reserved_by_name(identifier_name) except DoesNotExist: pass else: continue if not do_obfuscate \ or identifier_name[0:2] == '__' \ or identifier_name == '__init__.py' \ or identifier_name.startswith('test_') \ or identifier_name.endswith('_test.py'): obfuscated_name = identifier_name else: obfuscated_name = '' identifier_row = get_identifier(None) try: save_identifier(identifier_row, name=identifier_name, obfuscated_name=obfuscated_name) except IntegrityError as e: if 'unique' not in e.message.lower(): raise # If should not be obfuscated, replace obfuscated, o/w pass if not do_obfuscate: identifier_row = get_identifier_by_name(identifier_name) if identifier_row.obfuscated_name != identifier_name: save_identifier(identifier_row, obfuscated_name=identifier_name)
def validate_obfuscated_name(self): """Make sure the obfuscated name is unique.""" if self.name != self.obfuscated_name: is_unique = False while not is_unique: if not self.obfuscated_name: random_num = base_random_number(5) self.obfuscated_name = base_alphabet_encode(random_num, 5) try: get_reserved_by_name(self.obfuscated_name) except pwe.DoesNotExist: is_unique = True else: self.obfuscated_name = None
def save_identifier(identifier_row, **kwargs): """Save and identifier row. Parameters ---------- identifier_row Returns ------- identifier_id : int """ try: for name, value in kwargs.iteritems(): getattr(identifier_row, name) # Make sure column exists setattr(identifier_row, name, value) except AttributeError: raise with obfuscatedb.atomic(): try: identifier_id = save_row(identifier_row, **kwargs) except IntegrityError as e: # Resave with different obfuscated_name if already exists if 'unique' in e.message.lower() \ and not kwargs.get('obfuscated_name', None) \ and 'obfuscated_name' in e.message: identifier_row.obfuscated_name = None identifier_id = save_identifier(identifier_row) else: raise except DoesNotExist: raise return identifier_id
def add_from_import(self, from_import_list): """Add imported modules from reserved modules to reserved. Parameters ---------- from_import_list : list """ if not from_import_list or \ from_import_list[0] != 'from' or \ 'import' not in from_import_list[:]: return reserved_list = set() import_index = from_import_list[:].index('import') package_name = '' is_reserved = False for reserve_name in from_import_list[1:import_index]: # Start with first reserved directory in tree (if one exists) if not is_reserved: try: get_reserved_by_name(reserve_name) is_reserved = True package_name = reserve_name except DoesNotExist: continue if is_reserved: if reserve_name[0].isalpha() or reserve_name[0] == '_': reserved_list.add(reserve_name) if is_reserved: # Get imported items for reserve_name in from_import_list[import_index+1:]: if reserve_name[0].isalpha() or reserve_name[0] == '_': reserved_list.add(reserve_name) add_reserveds(package_name, reserved_list)
def add_kivy_import(self, kivy_import_list): """Add imported modules from reserved modules to reserved. Parameters ---------- kivy_import_list : list Kivy import statement. """ if not kivy_import_list or \ kivy_import_list[0].strip() != '#:' or \ kivy_import_list[1] != 'import': return reserved_list = set() package_name = '' is_reserved = False for reserve_name in kivy_import_list[3:]: # Start with first reserved directory in tree (if one exists) if not is_reserved: try: get_reserved_by_name(reserve_name) is_reserved = True package_name = reserve_name except DoesNotExist: continue if is_reserved: if reserve_name[0].isalpha() or reserve_name[0] == '_': reserved_list.add(reserve_name) if is_reserved: reserved_list.add(kivy_import_list[2]) add_reserveds(package_name, reserved_list)
def test_get_reserved(): # Get good reserved_id reserved_row = get_reserved(1) assert reserved_row.id == 1 assert reserved_row.name == u'reserved_one' # Fail at getting bad reserved_id with pytest.raises(DoesNotExist): get_reserved(999)
def test_create_decision(): from logic.reserved import Reserved from logic.reserved import get_reserved time_before = datetime.datetime.now() reserved_row = get_reserved(None) time_after = datetime.datetime.now() assert not reserved_row.id assert reserved_row.name == Reserved.name.default assert time_before <= reserved_row.created_timestamp <= time_after # Fail at bad parameter with pytest.raises(DoesNotExist): get_reserved(999)
def test_get_identifier(): # Get good identifier_id identifier_row = get_identifier(1) assert identifier_row.id == 1 assert identifier_row.name == u'identifier_one' # Fail at getting bad identifier_id with pytest.raises(DoesNotExist): get_identifier(999)
def test_get_identifier_by_name(): # Get good identifier_id identifier_row = get_identifier_by_name('identifier_one') assert identifier_row.id == 1 assert identifier_row.name == u'identifier_one' # Fail at getting bad identifier_id with pytest.raises(DoesNotExist): get_identifier_by_name('junk')
def test_add_reserveds(): # Define values to be updated, and their new values names_to_update = { "variable": "v001", "variable_row": "v002", "scenario": "s001", "decision": "d001", "decision_row": "d002" } for ident_name, obfuscated_name in names_to_update.iteritems(): identifier_row = get_identifier(None) save_identifier( identifier_row, name=ident_name, obfuscated_name=obfuscated_name ) bnf_parser = ObfuscatePythonBNF(get_obfuscated_name) # Identifiers after leading reserved attributes should not be obfuscated # They should be added to reserved and removed from identifiers bnf_parser.attribs.parseString( "decision.scenario(variable) = reserved_one.variable " "+ view.reserved_one") assert bnf_parser.statement.transformString( "decision.scenario(variable) = reserved_one.variable") == \ "d001.s001(variable)=reserved_one.variable" assert get_reserved_by_name('variable').primary_package == 'reserved_one' assert get_identifier_by_name('variable').obfuscated_name == 'variable' with pytest.raises(DoesNotExist): assert get_identifier_by_name('view').obfuscated_name != 'view'
def test_add_reserveds(): # Define values to be updated, and their new values names_to_update = { "variable": "v001", "variable_row": "v002", "scenario": "s001", "decision": "d001", "decision_row": "d002" } for ident_name, obfuscated_name in names_to_update.iteritems(): identifier_row = get_identifier(None) save_identifier( identifier_row, name=ident_name, obfuscated_name=obfuscated_name ) bnf_parser = ObfuscateKivyBNF(get_obfuscated_name) # Identifiers after leading reserved attributes should not be obfuscated # They should be added to reserved and removed from identifiers bnf_parser.attribs.parseString( "decision.scenario(variable) = reserved_one.variable " "+ view.reserved_one") assert bnf_parser.statement.transformString( "decision.scenario(variable) = reserved_one.variable") == \ "d001.s001(variable)=reserved_one.variable" assert get_reserved_by_name('variable').primary_package == 'reserved_one' assert get_identifier_by_name('variable').obfuscated_name == 'variable' with pytest.raises(DoesNotExist): assert get_identifier_by_name('view').obfuscated_name != 'view'
def get_row_by_FK(model, FK_name, FK_id, is_required=False, **kwargs): """Get a unique row a FK id or return an empty row. Parameters ---------- FK_id : int FK_name : str is_required : bool model : BaseModel Returns ------- row """ if FK_id: try: row = model.get(getattr(model, FK_name) == FK_id) except DoesNotExist: if is_required: raise else: row = model() else: row = model() return row
def get_row_by_fields(model, **kwargs): """Get a unique row by arbitrary fields. Parameters ---------- kwargs : dict of field names and values model : BaseModel Returns ------- row """ field_name = [] field_value = [] try: for name, value in kwargs.iteritems(): field_name.append(getattr(model, name)) # Make sure column exists field_value.append(value) except AssertionError as e: raise try: if len(field_name) == 1: row = model.get(field_name[0] == field_value[0]) elif len(field_name) == 2: row = model.get(field_name[1] == field_value[1]) elif len(field_name) == 3: row = model.get(field_name[2] == field_value[2]) elif len(field_name) == 4: row = model.get(field_name[3] == field_value[3]) elif len(field_name) == 5: row = model.get(field_name[4] == field_value[4]) else: raise ValueError('Too many arguments for row lookup') except DoesNotExist: raise else: return row
def _get_existing_episode_from_database(episode): """Retrieves an existing episode from the database. An episode is equal if it has the same show, season, episode and quality. We store multiple show+season+episode Episodes if the quality is different so later on we can decide which we want to download and perhaps wait for a better quality episode.""" try: return Episode.select() \ .where(Episode.show == episode.show) \ .where(Episode.season == episode.season) \ .where(Episode.episode == episode.episode) \ .where(Episode.quality == episode.quality) \ .get() except DoesNotExist: return None
def _is_episode_downloaded(episode): """Checks if this item has already been downloaded.""" try: Episode.select() \ .where(Episode.show == episode.show) \ .where(Episode.season == episode.season) \ .where(Episode.episode == episode.episode) \ .where(Episode.is_downloaded == 1) \ .get() return True except DoesNotExist: return False
def get(show_id): """Handles GET requests. Returns a single show.""" try: show = Show.get(Show.id == show_id) return show.to_dict() except DoesNotExist: abort(404)
def delete(show_id): """Handles DELETE requests. Deletes a show.""" try: show = Show.get(Show.id == show_id) show.delete_instance() return '', 204 except DoesNotExist: abort(404)
def startup_library_service(): wamp = autobahn_sync.AutobahnSync() wamp.run() db = pw.SqliteDatabase('books.db') class Book(pw.Model): title = pw.CharField() author = pw.CharField() class Meta: database = db try: db.create_table(Book) except pw.OperationalError: pass @wamp.register('com.library.get_book') def get_book(id): try: b = Book.get(id=id) except pw.DoesNotExist: return {'_error': "Doesn't exist"} return {'id': id, 'title': b.title, 'author': b.author} @wamp.register('com.library.new_book') def new_book(title, author): book = Book(title=title, author=author) book.save() wamp.session.publish('com.library.book_created', book.id) return {'id': book.id}
def get_object_or_404(query_or_model, *query): if not isinstance(query_or_model, SelectQuery): query_or_model = query_or_model.select() try: return query_or_model.where(*query).get() except DoesNotExist: abort(404)
def get_user_battery(): username = get_jwt_identity() # username = request.args.get("username") try: battery, battery_record = battery_rent_service.get_user_battery( username=username ) # battery = model_to_dict(battery) # battery_record = model_to_dict(battery_record) battery = { "serial_number": battery.serial_number, "rent_date": battery_record.rent_date } return jsonify({ 'response': { "battery": battery, # "battery_record": battery_record } }), 200 except DoesNotExist as e: return jsonify({ 'response': { "battery": [], } }), 200 # return jsonify({ # 'response': { # 'error': e.args, # 'message': '?????' # } # }), 400 # 3. ????