我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.cast()。
def register_worktime(user_id, is_workon=True): """????????DB????? :param str user_id: Slack?user_id :param bool is_workon: ???????????defalt??? """ today = datetime.date.today() s = Session() # SQLite??????????cast??????MySQL?debug??? record = (s.query(KintaiHistory) .filter(cast(KintaiHistory.registered_at, Date) == today) .filter(KintaiHistory.user_id == user_id) .filter(KintaiHistory.is_workon.is_(is_workon)) .one_or_none()) if record: record.registered_at = datetime.datetime.now() else: s.add(KintaiHistory(user_id=user_id, is_workon=is_workon)) s.commit()
def oql_for_chunk(self, chunk, include_self=False): q = self.items.filter(cast(Item.location, Geometry).contained(envelope(chunk))) tags = set() for item in q: tags |= set(item.tags) tags.difference_update(skip_tags) tags = matcher.simplify_tags(tags) if not(tags): print('no tags, skipping') return ymin, ymax, xmin, xmax = chunk bbox = '{:f},{:f},{:f},{:f}'.format(ymin, xmin, ymax, xmax) oql = overpass.oql_for_area(self.overpass_type, self.osm_id, tags, bbox, None, include_self=include_self) return oql
def get_user(user_id): secret = request.headers.get('Authorization') user = db_session.query(api.models.User).filter(api.models.User.id == user_id). \ filter(api.models.User.secret == secret).one_or_none() # get koin_count, mission_count_today and mission_count mission_count = db_session.query(api.models.Solution).filter(api.models.Solution.user_id == user_id)\ .filter(api.models.Solution.valid).count() mission_count_today = db_session.query(api.models.Solution).filter(api.models.Solution.user_id == user_id)\ .filter(api.models.Solution.valid)\ .filter(cast(api.models.Solution.create_date, Date) == date.today()).count() koin_count = db_session.query(func.sum(api.models.Solution.koin_count)).filter(api.models.Solution.user_id == user_id).scalar() logger.debug('user '+str(user_id)+' logged in') if user: return user.dump(mission_count=mission_count, mission_count_today=mission_count_today, koin_count=koin_count) return 'Unauthorized', 401
def cast(self, type_): """Produce a type cast, i.e. ``CAST(<expression> AS <type>)``. This is a shortcut to the :func:`~.expression.cast` function. .. versionadded:: 1.0.7 """ return Cast(self, type_)
def handle_info_json(self, model, info, fulltextsearch=None): """Handle info JSON query filter.""" clauses = [] headlines = [] order_by_ranks = [] if '::' in info: pairs = info.split('|') for pair in pairs: if pair != '': k,v = pair.split("::") if fulltextsearch == '1': vector = _entity_descriptor(model, 'info')[k].astext clause = func.to_tsvector(vector).match(v) clauses.append(clause) if len(headlines) == 0: headline = func.ts_headline(self.language, vector, func.to_tsquery(v)) headlines.append(headline) order = func.ts_rank_cd(func.to_tsvector(vector), func.to_tsquery(v), 4).label('rank') order_by_ranks.append(order) else: clauses.append(_entity_descriptor(model, 'info')[k].astext == v) else: info = json.dumps(info) clauses.append(cast(_entity_descriptor(model, 'info'), Text) == info) return clauses, headlines, order_by_ranks
def _set_orderby_desc(self, query, model, limit, last_id, offset, descending, orderby): """Return an updated query with the proper orderby and desc.""" if orderby == 'fav_user_ids': n_favs = func.coalesce(func.array_length(model.fav_user_ids, 1), 0).label('n_favs') query = query.add_column(n_favs) if orderby in ['created', 'updated', 'finish_time']: if descending: query = query.order_by(desc( cast(getattr(model, orderby), TIMESTAMP))) else: query = query.order_by(cast(getattr(model, orderby), TIMESTAMP)) else: if orderby != 'fav_user_ids': if descending: query = query.order_by(desc(getattr(model, orderby))) else: query = query.order_by(getattr(model, orderby)) else: if descending: query = query.order_by(desc("n_favs")) else: query = query.order_by("n_favs") if last_id: query = query.limit(limit) else: query = query.limit(limit).offset(offset) return query
def index(): conn,curr = sphinx_conn() totalsql = 'select count(*) from film' curr.execute(totalsql) totalcounts = curr.fetchall() total = int(totalcounts[0]['count(*)']) sphinx_close(curr,conn) keywords=Search_Keywords.query.order_by(Search_Keywords.order).limit(6) form=SearchForm() today = db.session.query(func.sum(Search_Statusreport.new_hashes)).filter(cast(Search_Statusreport.date, Date) == datetime.date.today()).scalar() return render_template('index.html',form=form,keywords=keywords,total=total,today=today,sitename=sitename)
def get_punch_card(session): query = session.query(cast(Sighting.expire_timestamp / 300, Integer).label('ts_date'), func.count('ts_date')) \ .group_by('ts_date') \ .order_by('ts_date') if conf.REPORT_SINCE: query = query.filter(Sighting.expire_timestamp > SINCE_TIME) results = query.all() results_dict = {r[0]: r[1] for r in results} filled = [] for row_no, i in enumerate(range(int(results[0][0]), int(results[-1][0]))): filled.append((row_no, results_dict.get(i, 0))) return filled
def HybridMag(flux_parameter, band, index=None): """Returns a hybrid property describing an asinh magnitude. ``flux_parameter`` must be a column with a flux in nanomaggies. ``band`` is the band name, to determine the softening parameter. If ``flux_parameter`` is and array, ``index`` defines the position of ``band`` within the array. """ @hybrid_property def hybridMag(self): if index is not None: flux = getattr(self, flux_parameter)[index] else: flux = getattr(self, flux_parameter) flux *= 1e-9 # From nanomaggies to maggies bb_band = bb[band] asinh_mag = -2.5 / np.log(10) * (np.arcsinh(flux / (2. * bb_band)) + np.log(bb_band)) return asinh_mag @hybridMag.expression def hybridMag(cls): if index is not None: # It needs to be index + 1 because Postgresql arrays are 1-indexed. flux = getattr(cls, flux_parameter)[index + 1] else: flux = getattr(cls, flux_parameter) flux *= 1e-9 bb_band = bb[band] xx = flux / (2. * bb_band) asinh_mag = (-2.5 / func.log(10) * (func.log(xx + func.sqrt(func.pow(xx, 2) + 1)) + func.log(bb_band))) return cast(asinh_mag, Float) return hybridMag
def logmass(parameter): @hybrid_property def mass(self): par = getattr(self, parameter) return math.log10(par) if par > 0. else 0. @mass.expression def mass(cls): par = getattr(cls, parameter) return cast(case([(par > 0., func.log(par)), (par == 0., 0.)]), Float) return mass
def HybridRatio(line1, line2): ''' produces emission line ratio hybrid properties ''' @hybrid_property def hybridRatio(self): if type(line1) == tuple: myline1 = getattr(self, line1[0])+getattr(self, line1[1]) else: myline1 = getattr(self, line1) if getattr(self, line2) > 0: return myline1/getattr(self, line2) else: return -999. @hybridRatio.expression def hybridRatio(cls): if type(line1) == tuple: myline1 = getattr(cls, line1[0])+getattr(cls, line1[1]) else: myline1 = getattr(cls, line1) return cast(case([(getattr(cls, line2) > 0., myline1/getattr(cls, line2)), (getattr(cls, line2) == 0., -999.)]), Float) return hybridRatio
def prepare_queryset(query, model, key, value): return query.filter(cast(getattr(model, key), Date) == datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date())
def prepare_queryset(query, model, key, value): return query.filter(cast(getattr(model, key), Date) >= datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date())
def prepare_queryset(query, model, key, value): return query.filter(cast(getattr(model, key), Date) <= datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date())
def prepare_queryset(query, model, key, values): if len(values) == 1: values = values[0].split(',') return query.filter(cast(getattr(model, key), Date) .between(datetime.strptime(values[0], '%Y-%m-%dT%H:%M:%S.%fZ').date(), datetime.strptime(values[1], '%Y-%m-%dT%H:%M:%S.%fZ').date()))
def _exclusion_in_uuid(type_, name): """ Cast UUIDs to text for our exclusion index because postgres doesn't currently allow GiST indices on UUIDs. """ return sa.cast(sa.text(name), sap.TEXT), '='
def compile_array_agg(element, compiler, **kw): compiled = "%s(%s)" % (element.name, compiler.process(element.clauses)) if element.default is None: return compiled return str(sa.func.coalesce( sa.text(compiled), sa.cast(postgresql.array(element.default), element.type) ).compile(compiler))
def search_route(): query = request.args.get('q') per_page = request.args.get('per_page', 10, type=int) page = request.args.get('page', 0, type=int) if query is None: return json.dumps({'results':[]}) result = {} with session_scope() as session: results = session.query( City.name, City.country, func.ST_Y(cast(City.location, Geometry())), func.ST_X(cast(City.location, Geometry())), City.id ) \ .filter(unaccent(City.name).ilike(unaccent('%' + query + '%'))) \ .limit(per_page + 1) \ .offset(page * per_page) \ .all() more = len(results) == per_page + 1 results = results[:per_page] result = json.dumps({ 'results': [{'id': c[4], 'text': '{}, {}'.format(c[0], c[1]), 'coords': (c[2], c[3])} for i,c in enumerate(results)], 'more': more}) return result
def get_latest_results(self): """ Return results for highest run ID for each repository :return: list of PipelineRun rows """ session = self.session_factory() subq = session.query( PipelineRun.repository, func.max(cast(PipelineRun.run_id, Integer)).label('max_run_id')).group_by( PipelineRun.repository).subquery('subq') results = session.query(PipelineRun).filter( PipelineRun.repository == subq.c.repository, PipelineRun.run_id == cast(subq.c.max_run_id, String)).all() session.close() return results
def get_latest_results_for_project(self, project): """ Return results for the highest run ID for each repository in a given project. :param project: project name as string :return: list of PipelineRun rows """ session = self.session_factory() subq = session.query( PipelineRun.repository, func.max(cast(PipelineRun.run_id, Integer)).label('max_run_id')).group_by( PipelineRun.repository).filter_by(project=project).subquery('subq') results = session.query(PipelineRun).filter( PipelineRun.repository == subq.c.repository, PipelineRun.run_id == cast(subq.c.max_run_id, String)).all() session.close() return results
def _to_month(column): return func.make_date(cast(extract('year', column), Integer), cast(extract('month', column), Integer), 1)
def _get_months(query): min_ = _to_month(func.min(query.c.date)) max_ = _to_month(func.max(query.c.date)) age = func.age(max_, min_) n = cast(extract('year', age) * 12 + extract('month', age), Integer) q = db.session.query((min_ + text("interval '1' month") * func.generate_series(0, n)).label('date')) return q
def find_by_moderator(moderator: ModeratorModel, page: int, per_page: int, for_boards: List[BoardModel]) \ -> List[ReportModel]: with session() as s: q = s.query(ReportOrmModel) can_see_all_reports = has_role(moderator, roles.ROLE_ADMIN) if not can_see_all_reports: # Filter that gets all reports for the moderator id, if that moderator also has either # full permission or janitor. q = q.filter( ReportOrmModel.post_id == PostOrmModel.id, PostOrmModel.thread_id == ThreadOrmModel.id, ThreadOrmModel.board_id == BoardOrmModel.id, BoardOrmModel.id == BoardModeratorOrmModel.board_id, BoardModeratorOrmModel.moderator_id == moderator.id, BoardModeratorOrmModel.roles.overlap(cast(required_roles_for_viewing_reports(), ARRAY(String)))) else: q = q.filter( ReportOrmModel.post_id == PostOrmModel.id, PostOrmModel.thread_id == ThreadOrmModel.id, ThreadOrmModel.board_id == BoardOrmModel.id) if for_boards: board_ids = [board.id for board in for_boards] q = q.filter(BoardOrmModel.id.in_(board_ids)) q = q.order_by(desc(ReportOrmModel.date)) q = q.options( joinedload('post').joinedload('thread').joinedload('board') ) q = q.offset(page * per_page).limit(per_page) res = list(map(lambda i: ReportModel.from_orm_model(i), q.all())) s.commit() return res
def find_today_with_attrs(cls, attrs): valid_names = Team.hero_names.keys() for hero in valid_names: if hero not in attrs: attrs[hero] = False today = datetime.utcnow().date() row = Pick.query.filter_by(**attrs).\ filter(cast(Pick.uploaded_at, Date) == today).limit(1).first() if row: return row return None # Returns True if the hero the user was playing is a suggested pick for the # known team composition.
def type_coerce(expression, type_): """Associate a SQL expression with a particular type, without rendering ``CAST``. E.g.:: from sqlalchemy import type_coerce stmt = select([type_coerce(log_table.date_string, StringDateTime())]) The above construct will produce SQL that is usually otherwise unaffected by the :func:`.type_coerce` call:: SELECT date_string FROM log However, when result rows are fetched, the ``StringDateTime`` type will be applied to result rows on behalf of the ``date_string`` column. A type that features bound-value handling will also have that behavior take effect when literal values or :func:`.bindparam` constructs are passed to :func:`.type_coerce` as targets. For example, if a type implements the :meth:`.TypeEngine.bind_expression` method or :meth:`.TypeEngine.bind_processor` method or equivalent, these functions will take effect at statement compilation/execution time when a literal value is passed, as in:: # bound-value handling of MyStringType will be applied to the # literal value "some string" stmt = select([type_coerce("some string", MyStringType)]) :func:`.type_coerce` is similar to the :func:`.cast` function, except that it does not render the ``CAST`` expression in the resulting statement. :param expression: A SQL expression, such as a :class:`.ColumnElement` expression or a Python string which will be coerced into a bound literal value. :param type_: A :class:`.TypeEngine` class or instance indicating the type to which the expression is coerced. .. seealso:: :func:`.cast` """ type_ = type_api.to_instance(type_) if hasattr(expression, '__clause_element__'): return type_coerce(expression.__clause_element__(), type_) elif isinstance(expression, BindParameter): bp = expression._clone() bp.type = type_ return bp elif not isinstance(expression, Visitable): if expression is None: return Null() else: return literal(expression, type_=type_) else: return Label(None, expression, type_=type_)