我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.bindparam()。
def search(cls, connection: SQLSession, term: str) -> Sequence['Mod']:
    """Return every Mod whose name or summary contains TERM.

    TERM is wrapped in ``%`` wildcards and matched with SQL ``LIKE``
    against both the name and the summary column; it is inserted into
    the pattern as given, without escaping. Results are ordered by name.

    Keyword arguments:
        connection: Database connection to ask on.
        term: The term to search for.

    Returns:
        Sequence of matching mods (possibly empty).
    """
    baked = SQLBakery(lambda conn: conn.query(cls))
    # Both LIKE clauses share the single bound parameter 'term'.
    baked += lambda q: q.filter(or_(
        cls.name.like(bindparam('term')),
        cls.summary.like(bindparam('term')),
    ))
    baked += lambda q: q.order_by(cls.name)

    bound = baked(connection)
    pattern = '%{}%'.format(term)
    return bound.params(term=pattern).all()
def find(cls, connection: SQLSession, name: str) -> 'Mod':
    """Look up the single Mod whose name matches NAME.

    The name is compared with SQL ``LIKE``; no wildcards are added
    around it.

    Keyword arguments:
        connection: Database connection to ask on.
        name: The name of the mod to search for.

    Returns:
        The requested mod.

    Raises:
        NoResultFound: The name does not match any known mod.
        MultipleResultsFound: The name is too ambiguous,
            multiple matching mods found.
    """
    baked = SQLBakery(lambda conn: conn.query(cls))
    baked += lambda q: q.filter(cls.name.like(bindparam('name')))

    bound = baked(connection)
    pattern = '{}'.format(name)
    return bound.params(name=pattern).one()
def with_id(cls, connection: SQLSession, id: int) -> 'Mod':
    """Load the mod whose ``id`` column equals the given id.

    Keyword arguments:
        connection: Database connection to fetch from.
        id: The id field of the mod to get.

    Returns:
        The requested mod.

    Raises:
        NoResultFound: Mod with specified id does not exist.
    """
    baked = SQLBakery(lambda conn: conn.query(cls))
    baked += lambda q: q.filter(cls.id == bindparam('id'))

    bound = baked(connection)
    return bound.params(id=id).one()
def test_bound_limit(self):
    """Check a LIMIT supplied through the bound parameter ``l`` (set to 2)."""
    some_table = self.tables.some_table

    stmt = select([some_table]).order_by(some_table.c.id)
    stmt = stmt.limit(bindparam('l'))

    expected_rows = [(1, 1, 2), (2, 2, 3)]
    self._assert_result(stmt, expected_rows, params={"l": 2})
def test_bound_offset(self):
    """Check an OFFSET supplied through the bound parameter ``o`` (set to 2)."""
    some_table = self.tables.some_table

    stmt = select([some_table]).order_by(some_table.c.id)
    stmt = stmt.offset(bindparam('o'))

    expected_rows = [(3, 3, 4), (4, 4, 5)]
    self._assert_result(stmt, expected_rows, params={"o": 2})
def test_bound_limit_offset(self):
    """Check LIMIT and OFFSET both supplied as bound parameters.

    ``l`` is bound to 2 and ``o`` to 1.
    """
    some_table = self.tables.some_table

    stmt = select([some_table]).order_by(some_table.c.id)
    stmt = stmt.limit(bindparam("l")).offset(bindparam("o"))

    expected_rows = [(2, 2, 3), (3, 3, 4)]
    self._assert_result(stmt, expected_rows, params={"l": 2, "o": 1})
def unique_params(self, *optionaldict, **kwargs):
    """Return a copy with :func:`bindparam()` elements replaced.

    Works like ``params()``, but additionally adds ``unique=True`` to
    affected bind parameters so that multiple statements can be used.

    Accepts either keyword arguments or a single positional dictionary;
    both are forwarded unchanged to ``_params()``.
    """
    make_unique = True
    return self._params(make_unique, optionaldict, kwargs)
def _params(self, unique, optionaldict, kwargs):
    """Shared implementation behind ``params()`` and ``unique_params()``.

    :param unique: when true, ``_convert_to_unique()`` is called on the
        visited bind parameters.
    :param optionaldict: tuple of positional arguments; at most one
        dictionary is accepted and it is merged into ``kwargs``.
    :param kwargs: mapping of bind parameter key to replacement value
        (updated in place when a positional dictionary is given).
    :return: the result of ``cloned_traverse()`` over this element.
    :raises exc.ArgumentError: if more than one positional argument
        was passed.
    """
    positional_count = len(optionaldict)
    if positional_count > 1:
        raise exc.ArgumentError(
            "params() takes zero or one positional dictionary argument")
    if positional_count == 1:
        kwargs.update(optionaldict[0])

    # NOTE(review): the two checks below are treated as siblings, i.e. the
    # unique conversion applies to every visited bind parameter -- confirm
    # against the unflattened source.
    def visit_bindparam(bind):
        key = bind.key
        if key in kwargs:
            bind.value = kwargs[key]
            # a value has been supplied, so it no longer has to be
            # provided at execution time
            bind.required = False
        if unique:
            bind._convert_to_unique()

    visitors = {'bindparam': visit_bindparam}
    return cloned_traverse(self, {}, visitors)
def update_canonicals(canonicals):
    '''
    Update canonical data for android devices.

    Each entry of ``canonicals`` is a dict; its keys are prefixed with
    ``p_`` and used as bind parameters, so the entries must provide
    ``login_id``, ``old_token`` and ``new_token``. For every entry the
    ``device`` row of that login whose current token (``device_token_new``
    if set, else ``device_token``) equals ``old_token`` gets
    ``device_token_new`` set to ``new_token``. Afterwards
    ``keep_max_users_per_device`` is called for each new token, once for
    the Android platform and once for the Android tablet platform.

    Does nothing when ``canonicals`` is empty.
    '''
    # An executemany with an empty parameter list would leave the required
    # bind parameters of the UPDATE without values, so bail out early.
    if not canonicals:
        return

    binding = [{"p_{}".format(k): v for k, v in canonical.items()}
               for canonical in canonicals]
    device_table = model.metadata.tables['device']
    current_token = func.coalesce(device_table.c.device_token_new,
                                  device_table.c.device_token)
    stmt = (
        update(device_table)
        .values(device_token_new=bindparam('p_new_token'))
        .where(and_(device_table.c.login_id == bindparam('p_login_id'),
                    current_token == bindparam('p_old_token')))
    )
    ENGINE.execute(stmt, binding)

    platforms = (constants.PLATFORM_ANDROID,
                 constants.PLATFORM_ANDROID_TABLET)
    with session_scope() as session:
        # Adjacent literals instead of a backslash continuation, so the
        # source indentation does not end up inside the SQL text.
        query = text('SELECT keep_max_users_per_device( '
                     '(:platform_id)::int2, :device_token, '
                     '(:max_users_per_device)::int2)')
        for canonical in canonicals:
            for platform_id in platforms:
                session.execute(query,
                                {'platform_id': platform_id,
                                 'device_token': canonical['new_token'],
                                 'max_users_per_device': config.max_users_per_device
                                 })
        session.commit()
def update_unregistered_devices(unregistered):
    '''
    Update data for unregistered Android devices.

    Each entry of ``unregistered`` is a dict; its keys are prefixed with
    ``p_`` and used as bind parameters, so the entries must provide
    ``login_id`` and ``device_token``. For every entry the ``device`` row
    of that login whose current token (``device_token_new`` if set, else
    ``device_token``) equals ``device_token`` gets ``unregistered_ts`` set
    to the database's current time.

    Unregistered device will not receive notifications and will be deleted
    when number of devices exceeds maximum.

    Does nothing when ``unregistered`` is empty.
    '''
    # An executemany with an empty parameter list would leave the required
    # bind parameters of the UPDATE without values, so bail out early.
    if not unregistered:
        return

    binding = [{"p_{}".format(k): v for k, v in u.items()}
               for u in unregistered]
    device_table = model.metadata.tables['device']
    current_token = func.coalesce(device_table.c.device_token_new,
                                  device_table.c.device_token)
    stmt = (
        update(device_table)
        .values(unregistered_ts=func.now())
        .where(and_(device_table.c.login_id == bindparam('p_login_id'),
                    current_token == bindparam('p_device_token')))
    )
    ENGINE.execute(stmt, binding)
def params(self, *optionaldict, **kwargs):
    """Return a copy with :func:`bindparam()` elements replaced.

    Produces a copy of this ClauseElement in which :func:`bindparam()`
    elements carry the values found in the given dictionary (or keyword
    arguments)::

      >>> clause = column('x') + bindparam('foo')
      >>> print(clause.compile().params)
      {'foo': None}
      >>> print(clause.params({'foo': 7}).compile().params)
      {'foo': 7}

    """
    make_unique = False
    return self._params(make_unique, optionaldict, kwargs)
def _getPipeInfo(self, pipename):
    ''' Retrieve the pipeline Info for a given pipeline version name

    Parameters:
        pipename (str):
            Either 'drp' or 'dap', in any letter case.

    Returns:
        The single row selected for the pipeline alias, or None when the
        relevant table ('cube' for DRP, 'file' for DAP) is neither reported
        by ``_tableInQuery`` nor present in the compiled query text.

    Raises:
        AssertionError: if pipename is not DRP or DAP.
    '''

    pipeline = pipename.lower()
    # Raised explicitly rather than via ``assert`` so the check still runs
    # under ``python -O``; the exception type and message are unchanged.
    if pipeline not in ('drp', 'dap'):
        raise AssertionError('Pipeline Name must either be DRP or DAP')
    is_drp = pipeline == 'drp'

    # bindparam values
    bindname = 'drpver' if is_drp else 'dapver'
    bindvalue = self._drpver if is_drp else self._dapver

    # the table whose presence in the query decides whether to look anything up
    tablename = 'cube' if is_drp else 'file'
    inclasses = (self._tableInQuery(tablename) or
                 tablename in str(self.query.statement.compile()))

    # set alias
    pipealias = self._drp_alias if is_drp else self._dap_alias

    if not inclasses:
        return None

    # get the pipeinfo
    return marvindb.session.query(pipealias).\
        join(marvindb.datadb.PipelineName, marvindb.datadb.PipelineVersion).\
        filter(marvindb.datadb.PipelineName.label == pipename.upper(),
               marvindb.datadb.PipelineVersion.version == bindparam(bindname, bindvalue)).one()