我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用peewee.SelectQuery()。
def search_reserveds(search_text, **kwargs): """Search the Reserved table.""" is_first = True where_clause = None if search_text: search_values = re.sub('[~`%&-+=]', '', search_text).split() for s in search_values: s_ = u'%'+s+u'%' if is_first: where_clause = (Reserved.name ** s_) is_first = False else: where_clause = where_clause & \ (Reserved.name ** s_) if is_first: where_clause = (~(Reserved.id >> None)) select_clause = pwe.SelectQuery(Reserved, Reserved.id, Reserved.name) order_by_clause = Reserved.name reserved_list = search_rows(select_clause, where_clause, order_by_clause) return reserved_list
def add(self, value, clear_existing=False): if clear_existing: self.clear() fd = self._field_descriptor if isinstance(value, SelectQuery): query = value.select( SQL(str(self._instance.get_id())), fd.rel_model._meta.primary_key) fd.through_model.insert_from( fields=[fd.src_fk, fd.dest_fk], query=query).execute() else: if not isinstance(value, (list, tuple)): value = [value] if not value: return inserts = [{ fd.src_fk.name: self._instance.get_id(), fd.dest_fk.name: rel_id} for rel_id in self._id_list(value)] fd.through_model.insert_many(inserts).execute()
def remove(self, value): fd = self._field_descriptor if isinstance(value, SelectQuery): subquery = value.select(value.model_class._meta.primary_key) return (fd.through_model .delete() .where( (fd.dest_fk << subquery) & (fd.src_fk == self._instance.get_id())) .execute()) else: if not isinstance(value, (list, tuple)): value = [value] if not value: return return (fd.through_model .delete() .where( (fd.dest_fk << self._id_list(value)) & (fd.src_fk == self._instance.get_id())) .execute())
def __set__(self, instance, value): mtv = instance._meta.db_table miv = instance._get_pk_value() if (isinstance(value, SelectQuery) and value.model_class == self.model_class): UpdateQuery(self.model_class, { self.model_type_field: mtv, self.model_id_field: miv, }).where(value._where).execute() elif all(map(lambda i: isinstance(i, self.model_class), value)): for obj in value: setattr(obj, self.model_type_field.name, mtv) setattr(obj, self.model_id_field.name, miv) obj.save() else: raise ValueError('ReverseGFK field unable to handle "%s"' % value)
def dataClean(): if EdulistModel.table_exists() == False: EdulistModel.create_table() schools = SelectQuery(EdulistModel).select().where(EdulistModel.postcode == '') print(schools.count()) for school in schools: # category = school.category print(school.postcode) # category = category.replace(' ', '') # category = category.replace('\t', '') # category = category.replace('\n', '') # category = category.replace('\r', '?') # print(category) # school.category = category # school.save()
def add(self, value, clear_existing=False): if clear_existing: await self.clear() fd = self._field_descriptor if isinstance(value, SelectQuery): query = value.select( SQL(str(self._instance.get_id())), fd.rel_model._meta.primary_key) await fd.through_model.insert_from( fields=[fd.src_fk, fd.dest_fk], query=query).execute() else: if not isinstance(value, (list, tuple)): value = [value] if not value: return inserts = [{ fd.src_fk.name: self._instance.get_id(), fd.dest_fk.name: rel_id} for rel_id in self._id_list(value)] await fd.through_model.insert_many(inserts).execute()
def remove(self, value): fd = self._field_descriptor if isinstance(value, SelectQuery): subquery = value.select(value.model_class._meta.primary_key) return await (fd.through_model .delete() .where( (fd.dest_fk << subquery) & (fd.src_fk == self._instance.get_id())) .execute()) else: if not isinstance(value, (list, tuple)): value = [value] if not value: return return await (fd.through_model .delete() .where( (fd.dest_fk << self._id_list(value)) & (fd.src_fk == self._instance.get_id())) .execute())
def get_page_context(self): """ Return current page context """ try: page = int(self.get_argument('page', 1)) except ValueError: page = 1 try: count = peewee.SelectQuery(Record).count() except peewee.IntegrityError: count = 0 page_count = int(count/env.ADMIN_ITEMS_PER_PAGE) + \ int(bool(count % env.ADMIN_ITEMS_PER_PAGE)) prev_page, page, next_page = self.paging(page, page_count) try: records = Record\ .select()\ .order_by( Record.active.desc(), Record.uts.desc())\ .paginate(page, paginate_by=env.ADMIN_ITEMS_PER_PAGE) except peewee.IntegrityError: records = [] return dict(records=records, count=count, page_count=page_count, prev_page=prev_page, page=page, next_page=next_page)
def jobs(project, page): """ For when no job id was given. """ g.selected_tab = "jobs" # Check if project argument is correct project_query = Project.select().where(Project.slug == project).first() if project_query is None: flash("Invalid project.") return redirect(url_for("projects")) session["project"] = project_query page = int(page) if page <= 0: flash("invalid page number") return redirect(project + "/jobs/1") count = peewee.SelectQuery(Job).count() query = (Job .select() .where(Job.project == project_query) .order_by(-Job.started.is_null(), -Job.started) .paginate(page, JOBSPERPAGE)) entries = [] for job in query: span = job.ended if job.started is not None: if job.ended is not None: span = job.ended - job.started entries.append(dict(id=job.id, status=job.get_status(), name=job.name, start=job.started, end=job.ended, span=span)) more = count > (page * JOBSPERPAGE) less = page > 1 pagedata = dict(id=page, more=more, less=less) return render_template("/jobs.html", entries=entries, page=pagedata)
def search_identifiers(search_text, **kwargs): """Fetch identifier rows. Parameters ---------- search_text : str or None Returns ------- identifier_list """ is_first = True where_clause = None if search_text: search_values = re.sub('[~`%&-+=]', '', search_text).split() for s in search_values: s_ = u'%'+s+u'%' if is_first: where_clause = (Identifier.name ** s_) is_first = False else: where_clause = where_clause & \ (Identifier.name ** s_) if is_first: where_clause = (~(Identifier.id >> None)) select_clause = pwe.SelectQuery(Identifier, Identifier.id, Identifier.name) order_by_clause = Identifier.name identifier_list = search_rows(select_clause, where_clause, order_by_clause) return identifier_list
def __init__(self, query_or_model, paginate_by, page_var='page', check_bounds=False): self.paginate_by = paginate_by self.page_var = page_var self.check_bounds = check_bounds if isinstance(query_or_model, SelectQuery): self.query = query_or_model self.model = self.query.model_class else: self.model = query_or_model self.query = self.model.select()
def get_object_or_404(query_or_model, *query): if not isinstance(query_or_model, SelectQuery): query_or_model = query_or_model.select() try: return query_or_model.where(*query).get() except DoesNotExist: abort(404)
def get_item(cls, id=None, order_by='id', order_type='desc', to_dict=True): if id is not None: data = get_object_or_404(cls, cls.id == id) else: order_field = getattr(cls, order_by) if order_type.lower() == 'asc': data = cls.select().order_by(-order_field) else: data = cls.select().order_by(+order_field) if to_dict: if isinstance(data, peewee.SelectQuery): data = [model2dict(i) for i in data] else: data = model2dict(data) return data
def fixData(): # p = Pool(100) schools = SelectQuery(EdulistModel).select().where(EdulistModel.postcode == '') print(schools.count()) # for school in schools: # p.apply_async(updatePostCode, args=(school,)) # # p.close() # p.join() # dataClean()
def test_search_fields_must_be_provided(post_model): """Ensure that search fields need to be set.""" # Nothing is defined to search on, so error out with pytest.raises(AttributeError): post_model.search('hello') # If fields are provided, return a query assert isinstance(post_model.search('hello', fields=('body',)), peewee.SelectQuery)
def base_query(cls): """Method that should return the basic query that all queries will use. The default base query that just returns a blank ``SELECT`` statement. It should be overridden in child classes when ``JOIN`` or ``WHERE`` will be needed for all queries. Returns: :py:class:`peewee.SelectQuery`: An unexecuted query with no logic applied. """ return cls.select()
def get_by_id(cls, record_id, execute=True): """Return a single instance of the model queried by ID. Args: record_id (int): Integer representation of the ID to query on. Keyword Args: execute (bool, optional): Should this method execute the query or return a query object for further manipulation? Returns: cls | :py:class:`peewee.SelectQuery`: If ``execute`` is ``True``, the query is executed, otherwise a query is returned. Raises: :py:class:`peewee.DoesNotExist`: Raised if a record with that ID doesn't exist. """ query = cls.base_query().where(cls.id == record_id) if execute: return query.get() return query