Python sqlalchemy.sql.func 模块,lower() 实例源码

我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用sqlalchemy.sql.func.lower()

项目:zun    作者:openstack    | 项目源码 | 文件源码
def _validate_unique_container_name(self, context, name):
        if not CONF.compute.unique_container_name_scope:
            return
        lowername = name.lower()
        base_query = model_query(models.Container).\
            filter(func.lower(models.Container.name) == lowername)
        if CONF.compute.unique_container_name_scope == 'project':
            container_with_same_name = base_query.\
                filter_by(project_id=context.project_id).count()
        elif CONF.compute.unique_container_name_scope == 'global':
            container_with_same_name = base_query.count()
        else:
            return

        if container_with_same_name > 0:
            raise exception.ContainerAlreadyExists(field='name',
                                                   value=lowername)
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def add_upload(fname, cover, meta, size, hash, user_email):
    ext = os.path.splitext(fname)[1].lower()[1:]
    format_id = await get_format_id(ext)
    user_id = await get_user_id(user_email)
    async with engine.acquire() as conn:
        upload = model.Upload.__table__
        source_name = os.path.split(fname)[-1]
        res = await conn.execute(upload.insert().values(file=fname, cover=cover,
                                                        load_source=source_name,
                                                        size=size, hash=hash,
                                                        format_id=format_id,
                                                        version_id=1,
                                                        created_by_id=user_id,
                                                        modified_by_id=user_id,
                                                        meta=meta
                                                        ))
        new_id = (await res.fetchone())[0]
        return new_id
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def find_language(lang):
    async with engine.acquire() as conn:
        language = model.Language.__table__
        async def fl(where):
            res = await conn.execute(select([language.c.id, language.c.code, language.c.name]).where(where))
            l= await res.fetchone()
            if l:
                return {'id':l[0], 'code':l[1], 'name':l[2]}
        l=None
        if lang.get('code'):
            l = await fl(language.c.code == lang['code'])
            if not l:
                code = await find_synonym(lang['code'], model.Synonym.LANGUAGE_CODE)
                if code:
                    l = await fl(language.c.code == code)

        if not l and lang.get('name'):
            l = await fl(func.lower(language.c.name) == lang['name'].lower())
            if not l:
                name = await find_synonym(lang['name'], model.Synonym.LANGUAGE_NAME)
                if name:
                    l = await fl(language.c.name == name)

        return l
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def find_genre(gnr):
    async with engine.acquire() as conn:
        genre = model.Genre.__table__
        async def fg(name):
            res = await conn.execute(select([genre.c.id, genre.c.name]).where(func.lower(genre.c.name) == name.lower()))
            g = await res.fetchone()
            if g:
                return {'id':g[0], 'name':g[1]}

        ng =  await fg(gnr['name'])
        if not ng:
            name = await find_synonym(gnr['name'], model.Synonym.GENRE)
            if name:
                ng = await fg(name)

        return ng
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _validate_unique_server_name(context, name):
    if not CONF.osapi_compute_unique_server_name_scope:
        return

    lowername = name.lower()
    base_query = model_query(context, models.Instance, read_deleted='no').\
            filter(func.lower(models.Instance.hostname) == lowername)

    if CONF.osapi_compute_unique_server_name_scope == 'project':
        instance_with_same_name = base_query.\
                        filter_by(project_id=context.project_id).\
                        count()

    elif CONF.osapi_compute_unique_server_name_scope == 'global':
        instance_with_same_name = base_query.count()

    else:
        msg = _('Unknown osapi_compute_unique_server_name_scope value: %s'
                ' Flag must be empty, "global" or'
                ' "project"') % CONF.osapi_compute_unique_server_name_scope
        LOG.warning(msg)
        return

    if instance_with_same_name > 0:
        raise exception.InstanceExists(name=lowername)
项目:zun    作者:openstack    | 项目源码 | 文件源码
def action_event_finish(self, context, values):
        """Finish an event on a container action."""
        action = self._action_get_by_request_id(context,
                                                values['container_uuid'],
                                                values['request_id'])

        # When zun-compute restarts, the request_id was different with
        # request_id recorded in ContainerAction, so we can't get the original
        # recode according to request_id. Try to get the last created action
        # so that init_container can continue to finish the recovery action.
        if not action and not context.project_id:
            action = self._action_get_last_created_by_container_uuid(
                context, values['container_uuid'])

        if not action:
            raise exception.ContainerActionNotFound(
                request_id=values['request_id'],
                container_uuid=values['container_uuid'])
        event = model_query(models.ContainerActionEvent).\
            filter_by(action_id=action['id']).\
            filter_by(event=values['event']).\
            first()

        if not event:
            raise exception.ContainerActionEventNotFound(
                action_id=action['id'], event=values['event'])

        event.update(values)
        event.save()

        if values['result'].lower() == 'error':
            action.update({'message': 'Error'})
            action.save()

        return event
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def find_series(ser):
    async with engine.acquire() as conn:
        series = model.Series.__table__
        res = await conn.execute(select([series.c.id, series.c.title]).where(func.lower(series.c.title) == ser['title'].lower()))
        s = await res.fetchone()
        if s:
            return {'id': s[0], 'title': s[1]}
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def find_synonym(name, what):
    async with engine.acquire() as conn:
         synonym = model.Synonym.__table__
         res=await conn.execute(select([synonym.c.our_name]).where(and_(func.lower(synonym.c.other_name) == name.lower(), 
                                                                   synonym.c.category == what)))
         s = await res.fetchone()
         if s: return s[0]
项目:mybookshelf2    作者:izderadicka    | 项目源码 | 文件源码
def create_conversion_batch(entity_name, entity_id, format, user_id):
    entity_name = entity_name.upper()
    if entity_name == 'AUTHOR':
        author = model.Author.__table__
        q = select([case([(author.c.first_name == None, author.c.last_name)],
                   else_ = author.c.first_name + ' ' + author.c.last_name)])\
            .where(author.c.id == entity_id)
    elif entity_name == 'SERIES':
        series = model.Series.__table__
        q = select([series.c.title]).where(series.c.id == entity_id)
    elif entity_name == 'BOOKSHELF':
        shelf = model.Bookshelf.__table__
        q = select([shelf.c.name]).where(shelf.c.id == entity_id)
    else:
        raise ValueError('Invalid entity name')

    format_id = await get_format_id(format)

    async with engine.acquire() as conn:
        batch = model.ConversionBatch.__table__
        res = await conn.execute(q)
        name = await res.scalar()
        name = "Books for %s %s" % (entity_name.lower(), name)
        res = await conn.execute(batch.insert()\
                                 .values(name=name, for_entity=entity_name,
                                    entity_id=entity_id, format_id=format_id,
                                    created_by_id = user_id,
                                    modified_by_id = user_id, version_id =1 )\
                                 .returning(batch.c.id))

        return await res.scalar()
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def action_event_finish(context, values):
    """Finish an event on an instance action."""
    convert_objects_related_datetimes(values, 'start_time', 'finish_time')
    action = _action_get_by_request_id(context, values['instance_uuid'],
                                       values['request_id'])
    # When nova-compute restarts, the context is generated again in
    # init_host workflow, the request_id was different with the request_id
    # recorded in InstanceAction, so we can't get the original record
    # according to request_id. Try to get the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    if not action and not context.project_id:
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])

    if not action:
        raise exception.InstanceActionNotFound(
                                    request_id=values['request_id'],
                                    instance_uuid=values['instance_uuid'])

    event_ref = model_query(context, models.InstanceActionEvent).\
                            filter_by(action_id=action['id']).\
                            filter_by(event=values['event']).\
                            first()

    if not event_ref:
        raise exception.InstanceActionEventNotFound(action_id=action['id'],
                                                    event=values['event'])
    event_ref.update(values)

    if values['result'].lower() == 'error':
        action.update({'message': 'Error'})

    return event_ref
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def normalize_email_case(email):
        # Assumes valid email. ensure domain is lower case
        (name, domain) = email.split('@')
        return name+'@'+domain.lower()
项目:idealoom    作者:conversence    | 项目源码 | 文件源码
def __init__(self, word):
        if isinstance(word, basestring):
            self.word = word.lower()
        elif isinstance(word, CaseInsensitiveWord):
            self.word = word.word
        else:
            self.word = func.lower(word)