我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用sqlalchemy.sql.func.lower()。
def _validate_unique_container_name(self, context, name):
    """Reject a container name already in use within the configured scope.

    Names are compared case-insensitively.  The scope is read from
    ``CONF.compute.unique_container_name_scope``:

    * ``'project'`` -- only containers whose ``project_id`` equals
      ``context.project_id`` are counted;
    * ``'global'`` -- all containers are counted;
    * empty, or any other value -- no validation is performed.

    :param context: request context; only ``project_id`` is read here.
    :param name: candidate container name.
    :raises exception.ContainerAlreadyExists: when at least one container
        with the same lower-cased name exists in the scope.
    """
    scope = CONF.compute.unique_container_name_scope
    if not scope:
        return

    wanted = name.lower()
    same_name = model_query(models.Container).filter(
        func.lower(models.Container.name) == wanted)

    if scope == 'project':
        clashes = same_name.filter_by(
            project_id=context.project_id).count()
    elif scope == 'global':
        clashes = same_name.count()
    else:
        # Unrecognised scope value: silently skip the uniqueness check.
        return

    if clashes > 0:
        raise exception.ContainerAlreadyExists(field='name', value=wanted)
async def add_upload(fname, cover, meta, size, hash, user_email):
    """Insert a new ``Upload`` row for a file and return the new row's id.

    This must be a coroutine: the body awaits the id look-ups and the
    database calls, which is a syntax error inside a plain ``def``.

    :param fname: path of the uploaded file; stored in ``file``.  Its final
        path component is stored in ``load_source`` and its lower-cased
        extension (without the dot) is passed to ``get_format_id``.
    :param cover: stored in ``cover``.
    :param meta: stored in ``meta``.
    :param size: stored in ``size``.
    :param hash: stored in ``hash``.
    :param user_email: passed to ``get_user_id``; the result is stored in
        both ``created_by_id`` and ``modified_by_id``.
    :returns: first column of the row fetched from the insert result.
    """
    # Extension without the leading dot ('' when the file has none).
    ext = os.path.splitext(fname)[1].lower()[1:]
    format_id = await get_format_id(ext)
    user_id = await get_user_id(user_email)

    async with engine.acquire() as conn:
        upload = model.Upload.__table__
        source_name = os.path.basename(fname)
        res = await conn.execute(upload.insert().values(
            file=fname,
            cover=cover,
            load_source=source_name,
            size=size,
            hash=hash,
            format_id=format_id,
            version_id=1,
            created_by_id=user_id,
            modified_by_id=user_id,
            meta=meta,
        ))
        new_id = (await res.fetchone())[0]
        return new_id
async def find_language(lang):
    """Find the language row matching ``lang`` by code and/or name.

    This must be a coroutine because the body uses ``await`` and
    ``async with``.  The lookups are tried in this order and the first hit
    wins:

    1. exact match on ``lang['code']``;
    2. exact match on the code ``find_synonym`` returns for ``lang['code']``;
    3. case-insensitive match on ``lang['name']``;
    4. exact match on the name ``find_synonym`` returns for ``lang['name']``.

    Steps 1-2 run only when ``lang`` has a truthy ``'code'``; steps 3-4 only
    when nothing was found yet and ``lang`` has a truthy ``'name'``.

    :param lang: mapping that may contain ``'code'`` and/or ``'name'``.
    :returns: ``{'id': ..., 'code': ..., 'name': ...}`` for the matching
        row, or ``None`` when nothing matches.
    """
    async with engine.acquire() as conn:
        language = model.Language.__table__

        async def fetch_one(where):
            """Return the first row satisfying *where* as a dict, or None."""
            res = await conn.execute(
                select([language.c.id, language.c.code, language.c.name])
                .where(where))
            row = await res.fetchone()
            if row:
                return {'id': row[0], 'code': row[1], 'name': row[2]}
            return None

        found = None
        if lang.get('code'):
            found = await fetch_one(language.c.code == lang['code'])
            if not found:
                code = await find_synonym(lang['code'],
                                          model.Synonym.LANGUAGE_CODE)
                if code:
                    found = await fetch_one(language.c.code == code)

        if not found and lang.get('name'):
            found = await fetch_one(
                func.lower(language.c.name) == lang['name'].lower())
            if not found:
                name = await find_synonym(lang['name'],
                                          model.Synonym.LANGUAGE_NAME)
                if name:
                    # The synonym's canonical name is matched exactly.
                    found = await fetch_one(language.c.name == name)

        return found
async def find_genre(gnr):
    """Find the genre row matching ``gnr['name']``, falling back to synonyms.

    This must be a coroutine because the body uses ``await`` and
    ``async with``.  The name is first matched case-insensitively; if that
    finds nothing, the name returned by ``find_synonym`` (if any) is
    matched the same way.

    :param gnr: mapping with a ``'name'`` key.
    :returns: ``{'id': ..., 'name': ...}`` for the matching row, or ``None``.
    """
    async with engine.acquire() as conn:
        genre = model.Genre.__table__

        async def fetch_by_name(name):
            """Return the first genre whose lower-cased name equals *name*."""
            res = await conn.execute(
                select([genre.c.id, genre.c.name])
                .where(func.lower(genre.c.name) == name.lower()))
            row = await res.fetchone()
            if row:
                return {'id': row[0], 'name': row[1]}
            return None

        found = await fetch_by_name(gnr['name'])
        if not found:
            synonym = await find_synonym(gnr['name'], model.Synonym.GENRE)
            if synonym:
                found = await fetch_by_name(synonym)
        return found
def _validate_unique_server_name(context, name):
    """Reject a server name already used as a hostname in the given scope.

    Hostnames are compared case-insensitively and the query is issued with
    ``read_deleted='no'``.  The scope comes from
    ``CONF.osapi_compute_unique_server_name_scope``:

    * ``'project'`` -- only instances whose ``project_id`` equals
      ``context.project_id`` are counted;
    * ``'global'`` -- all instances are counted;
    * empty -- no validation is performed;
    * anything else -- a warning is logged and no validation is performed.

    :param context: request context passed to ``model_query``.
    :param name: candidate server name.
    :raises exception.InstanceExists: when at least one instance with the
        same lower-cased hostname exists in the scope.
    """
    scope = CONF.osapi_compute_unique_server_name_scope
    if not scope:
        return

    wanted = name.lower()
    same_hostname = model_query(
        context, models.Instance, read_deleted='no',
    ).filter(func.lower(models.Instance.hostname) == wanted)

    if scope == 'project':
        clashes = same_hostname.filter_by(
            project_id=context.project_id).count()
    elif scope == 'global':
        clashes = same_hostname.count()
    else:
        LOG.warning(
            _('Unknown osapi_compute_unique_server_name_scope value: %s'
              ' Flag must be empty, "global" or'
              ' "project"') % scope)
        return

    if clashes > 0:
        raise exception.InstanceExists(name=wanted)
def action_event_finish(self, context, values):
    """Record the completion of an event belonging to a container action.

    :param context: request context; ``project_id`` is read to decide
        whether the fallback lookup below applies.
    :param values: mapping with at least ``'container_uuid'``,
        ``'request_id'``, ``'event'`` and ``'result'``; it is applied to
        the event row via ``update``.
    :returns: the updated event row.
    :raises exception.ContainerActionNotFound: when no action is found.
    :raises exception.ContainerActionEventNotFound: when the action has no
        event named ``values['event']``.
    """
    action = self._action_get_by_request_id(
        context, values['container_uuid'], values['request_id'])

    # After zun-compute restarts, the request_id differs from the one
    # recorded in ContainerAction, so the original record cannot be found
    # by request_id.  Fall back to the last created action so that
    # init_container can continue to finish the recovery action.
    if not (action or context.project_id):
        action = self._action_get_last_created_by_container_uuid(
            context, values['container_uuid'])

    if not action:
        raise exception.ContainerActionNotFound(
            request_id=values['request_id'],
            container_uuid=values['container_uuid'])

    event_query = model_query(models.ContainerActionEvent)
    event_query = event_query.filter_by(action_id=action['id'])
    event_query = event_query.filter_by(event=values['event'])
    event_row = event_query.first()

    if not event_row:
        raise exception.ContainerActionEventNotFound(
            action_id=action['id'], event=values['event'])

    event_row.update(values)
    event_row.save()

    # A failed event also flags the parent action.
    failed = values['result'].lower() == 'error'
    if failed:
        action.update({'message': 'Error'})
        action.save()

    return event_row
async def find_series(ser):
    """Find a series whose title equals ``ser['title']``, ignoring case.

    This must be a coroutine because the body uses ``await`` and
    ``async with``.

    :param ser: mapping with a ``'title'`` key.
    :returns: ``{'id': ..., 'title': ...}`` for the first matching row, or
        ``None`` when no row matches.
    """
    async with engine.acquire() as conn:
        series = model.Series.__table__
        query = select([series.c.id, series.c.title]).where(
            func.lower(series.c.title) == ser['title'].lower())
        res = await conn.execute(query)
        row = await res.fetchone()
        if row:
            return {'id': row[0], 'title': row[1]}
    return None
async def find_synonym(name, what):
    """Return our name for a foreign *name* within synonym category *what*.

    This must be a coroutine because the body uses ``await`` and
    ``async with``.  ``other_name`` is matched case-insensitively;
    ``category`` must equal *what* exactly.

    :param name: the foreign name to look up.
    :param what: synonym category to search in.
    :returns: ``our_name`` of the first matching row, or ``None`` when no
        row matches.
    """
    async with engine.acquire() as conn:
        synonym = model.Synonym.__table__
        query = select([synonym.c.our_name]).where(and_(
            func.lower(synonym.c.other_name) == name.lower(),
            synonym.c.category == what,
        ))
        res = await conn.execute(query)
        row = await res.fetchone()
        if row:
            return row[0]
    return None
async def create_conversion_batch(entity_name, entity_id, format, user_id):
    """Create a ``ConversionBatch`` row for one entity and return its id.

    This must be a coroutine because the body uses ``await`` and
    ``async with``.  The batch is named ``"Books for <entity> <label>"``,
    where ``<entity>`` is the lower-cased entity name and ``<label>`` is:

    * AUTHOR -- ``last_name`` when ``first_name`` is NULL, otherwise
      ``first_name + ' ' + last_name``;
    * SERIES -- the series ``title``;
    * BOOKSHELF -- the bookshelf ``name``.

    :param entity_name: ``'author'``, ``'series'`` or ``'bookshelf'`` in
        any letter case; stored upper-cased in ``for_entity``.
    :param entity_id: id of the entity row; stored in ``entity_id``.
    :param format: passed to ``get_format_id`` to obtain ``format_id``.
    :param user_id: stored in ``created_by_id`` and ``modified_by_id``.
    :returns: the id of the inserted batch row.
    :raises ValueError: when ``entity_name`` is not one of the three
        supported entities.
    """
    entity_name = entity_name.upper()

    if entity_name == 'AUTHOR':
        author = model.Author.__table__
        # `== None` is deliberate: SQLAlchemy turns it into an IS NULL test.
        full_name = case(
            [(author.c.first_name == None, author.c.last_name)],  # noqa: E711
            else_=author.c.first_name + ' ' + author.c.last_name)
        q = select([full_name]).where(author.c.id == entity_id)
    elif entity_name == 'SERIES':
        series = model.Series.__table__
        q = select([series.c.title]).where(series.c.id == entity_id)
    elif entity_name == 'BOOKSHELF':
        shelf = model.Bookshelf.__table__
        q = select([shelf.c.name]).where(shelf.c.id == entity_id)
    else:
        raise ValueError('Invalid entity name')

    format_id = await get_format_id(format)

    async with engine.acquire() as conn:
        batch = model.ConversionBatch.__table__
        res = await conn.execute(q)
        label = await res.scalar()
        name = "Books for %s %s" % (entity_name.lower(), label)
        insert = batch.insert().values(
            name=name,
            for_entity=entity_name,
            entity_id=entity_id,
            format_id=format_id,
            created_by_id=user_id,
            modified_by_id=user_id,
            version_id=1,
        ).returning(batch.c.id)
        res = await conn.execute(insert)
        return await res.scalar()
def action_event_finish(context, values):
    """Record the completion of an event belonging to an instance action.

    :param context: request context; ``project_id`` is read to decide
        whether the fallback lookup below applies.
    :param values: mapping with at least ``'instance_uuid'``,
        ``'request_id'``, ``'event'`` and ``'result'``.  It is first passed
        to ``convert_objects_related_datetimes`` for its ``'start_time'``
        and ``'finish_time'`` entries, then applied to the event row via
        ``update``.
    :returns: the updated event row.
    :raises exception.InstanceActionNotFound: when no action is found.
    :raises exception.InstanceActionEventNotFound: when the action has no
        event named ``values['event']``.
    """
    convert_objects_related_datetimes(values, 'start_time', 'finish_time')

    action = _action_get_by_request_id(
        context, values['instance_uuid'], values['request_id'])

    # When nova-compute restarts, the context is generated again in the
    # init_host workflow, so the request_id differs from the one recorded
    # in InstanceAction and the original record cannot be found by
    # request_id.  Fall back to the last created action so that
    # init_instance can continue to finish the recovery action, like:
    # powering_off, unpausing, and so on.
    if not (action or context.project_id):
        action = _action_get_last_created_by_instance_uuid(
            context, values['instance_uuid'])

    if not action:
        raise exception.InstanceActionNotFound(
            request_id=values['request_id'],
            instance_uuid=values['instance_uuid'])

    event_query = model_query(context, models.InstanceActionEvent)
    event_query = event_query.filter_by(action_id=action['id'])
    event_query = event_query.filter_by(event=values['event'])
    event_row = event_query.first()

    if not event_row:
        raise exception.InstanceActionEventNotFound(
            action_id=action['id'], event=values['event'])

    event_row.update(values)

    # A failed event also flags the parent action.
    failed = values['result'].lower() == 'error'
    if failed:
        action.update({'message': 'Error'})

    return event_row
def normalize_email_case(email):
    """Return *email* with its domain part lower-cased.

    The local part (before the ``@``) is returned unchanged.  The address
    is assumed to be valid.

    :param email: address of the form ``local@domain``.
    :returns: ``local + '@' + domain.lower()``.
    :raises ValueError: when *email* contains no ``@``.
    """
    # Split on the LAST '@' only: a quoted local part may itself contain
    # '@' (e.g. '"a@b"@example.com'), which would break a plain split().
    name, domain = email.rsplit('@', 1)
    return name + '@' + domain.lower()
def __init__(self, word):
    """Store a lower-cased form of *word* in ``self.word``.

    :param word: one of

        * a string -- stored as ``word.lower()``;
        * a ``CaseInsensitiveWord`` -- its ``word`` attribute is reused;
        * anything else -- stored as ``func.lower(word)`` (presumably a
          SQL expression to be lower-cased by the database; confirm
          against the callers).
    """
    # ``basestring`` exists only on Python 2 (or as a module-level compat
    # alias); on Python 3 fall back to ``str`` instead of a NameError.
    try:
        string_types = basestring  # noqa: F821
    except NameError:
        string_types = str

    if isinstance(word, string_types):
        self.word = word.lower()
    elif isinstance(word, CaseInsensitiveWord):
        self.word = word.word
    else:
        self.word = func.lower(word)