我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用sqlalchemy.func.lower()。
def get_all_without_views(cls, formats=[]): '''Returns all resources that have no resource views :param formats: if given, returns only resources that have no resource views and are in any of the received formats :type formats: list :rtype: list of ckan.model.Resource objects ''' query = meta.Session.query(cls).outerjoin(ckan.model.ResourceView) \ .filter(ckan.model.ResourceView.id == None) if formats: lowercase_formats = [f.lower() for f in formats] query = query.filter(func.lower(cls.format).in_(lowercase_formats)) return query.all()
def search(cls, search): if search is None or len(search) == 0: return [] query = Tool.query.outerjoin(EDAMOperation, Tool.edam_operations).join(ToolVersion).join(Instance, ToolVersion.instances) nodes = parse_search_query(search) for node in nodes: if type(node) == ComparisonNode: key = node[0] value = u" ".join(node[2]).lower() if key == u"topic": query = query.filter(func.lower(EDAMOperation.label) == value) elif key == u"instance": query = query.filter(func.lower(Instance.brand) == value) else: # unknown key return [] else: term = "%{0}%".format(" ".join(node).lower()) query = query.filter(Tool.name.ilike(term) | Tool.description.ilike(term) | Tool.display_name.ilike(term)) return query.all()
def __get_stocks(self, symbol): """???????????????????? Args: symbol: ?????????????None???????????????? Returns: ??????????????? """ with start_session() as session: q = session.query(Stock) if symbol: q = q.filter(func.lower(Stock.symbol) == symbol.lower()) else: q = q.filter_by(activated=True) return q.all()
def make_parameter(self, desc): parameter = self.database.get_session() \ .query(self.Parameter) \ .filter(desc.strip().lower() == func.lower(self.Parameter.description)) \ .first() if parameter is None: parameter = self.Parameter(description=desc.strip()) return parameter; # Example for reference # form[Division] = navn # form[Parameter1] = en # form[Type1] = Enum # form[Min1] = # form[Max1] = # form[Option1_1] = a # form[Option1_2] = b # form[Option1_3] = c # form[Parameter2] = nu # form[Type2] = Number # form[Min2] = 0 # form[Max2] = 10
def update_setting(): # generic method to update settings setting_name = request.form["setting"] data = request.form["data"] if data == "true": data = True elif data == "false": data = False if setting_name.lower() not in g.settings.__dict__.keys(): return bad_request(f"setting field {setting_name} does not exist") if setting_name == 'min_age' and data is not None: age = age_to_words(int(data)) setattr(g.settings, 'min_age_word', age) setattr(g.settings, setting_name, data) db.session.add(g.settings) db.session.commit() return jsonify(status="OK"), 200
def settings(): # returns data about the current settings i = inspect(db.engine) cols = i.get_columns("Settings") ignore = [ 'id', 'questions', 'required_ids', 'response_body', 'min_age_word'] out = {} for col in cols: # don't include things that can't be modified in the settings menu if col["name"].lower() in ignore: continue col_dict = {} col_dict["value"] = getattr(g.settings, col["name"]) col_dict["type"] = type(col_dict["value"]).__name__ out[col["name"].lower()] = col_dict return Response(json.dumps(out), mimetype="application/json")
def add_exemption(): # adds an exemption of restrictions for a user username = request.form["username"] if not username: return bad_request("username not provided") user = User.query.filter(func.lower(username) == func.lower(username)).first() if user: user.is_exempt = True if len(user.response) > 0: user.response = "" db.session.add(user) db.session.commit() return jsonify(status="OK"), 200 else: return bad_request("User does not exist yet")
def post(self): slack_id = self.get_argument("slack_id", None) if not slack_id: raise tornado.web.HTTPError(400) slackidentity = SlackIdentityModel(slack_id.lower()) slackidentity.user = self.current_user session.add(slackidentity) session.commit() sec_log.info( "slackidentity {} added to {}".format( slackidentity, slackidentity.user) ) return self.redirect( "/services/add_slack_identity/success?slackidentity_id={id}".format( id=slackidentity.id) )
def get_all_statements_with_value(request, value): """ Returns all statements, where with the value :param request: request :param value: string :return: dict() """ issue_uid = issue_helper.get_issue_id(request) db_statements = get_not_disabled_statement_as_query().filter_by(issue_uid=issue_uid).all() return_array = [] slug = DBDiscussionSession.query(Issue).get(issue_uid).slug _um = UrlManager(request.application_url, for_api=False, slug=slug) for stat in db_statements: db_tv = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=stat.uid).order_by(TextVersion.uid.asc()).first() if value.lower() in db_tv.content.lower(): rd = __get_fuzzy_string_dict(current_text=value, return_text=db_tv.content, uid=db_tv.statement_uid) rd['url'] = _um.get_url_for_statement_attitude(False, db_tv.statement_uid) return_array.append(rd) return_array = __sort_array(return_array) return return_array[:list_length]
def get_strings_for_start(value, issue, is_startpoint): """ Checks different position-strings for a match with given value :param value: string :param issue: int :param is_startpoint: boolean :return: dict() """ db_statements = get_not_disabled_statement_as_query().filter(and_(Statement.is_startpoint == is_startpoint, Statement.issue_uid == issue)).all() return_array = [] for stat in db_statements: db_tv = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=stat.uid).order_by(TextVersion.uid.asc()).first() if value.lower() in db_tv.content.lower(): rd = __get_fuzzy_string_dict(current_text=value, return_text=db_tv.content, uid=db_tv.statement_uid) return_array.append(rd) return_array = __sort_array(return_array) return return_array[:list_length]
def get_strings_for_edits(value, statement_uid): """ Checks different textversion-strings for a match with given value :param value: string :param statement_uid: Statement.uid :return: dict() """ db_tvs = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=statement_uid).all() # TODO #432 return_array = [] index = 1 for textversion in db_tvs: if value.lower() in textversion.content.lower(): rd = __get_fuzzy_string_dict(current_text=value, return_text=textversion.content, uid=textversion.statement_uid) # TODO #432 return_array.append(rd) index += 1 return_array = __sort_array(return_array) return return_array[:list_length]
def get_strings_for_public_nickname(value, nickname): """ Returns dictionaries with public nicknames of users, where the nickname containts the value :param value: String :param nickname: current users nickname :return: dict() """ db_user = DBDiscussionSession.query(User).filter(func.lower(User.public_nickname).contains(func.lower(value)), ~User.public_nickname.in_([nickname, 'admin', nick_of_anonymous_user])).all() return_array = [] for index, user in enumerate(db_user): dist = get_distance(value, user.public_nickname) return_array.append({'index': index, 'distance': dist, 'text': user.public_nickname, 'avatar': get_public_profile_picture(user)}) return_array = __sort_array(return_array) return return_array[:list_length]
def json_accounts_user_search(): query = request.args['query'] return jsonify(accounts=map( lambda result: { "account_id": result[0], "user_id": result[1], "user_name": result[2] }, session.query(FinanceAccount.id, User.id, User.name).filter( FinanceAccount.id == User.finance_account_id, or_( func.lower(User.name).like(func.lower("%{0}%".format(query))), User.id.like("{0}%".format(query)) ) ).all() ))
def get_account_id(s: sqlalchemy.orm.session.Session, name: str, server: Server) -> int: """Get an account id, creating the account if needed. Note that player names are not case sensitive, so names are stored with their canonical capitalisation but we always compare the lowercase version. """ player_id = get_player_id(s, name) acc = s.query(Account.id).filter( func.lower(Account.name) == name.lower(), Account.server == server).one_or_none() if acc: return acc[0] else: acc = Account(name=name, server=server, player_id=player_id) s.add(acc) s.commit() return acc.id
def test_composed_multiple(self): table = self.tables.some_table lx = (table.c.x + table.c.y).label('lx') ly = (func.lower(table.c.q) + table.c.p).label('ly') self._assert_result( select([lx, ly]).order_by(lx, ly.desc()), [(3, util.u('q1p3')), (5, util.u('q2p2')), (7, util.u('q3p1'))] )
def search_by_name(self, keyword): if len(keyword) == 0: return [] keyword = '%' + keyword.lower() + '%' return self.db.session.query(User).filter(or_(func.lower(User.name).like(keyword), func.lower(User.fullname).like(keyword))).all()
def _validate_unique_case_insensitive(self, model_field, field, value): model = model_field.class_ taken_query = model.query.filter( func.lower(model_field) == func.lower(value) ) if self.id is not None: # in case of update taken_query = taken_query.filter(model.id != self.id) if taken_query.first() is not None: self._error(field, 'has already been taken')
def get(uid=None, limit=60, offset=0, maxlines=None, colored=None, scene=None, name=None): q = db.session.query(AsciiModel) if uid and isinstance(uid, int): result = q.filter(AsciiModel.id == uid).first() if result: result.html = AsciiController.irc_to_html(result.path) return [result] if maxlines and isinstance(maxlines, int): q = q.filter(AsciiModel.numlines <= maxlines) if name and isinstance(name, str): name = name.replace("%", "") name = name.replace("_", "") q = q.filter(func.lower(AsciiModel.name).like("%"+name+"%")) if isinstance(colored, bool): q = q.filter(AsciiModel.colored == colored) if isinstance(scene, bool): q = q.filter(AsciiModel.scene == scene) if offset and isinstance(offset, int): q = q.offset(offset) if limit and isinstance(limit, int): q = q.limit(limit) results = q.all() rtn = [] for result in results: result.html = AsciiController.irc_to_html(result.path) rtn.append(result) return rtn
def build_comparision(self, field, value): if issubclass(field.type.__class__, ARRAY): return or_(*[field.any(v.lower()) for v in value.split(',')]) return or_(*[func.lower(field).like('%{}%'.format(v.lower())) for v in value.split(',')])
def __output(self, stock, indicators): """history????indicator??????????? Args: stock: ????????? indicators: ????indicator???? """ filename = '-'.join([i.__class__.__name__ for i in indicators]) output_path = os.path.join(const.OUTPUT_INDICATOR_DIR, '{}-{}.csv'.format(stock.symbol.lower(), filename)) self._prepare_directory(os.path.dirname(output_path)) headers = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume'] for i in indicators: headers.append(i.name_with_args) with open(output_path, 'w', encoding=self.encoding, newline='') as fp: w = csv.writer(fp) w.writerow(headers) for idx, hist in enumerate(stock.histories): row = [hist.date.strftime('%Y-%m-%d'), hist.open_price, hist.high_price, hist.low_price, hist.close_price, hist.volume] for i in indicators: try: val = i.get(idx) if isinstance(val, float): # numpy.float64???float???????????? # ?CSV???????????????????????? val = float(val) except NoDataError: val = '' row.append(val) w.writerow(row) plogger.info('[{}] ????????????'.format(output_path))
def prepare_queryset(query, model, key, value): return query.filter(func.lower(getattr(model, key)).contains(value[0].lower()))
def user_lookup(username): # shows a user's page is_json = False if username.endswith(".json"): username = username.split(".")[0] is_json = True user = User.query.filter_by(username=username).first() if not user: # check to see if a similar username exists user = User.query.filter(User.username.ilike(username)).first() show_warning = True if user.username.lower() == username.lower(): show_warning = False if not user: return abort(404) if is_json: return jsonify(username=user.username, response_md=user.full_body_md, response_html=user.full_body_html, submitted=user.submitted, processed=user.processed, last_login=user.last_login) return render_template( "user.html", user=user, username=username, show_warning=show_warning)
def remove_mod(): # removes a user from the list of moderators username = request.form["username"] user = User.query.filter(func.lower(User.username) == func.lower(username)).first() if not user: return bad_request("user not found") user.form_mod = False db.session.add(user) db.session.commit() return jsonify(status="OK"), 200
def get_query_param_set(params): """ Strip, lowercase, and remove empty params to be used in a query """ param_set = params.strip().lower().split(" ") param_set = [p for p in param_set if len(p) > 0] return param_set
def get_by_username(self, username): return self.dbsession.query(self.User).filter(func.lower(self.User.username) == username.lower()).first()
def get_by_email(self, email): return self.dbsession.query(self.User).filter(func.lower(self.User.email) == email.lower()).first()
def get_black_list(self) -> set: @with_session def _get_black_list(session=None): rows = session.query(BlackList).all() return {row.word.lower().strip() for row in rows} blacklist = self.env.cache.get_black_list() if blacklist is not None: return blacklist blacklist = _get_black_list() if len(blacklist) > 0: self.env.cache.set_black_list(blacklist) return blacklist
def remove_matching_word_from_blacklist(self, word: str) -> None: @with_session def _delete(_word: str, session=None) -> None: session.query(BlackList).filter(func.lower(BlackList.word) == func.lower(_word))\ .delete(synchronize_session='fetch') session.commit() if word is None or len(word.strip()) == 0: raise ValueError('empty word when deleting from word list') _delete(word) self.env.cache.reset_black_list()
def add_words_to_blacklist(self, words: list, session=None) -> None: all_words = {row.word.lower() for row in session.query(BlackList).all()} for word in words: if word is None or len(word.strip()) == 0 or word.strip().lower() in all_words: continue blacklisted = BlackList() blacklisted.word = word session.add(blacklisted) session.commit() self.env.cache.reset_black_list()
def clean_substr(col, start, length): return func.lower(func.substr(func.hex(col), start, length))
def get_all_arguments_with_text_and_url_by_statement_id(statement_uid, urlmanager, color_statement=False, is_jump=False): """ Given a statement_uid, it returns all arguments, which use this statement and adds the corresponding text to it, which normally appears in the bubbles. The resulting text depends on the provided language. :param statement_uid: Id to a statement, which should be analyzed :param color_statement: True, if the statement (specified by the ID) should be colored :return: list of dictionaries containing some properties of these arguments :rtype: list """ logger('DBAS.LIB', 'get_all_arguments_with_text_by_statement_id', 'main ' + str(statement_uid)) arguments = get_all_arguments_by_statement(statement_uid) uids = [arg.uid for arg in arguments] if arguments else None results = list() sb = '<{} data-argumentation-type="position">'.format(tag_type) if color_statement else '' se = '</{}>'.format(tag_type) if color_statement else '' if not uids: return [] uids.sort() for uid in uids: statement_text = get_text_for_statement_uid(statement_uid) attack_type = 'jump' if is_jump else '' argument_text = get_text_for_argument_uid(uid, anonymous_style=True, attack_type=attack_type) pos = argument_text.lower().find(statement_text.lower()) argument_text = argument_text[0:pos] + sb + argument_text[pos:pos + len(statement_text)] + se + argument_text[pos + len(statement_text):] results.append({'uid': uid, 'text': argument_text, 'url': urlmanager.get_url_for_jump(False, uid)}) return results
def get_user_by_case_insensitive_nickname(nickname): """ Returns user with given nickname :param nickname: String :return: User or None """ return DBDiscussionSession.query(User).filter(func.lower(User.nickname) == func.lower(nickname)).first()
def get_user_by_case_insensitive_public_nickname(public_nickname): """ Returns user with given public nickname :param public_nickname: String :return: User or None """ return DBDiscussionSession.query(User).filter(func.lower(User.public_nickname) == func.lower(public_nickname)).first()
def bubbles_already_last_in_list(bubble_list, bubbles): """ Are the given bubbles already at the end of the bubble list :param bubble_list: list of Bubbles :param bubbles: list of bubbles :return: Boolean """ if isinstance(bubbles, list): length = len(bubbles) else: length = 1 bubbles = [bubbles] if len(bubble_list) < length: return False for bubble in bubbles: if 'message' not in bubble: return False start_index = - length is_already_in = False for bubble in bubbles: last = bubble_list[start_index] if 'message' not in last or 'message' not in bubble: return False text1 = __cleanhtml(last['message'].lower()).strip() text2 = __cleanhtml(bubble['message'].lower()).strip() is_already_in = is_already_in or (text1 == text2) start_index += 1 return is_already_in
def get_strings_for_duplicates_or_reasons(value, issue, oem_value_uid=None): """ Checks different textversion-strings for a match with given value :param value: string :param issue: Issue.uid :param oem_value: integer :return: dict() """ db_statements = get_not_disabled_statement_as_query().filter_by(issue_uid=issue).all() return_array = [] for stat in db_statements: if stat.uid is oem_value_uid: continue db_tv = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=stat.uid).order_by(TextVersion.uid.asc()).first() if value.lower() in db_tv.content.lower(): # and db_tv.content.lower() != oem_value.lower(): rd = __get_fuzzy_string_dict(current_text=value, return_text=db_tv.content, uid=db_tv.statement_uid) # TODO #432 return_array.append(rd) return_array = __sort_array(return_array) # logger('fuzzy_string_matcher', 'get_strings_for_duplicates_or_reasons', # 'string: {}, issue {}, len(dict): '.format(value, issue, len(return_array)) return return_array[:list_length]
def __get_fuzzy_string_dict(index=0, current_text='', return_text='', uid=0): """ Returns dictionary with index, distance, text and statement_uid as keys :param index: int :param current_text: string :param return_text: string :param uid: int :return: dict() """ return {'index': index, 'distance': get_distance(current_text.lower(), return_text.lower()), 'text': return_text, 'statement_uid': uid}
def get_lev_distance(a, b): """ Returns the levensthein distance between to strings :param a: first string :param b: second string :return: distance between a and b """ dist = distance(a.strip().lower(), b.strip().lower()) # logger('fuzzy_string_matcher', 'get_distance', 'levensthein: ' + str(dist) + ', value: ' + a.lower() + ' in: ' + b.lower()) return str(dist).zfill(max_count_zeros)