我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用sqlalchemy.func()。
def _convertFieldOrFunction(self, fieldOrFunction, preferValue=False): """ Convert a string to a column reference, or a dictionary to a column or function reference. If a function is passed, this should be a canonical function reference ('func' and 'param' are both populated). :param fieldOrFunction: a string with a column name or a dictionary with either a field, function, or value. :param preferValue: if True then if fieldOrFunction is not a dictionary, return it unchanged. :returns: a constructed column or function object, or a bare value. """ if not isinstance(fieldOrFunction, dict): if preferValue: return fieldOrFunction return getattr(self.tableClass, fieldOrFunction) if 'field' in fieldOrFunction: return getattr(self.tableClass, fieldOrFunction['field']) if 'value' in fieldOrFunction: if not preferValue: return sqlalchemy.sql.elements.literal( fieldOrFunction['value']) return fieldOrFunction['value'] fieldOrFunction = self.isFunction(fieldOrFunction) if fieldOrFunction is False: raise DatabaseConnectorException('Not a function') if not self._isFunctionAllowed(fieldOrFunction['func']): raise DatabaseConnectorException('Function %s is not allowed' % fieldOrFunction['func']) param = fieldOrFunction.get('param', fieldOrFunction.get('params', [])) # Determine the function we need to call to apply the function if fieldOrFunction['func'] in ('distinct', 'cast'): if (fieldOrFunction['func'] == 'cast' and len(param) == 2 and isinstance(param[1], dict) and 'value' in param[1]): param[1]['value'] = self.types.get(param[1]['value'], param[1]['value']) funcfunc = getattr(sqlalchemy, fieldOrFunction['func']) else: funcfunc = getattr(sqlalchemy.func, fieldOrFunction['func']) return funcfunc( *[self._convertFieldOrFunction(entry, True) for entry in param])
def make_shell_context(): from pprint import pprint from flask_sqlalchemy import get_debug_queries from sqlalchemy import desc, func from scrobbler.models import Album, Artist, NowPlaying, Scrobble, Session, Token, User return dict( app=app, db=db, pprint=pprint, gq=get_debug_queries, func=func, desc=desc, User=User, Session=Session, Token=Token, Scrobble=Scrobble, NowPlaying=NowPlaying, Artist=Artist, Album=Album, )
def tag_show(context, data_dict): '''Return the details of a tag and all its datasets. :param id: the name or id of the tag :type id: string :param vocabulary_id: the id or name of the tag vocabulary that the tag is in - if it is not specified it will assume it is a free tag. (optional) :type vocabulary_id: string :param include_datasets: include a list of the tag's datasets. (Up to a limit of 1000 - for more flexibility, use package_search - see :py:func:`package_search` for an example.) (optional, default: ``False``) :type include_datasets: bool :returns: the details of the tag, including a list of all of the tag's datasets and their details :rtype: dictionary ''' model = context['model'] id = _get_or_bust(data_dict, 'id') include_datasets = asbool(data_dict.get('include_datasets', False)) tag = model.Tag.get(id, vocab_id_or_name=data_dict.get('vocabulary_id')) context['tag'] = tag if tag is None: raise NotFound _check_access('tag_show', context, data_dict) return model_dictize.tag_dictize(tag, context, include_datasets=include_datasets)
def config_option_show(context, data_dict): '''Show the current value of a particular configuration option. Only returns runtime-editable config options (the ones returned by :py:func:`~ckan.logic.action.get.config_option_list`), which can be updated with the :py:func:`~ckan.logic.action.update.config_option_update` action. :param key: The configuration option key :type key: string :returns: The value of the config option from either the system_info table or ini file. :rtype: string :raises: :class:`ckan.logic.ValidationError`: if config option is not in the schema (whitelisted as editable). ''' _check_access('config_option_show', context, data_dict) key = _get_or_bust(data_dict, 'key') schema = ckan.logic.schema.update_configuration_schema() # Only return whitelisted keys if key not in schema: raise ValidationError( 'Configuration option \'{0}\' can not be shown'.format(key)) # return the value from config return config.get(key, None)
def config_option_list(context, data_dict): '''Return a list of runtime-editable config options keys that can be updated with :py:func:`~ckan.logic.action.update.config_option_update`. :returns: A list of config option keys. :rtype: list ''' _check_access('config_option_list', context, data_dict) schema = ckan.logic.schema.update_configuration_schema() return schema.keys()
def Any(other, arrexpr, operator=operators.eq): """A synonym for the :meth:`.ARRAY.Comparator.any` method. This method is legacy and is here for backwards-compatibility. .. seealso:: :func:`.expression.any_` """ return arrexpr.any(other, operator)
def All(other, arrexpr, operator=operators.eq): """A synonym for the :meth:`.ARRAY.Comparator.all` method. This method is legacy and is here for backwards-compatibility. .. seealso:: :func:`.expression.all_` """ return arrexpr.all(other, operator)
def any(self, other, operator=operators.eq): """Return ``other operator ANY (array)`` clause. Argument places are switched, because ANY requires array expression to be on the right hand-side. E.g.:: from sqlalchemy.sql import operators conn.execute( select([table.c.data]).where( table.c.data.any(7, operator=operators.lt) ) ) :param other: expression to be compared :param operator: an operator object from the :mod:`sqlalchemy.sql.operators` package, defaults to :func:`.operators.eq`. .. seealso:: :class:`.postgresql.Any` :meth:`.postgresql.ARRAY.Comparator.all` """ return Any(other, self.expr, operator=operator)
def all(self, other, operator=operators.eq): """Return ``other operator ALL (array)`` clause. Argument places are switched, because ALL requires array expression to be on the right hand-side. E.g.:: from sqlalchemy.sql import operators conn.execute( select([table.c.data]).where( table.c.data.all(7, operator=operators.lt) ) ) :param other: expression to be compared :param operator: an operator object from the :mod:`sqlalchemy.sql.operators` package, defaults to :func:`.operators.eq`. .. seealso:: :class:`.postgresql.All` :meth:`.postgresql.ARRAY.Comparator.any` """ return All(other, self.expr, operator=operator)
def visit_substring_func(self, func, **kw): s = self.process(func.clauses.clauses[0], **kw) start = self.process(func.clauses.clauses[1], **kw) if len(func.clauses.clauses) > 2: length = self.process(func.clauses.clauses[2], **kw) return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) else: return "SUBSTRING(%s FROM %s)" % (s, start)
def journal_read(session, func): """Read, process and delete (if successful) the oldest journal row. The row is locked on read, and remains locked until the processing succeeds or gives up. This is to ensure that (in a multithreaded or multiprocess environment) only one worker processes the update, which in turn ensures monotonicity.""" # TODO(ijw): start a transaction here # Find and lock the oldest journal entry # NB this will serialise on locking the oldest record if there are # multiple threads running (as there may be if multiple processes # are running) maybe_more=True with session.begin(): # Note also that this will, if the other thread succeeds, lock a # row that someone else deletes, so its session will abort. entry = session.query(VppEtcdJournal) \ .order_by(VppEtcdJournal.id) \ .with_for_update() \ .first() if entry: if func(entry.k, entry.v): # ... can quite reasonably fail... # Once done, it should go. session.delete(entry) else: # For some reason, we can't do the job. entry.retries = entry.retries + 1 session.update(entry) else: # The table is empty - no work available. maybe_more=False
def package_relationships_list(context, data_dict): '''Return a dataset (package)'s relationships. :param id: the id or name of the first package :type id: string :param id2: the id or name of the second package :type id: string :param rel: relationship as string see :py:func:`~ckan.logic.action.create.package_relationship_create` for the relationship types (optional) :rtype: list of dictionaries ''' ##TODO needs to work with dictization layer model = context['model'] api = context.get('api_version') id = _get_or_bust(data_dict, "id") id2 = data_dict.get("id2") rel = data_dict.get("rel") ref_package_by = 'id' if api == 2 else 'name' pkg1 = model.Package.get(id) pkg2 = None if not pkg1: raise NotFound('First package named in request was not found.') if id2: pkg2 = model.Package.get(id2) if not pkg2: raise NotFound('Second package named in address was not found.') if rel == 'relationships': rel = None _check_access('package_relationships_list', context, data_dict) # TODO: How to handle this object level authz? # Currently we don't care relationships = pkg1.get_relationships(with_package=pkg2, type=rel) if rel and not relationships: raise NotFound('Relationship "%s %s %s" not found.' % (id, rel, id2)) relationship_dicts = [ rel.as_dict(pkg1, ref_package_by=ref_package_by) for rel in relationships] return relationship_dicts