Python sqlalchemy.func 模块,lower() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用sqlalchemy.func.lower()

项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def get_all_without_views(cls, formats=[]):
        '''Return every resource that has no resource view attached.

        :param formats: optional list of format names; when non-empty, only
            view-less resources whose format equals one of them
            (compared in lower case) are returned
        :type formats: list

        :rtype: list of ckan.model.Resource objects
        '''
        view = ckan.model.ResourceView
        # Outer join plus "view id IS NULL" keeps only rows without a view.
        # SQLAlchemy needs the ``== None`` comparison to emit IS NULL.
        without_views = meta.Session.query(cls).outerjoin(view) \
            .filter(view.id == None)

        if not formats:
            return without_views.all()

        wanted = [fmt.lower() for fmt in formats]
        return without_views.filter(func.lower(cls.format).in_(wanted)).all()
项目:galaxycat    作者:igbmc    | 项目源码 | 文件源码
def search(cls, search):
        """Return the tools matching a parsed search string.

        Comparison nodes filter on a known key (``topic`` or ``instance``,
        compared in lower case); any other node is treated as free text and
        matched with ILIKE against the tool name, description and display
        name. An empty search or an unknown key yields an empty list.
        """
        if search is None or len(search) == 0:
            return []

        tools = (
            Tool.query
            .outerjoin(EDAMOperation, Tool.edam_operations)
            .join(ToolVersion)
            .join(Instance, ToolVersion.instances)
        )
        for node in parse_search_query(search):
            if type(node) != ComparisonNode:
                # free-text term: substring match on the textual columns
                pattern = "%{0}%".format(" ".join(node).lower())
                tools = tools.filter(
                    Tool.name.ilike(pattern)
                    | Tool.description.ilike(pattern)
                    | Tool.display_name.ilike(pattern)
                )
                continue

            field = node[0]
            wanted = u" ".join(node[2]).lower()
            if field == u"topic":
                tools = tools.filter(func.lower(EDAMOperation.label) == wanted)
            elif field == u"instance":
                tools = tools.filter(func.lower(Instance.brand) == wanted)
            else:
                # unknown key
                return []

        return tools.all()
项目:ppytrading    作者:yusukemurayama    | 项目源码 | 文件源码
def __get_stocks(self, symbol):
        """Load stocks from the database.

        Args:
            symbol: When truthy, only stocks whose symbol equals it
                (compared in lower case) are returned; otherwise only
                stocks with ``activated=True`` are returned.

        Returns:
            The result of ``Query.all()`` for the selected stocks.
        """
        with start_session() as session:
            stocks = session.query(Stock)
            if not symbol:
                return stocks.filter_by(activated=True).all()
            return stocks.filter(
                func.lower(Stock.symbol) == symbol.lower()).all()
项目:PIG    作者:mariusaarsnes    | 项目源码 | 文件源码
def make_parameter(self, desc):
        """Return the Parameter whose description matches ``desc``.

        The comparison ignores case and surrounding whitespace of ``desc``.
        When no row matches, a new (not yet persisted by this method)
        Parameter with the stripped description is returned instead.
        """
        lookup = self.database.get_session().query(self.Parameter)
        wanted = desc.strip()
        existing = lookup.filter(
            wanted.lower() == func.lower(self.Parameter.description)
        ).first()
        if existing is not None:
            return existing
        return self.Parameter(description=desc.strip())


# Example for reference


#  form[Division] = navn
#  form[Parameter1] = en
#  form[Type1] = Enum
#  form[Min1] = 
#  form[Max1] = 
#  form[Option1_1] = a
#  form[Option1_2] = b
#  form[Option1_3] = c
#  form[Parameter2] = nu
#  form[Type2] = Number
#  form[Min2] = 0
#  form[Max2] = 10
项目:rforms    作者:Jakeable    | 项目源码 | 文件源码
def update_setting():
    # generic endpoint: stores the posted "data" value under the posted
    # "setting" attribute of the current settings object
    setting_name = request.form["setting"]
    raw = request.form["data"]
    # the literal strings "true"/"false" become booleans; other values pass through
    data = {"true": True, "false": False}.get(raw, raw)

    if setting_name.lower() not in g.settings.__dict__:
        return bad_request(f"setting field {setting_name} does not exist")

    if setting_name == 'min_age' and data is not None:
        # keep the spelled-out age in sync with the numeric one
        g.settings.min_age_word = age_to_words(int(data))

    setattr(g.settings, setting_name, data)
    db.session.add(g.settings)
    db.session.commit()
    return jsonify(status="OK"), 200
项目:rforms    作者:Jakeable    | 项目源码 | 文件源码
def settings():
    # returns a JSON object describing the current settings: one entry per
    # column of the "Settings" table with its value and Python type name
    hidden = (
        'id',
        'questions',
        'required_ids',
        'response_body',
        'min_age_word',
    )
    columns = inspect(db.engine).get_columns("Settings")
    out = {}
    for column in columns:
        key = column["name"].lower()
        # don't include things that can't be modified in the settings menu
        if key in hidden:
            continue
        current = getattr(g.settings, column["name"])
        out[key] = {"value": current, "type": type(current).__name__}

    return Response(json.dumps(out), mimetype="application/json")
项目:rforms    作者:Jakeable    | 项目源码 | 文件源码
def add_exemption():
    # adds an exemption of restrictions for a user
    #
    # Reads "username" from the posted form, looks the user up by username
    # (compared in lower case), marks them exempt and clears any stored
    # response. Returns a bad-request response when the username is missing
    # or no such user exists.

    username = request.form["username"]
    if not username:
        return bad_request("username not provided")
    # Compare the *column* with the supplied name; comparing the supplied
    # name with itself is always true and would match an arbitrary user.
    user = User.query.filter(func.lower(User.username) ==
                             func.lower(username)).first()
    if not user:
        return bad_request("User does not exist yet")

    user.is_exempt = True
    if len(user.response) > 0:
        user.response = ""
    db.session.add(user)
    db.session.commit()
    return jsonify(status="OK"), 200
项目:apoptosis    作者:hrdkx    | 项目源码 | 文件源码
def post(self):
        """Attach a new Slack identity to the current user.

        Responds with HTTP 400 when no ``slack_id`` argument is supplied;
        otherwise stores the lower-cased id, logs the addition and
        redirects to the success page.
        """
        slack_id = self.get_argument("slack_id", None)
        if not slack_id:
            raise tornado.web.HTTPError(400)

        identity = SlackIdentityModel(slack_id.lower())
        identity.user = self.current_user

        session.add(identity)
        session.commit()

        message = "slackidentity {} added to {}".format(identity, identity.user)
        sec_log.info(message)

        target = "/services/add_slack_identity/success?slackidentity_id={id}".format(
            id=identity.id)
        return self.redirect(target)
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_all_statements_with_value(request, value):
    """
    Collects fuzzy-match entries (including a url) for the statements of the request's
    issue whose lowest-uid text version contains the value, ignoring case

    :param request: request used to determine the issue and the application url
    :param value: string to look for
    :return: the first list_length entries of the sorted match list
    """
    issue_uid = issue_helper.get_issue_id(request)
    statements = get_not_disabled_statement_as_query().filter_by(issue_uid=issue_uid).all()
    matches = []
    issue_slug = DBDiscussionSession.query(Issue).get(issue_uid).slug
    url_manager = UrlManager(request.application_url, for_api=False, slug=issue_slug)

    for statement in statements:
        text_version = DBDiscussionSession.query(TextVersion) \
            .filter_by(statement_uid=statement.uid) \
            .order_by(TextVersion.uid.asc()) \
            .first()
        if value.lower() not in text_version.content.lower():
            continue
        entry = __get_fuzzy_string_dict(current_text=value, return_text=text_version.content,
                                        uid=text_version.statement_uid)
        entry['url'] = url_manager.get_url_for_statement_attitude(False, text_version.statement_uid)
        matches.append(entry)

    return __sort_array(matches)[:list_length]
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_strings_for_start(value, issue, is_startpoint):
    """
    Collects fuzzy-match entries for the statements of an issue (filtered by their
    startpoint flag) whose lowest-uid text version contains the value, ignoring case

    :param value: string to look for
    :param issue: int, compared with Statement.issue_uid
    :param is_startpoint: boolean, compared with Statement.is_startpoint
    :return: the first list_length entries of the sorted match list
    """
    criteria = and_(Statement.is_startpoint == is_startpoint,
                    Statement.issue_uid == issue)
    statements = get_not_disabled_statement_as_query().filter(criteria).all()

    matches = []
    for statement in statements:
        text_version = DBDiscussionSession.query(TextVersion) \
            .filter_by(statement_uid=statement.uid) \
            .order_by(TextVersion.uid.asc()) \
            .first()
        if value.lower() not in text_version.content.lower():
            continue
        matches.append(__get_fuzzy_string_dict(current_text=value,
                                               return_text=text_version.content,
                                               uid=text_version.statement_uid))

    return __sort_array(matches)[:list_length]
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_strings_for_edits(value, statement_uid):
    """
    Collects fuzzy-match entries for every text version of a statement whose content
    contains the value, ignoring case

    :param value: string to look for
    :param statement_uid: Statement.uid
    :return: the first list_length entries of the sorted match list
    """
    text_versions = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=statement_uid).all()  # TODO #432

    matches = [
        __get_fuzzy_string_dict(current_text=value, return_text=tv.content, uid=tv.statement_uid)  # TODO #432
        for tv in text_versions
        if value.lower() in tv.content.lower()
    ]

    return __sort_array(matches)[:list_length]
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_strings_for_public_nickname(value, nickname):
    """
    Returns dictionaries for users whose public nickname contains the value (ignoring
    case), excluding the given nickname, 'admin' and the anonymous user

    :param value: String to look for
    :param nickname: current users nickname
    :return: the first list_length entries of the sorted list of dicts
        (index, distance, text, avatar)
    """
    excluded = [nickname, 'admin', nick_of_anonymous_user]
    users = DBDiscussionSession.query(User).filter(
        func.lower(User.public_nickname).contains(func.lower(value)),
        ~User.public_nickname.in_(excluded)).all()

    entries = [{'index': position,
                'distance': get_distance(value, user.public_nickname),
                'text': user.public_nickname,
                'avatar': get_public_profile_picture(user)}
               for position, user in enumerate(users)]

    return __sort_array(entries)[:list_length]
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def json_accounts_user_search():
    """Return, as JSON, the finance accounts of users matching ``query``.

    The ``query`` request argument is matched as a substring of the user
    name (compared in lower case) or as a prefix of the user id. The
    response has a single key ``accounts`` holding a list of objects with
    ``account_id``, ``user_id`` and ``user_name``.
    """
    query = request.args['query']
    rows = session.query(FinanceAccount.id, User.id, User.name).filter(
        FinanceAccount.id == User.finance_account_id,
        or_(
            func.lower(User.name).like(func.lower("%{0}%".format(query))),
            User.id.like("{0}%".format(query))
        )
    ).all()
    # Build a concrete list: on Python 3 ``map`` returns a lazy iterator,
    # which the JSON encoder cannot serialise.
    return jsonify(accounts=[
        {
            "account_id": row[0],
            "user_id": row[1],
            "user_name": row[2],
        }
        for row in rows
    ])
项目:dcss-scoreboard    作者:zxc23    | 项目源码 | 文件源码
def get_account_id(s: sqlalchemy.orm.session.Session,
                   name: str,
                   server: Server) -> int:
    """Return the id of the account ``name`` on ``server``, creating it if absent.

    Account names are matched case-insensitively (the lower-cased forms are
    compared), while a newly created account keeps the capitalisation that
    was passed in. Creating an account commits the session.
    """
    player_id = get_player_id(s, name)
    existing = (
        s.query(Account.id)
        .filter(func.lower(Account.name) == name.lower(),
                Account.server == server)
        .one_or_none()
    )
    if existing:
        return existing[0]

    account = Account(name=name, server=server, player_id=player_id)
    s.add(account)
    s.commit()
    return account.id
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def search_by_name(self, keyword):
        """Return users whose name or fullname contains ``keyword``, ignoring case.

        An empty keyword yields an empty list without querying.
        """
        if len(keyword) == 0:
            return []
        pattern = '%' + keyword.lower() + '%'
        name_matches = func.lower(User.name).like(pattern)
        fullname_matches = func.lower(User.fullname).like(pattern)
        users = self.db.session.query(User)
        return users.filter(or_(name_matches, fullname_matches)).all()
项目:QXSConsolas    作者:qxsch    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def _validate_unique_case_insensitive(self, model_field, field, value):
        """Report an error on ``field`` when ``value`` is already used.

        The lower-cased ``value`` is compared with the lower-cased
        ``model_field`` column. When ``self.id`` is set (an update), the row
        with that id is left out of the check.
        """
        model = model_field.class_

        conditions = [func.lower(model_field) == func.lower(value)]
        if self.id is not None:  # in case of update
            conditions.append(model.id != self.id)

        already_taken = model.query.filter(*conditions).first()
        if already_taken is not None:
            self._error(field, 'has already been taken')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:ShelbySearch    作者:Agentscreech    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:ascii-flask    作者:skftn    | 项目源码 | 文件源码
def get(uid=None, limit=60, offset=0, maxlines=None, colored=None, scene=None, name=None):
        """Fetch ascii entries, each with its ``html`` attribute populated.

        :param uid: when a truthy int, look up that single entry and return
            a one-element list (the element is ``None`` if nothing matched);
            all other filters are ignored in that case
        :param limit: maximum number of rows (applied when a truthy int)
        :param offset: number of rows to skip (applied when a truthy int)
        :param maxlines: keep entries with at most this many lines
        :param colored: when a bool, filter on the ``colored`` flag
        :param scene: when a bool, filter on the ``scene`` flag
        :param name: case-insensitive substring of the entry name; ``%`` and
            ``_`` are removed so the caller cannot inject LIKE wildcards
        :return: list of AsciiModel rows
        """
        q = db.session.query(AsciiModel)
        if uid and isinstance(uid, int):
            result = q.filter(AsciiModel.id == uid).first()
            if result:
                result.html = AsciiController.irc_to_html(result.path)
            return [result]
        if maxlines and isinstance(maxlines, int):
            q = q.filter(AsciiModel.numlines <= maxlines)
        if name and isinstance(name, str):
            # The column is lower-cased in SQL, so the search term must be
            # lower-cased too or terms containing capitals never match.
            name = name.replace("%", "").replace("_", "").lower()
            q = q.filter(func.lower(AsciiModel.name).like("%" + name + "%"))
        if isinstance(colored, bool):
            q = q.filter(AsciiModel.colored == colored)
        if isinstance(scene, bool):
            q = q.filter(AsciiModel.scene == scene)

        if offset and isinstance(offset, int):
            q = q.offset(offset)
        if limit and isinstance(limit, int):
            q = q.limit(limit)

        rtn = []
        for result in q.all():
            result.html = AsciiController.irc_to_html(result.path)
            rtn.append(result)
        return rtn
项目:Price-Comparator    作者:Thejas-1    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:pyramid-restful-framework    作者:danpoland    | 项目源码 | 文件源码
def build_comparision(self, field, value):
        """Build an OR clause matching ``field`` against comma-separated terms.

        For ARRAY columns each lower-cased term must equal some array
        element; for other columns each lower-cased term is matched as a
        substring of the lower-cased column.
        """
        is_array = issubclass(field.type.__class__, ARRAY)
        terms = value.split(',')
        if is_array:
            clauses = [field.any(term.lower()) for term in terms]
        else:
            clauses = [func.lower(field).like('%{}%'.format(term.lower()))
                       for term in terms]
        return or_(*clauses)
项目:ppytrading    作者:yusukemurayama    | 项目源码 | 文件源码
def __output(self, stock, indicators):
        """Write a stock's price history plus indicator values to a CSV file.

        The file is placed under ``const.OUTPUT_INDICATOR_DIR`` and named
        ``<lower-cased symbol>-<indicator class names joined by '-'>.csv``.

        Args:
            stock: object providing ``symbol`` and ``histories``.
            indicators: sequence of indicator objects providing
                ``name_with_args`` and ``get(idx)``.
        """
        indicator_names = '-'.join([ind.__class__.__name__ for ind in indicators])
        csv_name = '{}-{}.csv'.format(stock.symbol.lower(), indicator_names)
        output_path = os.path.join(const.OUTPUT_INDICATOR_DIR, csv_name)
        self._prepare_directory(os.path.dirname(output_path))

        headers = ['Date', 'Open', 'High', 'Low', 'Close', 'Volume'] \
            + [ind.name_with_args for ind in indicators]

        with open(output_path, 'w', encoding=self.encoding, newline='') as fp:
            writer = csv.writer(fp)
            writer.writerow(headers)
            for position, history in enumerate(stock.histories):
                record = [
                    history.date.strftime('%Y-%m-%d'),
                    history.open_price,
                    history.high_price,
                    history.low_price,
                    history.close_price,
                    history.volume,
                ]
                for ind in indicators:
                    try:
                        cell = ind.get(position)
                        if isinstance(cell, float):
                            # presumably normalises float subclasses such as
                            # numpy.float64 to a plain float before writing
                            # (the original comment was unreadable) - confirm
                            cell = float(cell)
                    except NoDataError:
                        # no value for this row: leave the cell empty
                        cell = ''
                    record.append(cell)
                writer.writerow(record)

        plogger.info('[{}] ????????????'.format(output_path))
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:Callandtext    作者:iaora    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:SuperFlaskSeed    作者:saurabh1e    | 项目源码 | 文件源码
def prepare_queryset(query, model, key, value):
        """Filter ``query`` to rows whose ``key`` column contains ``value[0]``.

        Both the column and the first element of ``value`` are lower-cased,
        so the containment test ignores case.
        """
        column = getattr(model, key)
        needle = value[0].lower()
        return query.filter(func.lower(column).contains(needle))
项目:rforms    作者:Jakeable    | 项目源码 | 文件源码
def user_lookup(username):
    # shows a user's page
    #
    # A ".json" suffix on the username selects a JSON response instead of
    # the rendered template. The user is looked up by exact username first
    # and, failing that, with ILIKE. Aborts with 404 when neither lookup
    # finds a user.
    is_json = False
    if username.endswith(".json"):
        username = username.split(".")[0]
        is_json = True

    show_warning = False
    user = User.query.filter_by(username=username).first()
    if not user:
        # check to see if a similar username exists
        user = User.query.filter(User.username.ilike(username)).first()
        if not user:
            # bail out before touching user.username; dereferencing None
            # here would raise AttributeError instead of returning a 404
            return abort(404)
        # warn only when the match differs by more than letter case
        show_warning = user.username.lower() != username.lower()

    if is_json:
        return jsonify(username=user.username,
                       response_md=user.full_body_md,
                       response_html=user.full_body_html,
                       submitted=user.submitted,
                       processed=user.processed,
                       last_login=user.last_login)
    return render_template(
        "user.html",
        user=user,
        username=username,
        show_warning=show_warning)
项目:rforms    作者:Jakeable    | 项目源码 | 文件源码
def remove_mod():
    # clears the form_mod flag of the user named in the posted form;
    # the username is matched in lower case
    username = request.form["username"]
    same_name = func.lower(User.username) == func.lower(username)
    moderator = User.query.filter(same_name).first()
    if not moderator:
        return bad_request("user not found")

    moderator.form_mod = False
    db.session.add(moderator)
    db.session.commit()
    return jsonify(status="OK"), 200
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:knowledge-repo    作者:airbnb    | 项目源码 | 文件源码
def get_query_param_set(params):
    """
    Split a query string on single spaces into lower-cased, non-empty tokens.

    Leading and trailing whitespace is stripped first; only the space
    character separates tokens, so other inner whitespace stays inside them.
    """
    normalized = params.strip().lower()
    return [token for token in normalized.split(" ") if token]
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def get_by_username(self, username):
        """Return the first user whose username equals ``username`` ignoring case, or None."""
        same_username = func.lower(self.User.username) == username.lower()
        return self.dbsession.query(self.User).filter(same_username).first()
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def get_by_email(self, email):
        """Return the first user whose email equals ``email`` ignoring case, or None."""
        same_email = func.lower(self.User.email) == email.lower()
        return self.dbsession.query(self.User).filter(same_email).first()
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def get_black_list(self) -> set:
        """Return the blacklisted words, lower-cased and stripped.

        The cache is consulted first; on a miss the words are read from the
        database and, when there is at least one, stored in the cache.
        """
        @with_session
        def _load_from_db(session=None):
            return {entry.word.lower().strip()
                    for entry in session.query(BlackList).all()}

        cached = self.env.cache.get_black_list()
        if cached is not None:
            return cached

        words = _load_from_db()
        if words:
            # an empty set is not cached, so the next call queries again
            self.env.cache.set_black_list(words)
        return words
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def remove_matching_word_from_blacklist(self, word: str) -> None:
        """Delete every blacklist row equal to ``word`` ignoring case, then reset the cache.

        :raises ValueError: if ``word`` is None or only whitespace
        """
        @with_session
        def _remove(_word: str, session=None) -> None:
            matching = session.query(BlackList).filter(
                func.lower(BlackList.word) == func.lower(_word))
            matching.delete(synchronize_session='fetch')
            session.commit()

        if word is None or len(word.strip()) == 0:
            raise ValueError('empty word when deleting from word list')

        _remove(word)
        self.env.cache.reset_black_list()
项目:dino    作者:thenetcircle    | 项目源码 | 文件源码
def add_words_to_blacklist(self, words: list, session=None) -> None:
        """Insert the given words into the blacklist and reset the cache.

        None, blank words and words whose stripped, lower-cased form is
        already stored (compared with the lower-cased stored words) are
        skipped. Each accepted word is stored exactly as passed in.
        """
        known = {entry.word.lower() for entry in session.query(BlackList).all()}
        for word in words:
            if word is None:
                continue
            trimmed = word.strip()
            if len(trimmed) == 0 or trimmed.lower() in known:
                continue
            entry = BlackList()
            entry.word = word
            session.add(entry)
        session.commit()
        self.env.cache.reset_black_list()
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:gnocchi    作者:gnocchixyz    | 项目源码 | 文件源码
def clean_substr(col, start, length):
    """Build the SQL expression ``lower(substr(hex(col), start, length))``."""
    hex_text = func.hex(col)
    piece = func.substr(hex_text, start, length)
    return func.lower(piece)
项目:QualquerMerdaAPI    作者:tiagovizoto    | 项目源码 | 文件源码
def test_composed_multiple(self):
        """Order by two labelled composite expressions, the second descending."""
        some_table = self.tables.some_table
        sum_expr = (some_table.c.x + some_table.c.y).label('lx')
        concat_expr = (func.lower(some_table.c.q) + some_table.c.p).label('ly')

        statement = select([sum_expr, concat_expr]).order_by(
            sum_expr, concat_expr.desc())
        expected_rows = [
            (3, util.u('q1p3')),
            (5, util.u('q2p2')),
            (7, util.u('q3p1')),
        ]
        self._assert_result(statement, expected_rows)
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_all_arguments_with_text_and_url_by_statement_id(statement_uid, urlmanager, color_statement=False, is_jump=False):
    """
    Given a statement_uid, it returns all arguments, which use this statement and adds
    the corresponding text and a jump url to each of them. If requested, the statement's
    text is wrapped in a highlighting tag inside the argument's text.

    :param statement_uid: Id to a statement, which should be analyzed
    :param urlmanager: object providing get_url_for_jump(), used to build each url
    :param color_statement: True, if the statement (specified by the ID) should be colored
    :param is_jump: True, if the argument text should be built with attack type 'jump'
    :return: list of dictionaries (uid, text, url), ordered by argument uid
    :rtype: list
    """
    logger('DBAS.LIB', 'get_all_arguments_with_text_by_statement_id', 'main ' + str(statement_uid))
    arguments = get_all_arguments_by_statement(statement_uid)
    if not arguments:
        return []

    sb = '<{} data-argumentation-type="position">'.format(tag_type) if color_statement else ''
    se = '</{}>'.format(tag_type) if color_statement else ''
    attack_type = 'jump' if is_jump else ''

    # the statement is the same for every argument, so look its text up once
    statement_text = get_text_for_statement_uid(statement_uid)
    statement_lower = statement_text.lower()

    results = []
    for uid in sorted(arg.uid for arg in arguments):
        argument_text = get_text_for_argument_uid(uid, anonymous_style=True, attack_type=attack_type)
        pos = argument_text.lower().find(statement_lower)
        # find() returns -1 when the statement is not part of the argument text;
        # slicing with -1 would garble the text, so only wrap on a real hit
        if pos >= 0:
            end = pos + len(statement_text)
            argument_text = argument_text[:pos] + sb + argument_text[pos:end] + se + argument_text[end:]
        results.append({'uid': uid,
                        'text': argument_text,
                        'url': urlmanager.get_url_for_jump(False, uid)})
    return results
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_user_by_case_insensitive_nickname(nickname):
    """
    Looks up the first user whose nickname equals the given one, ignoring case

    :param nickname: String
    :return: User or None
    """
    same_nickname = func.lower(User.nickname) == func.lower(nickname)
    users = DBDiscussionSession.query(User)
    return users.filter(same_nickname).first()
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_user_by_case_insensitive_public_nickname(public_nickname):
    """
    Looks up the first user whose public nickname equals the given one, ignoring case

    :param public_nickname: String
    :return: User or None
    """
    same_public_nickname = func.lower(User.public_nickname) == func.lower(public_nickname)
    users = DBDiscussionSession.query(User)
    return users.filter(same_public_nickname).first()
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def bubbles_already_last_in_list(bubble_list, bubbles):
    """
    Checks whether any of the given bubbles equals the bubble at its matching
    position in the tail of the bubble list. Messages are compared lower-cased,
    cleaned of html and stripped.

    :param bubble_list: list of Bubbles
    :param bubbles: a single bubble or a list of bubbles
    :return: Boolean; False if the list is shorter than bubbles or if any
        involved bubble has no 'message'
    """
    if not isinstance(bubbles, list):
        bubbles = [bubbles]
    count = len(bubbles)

    if len(bubble_list) < count:
        return False

    if any('message' not in candidate for candidate in bubbles):
        return False

    found = False
    # walk the last `count` entries of bubble_list in step with bubbles
    for offset, candidate in enumerate(bubbles, start=-count):
        tail_entry = bubble_list[offset]
        if 'message' not in tail_entry:
            return False

        tail_text = __cleanhtml(tail_entry['message'].lower()).strip()
        candidate_text = __cleanhtml(candidate['message'].lower()).strip()
        if tail_text == candidate_text:
            found = True

    return found
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_strings_for_duplicates_or_reasons(value, issue, oem_value_uid=None):
    """
    Collects fuzzy-match entries for the statements of an issue whose lowest-uid
    text version contains the value (ignoring case), leaving out one given statement

    :param value: string to look for
    :param issue: Issue.uid
    :param oem_value_uid: uid of the statement to leave out of the result (optional)
    :return: the first list_length entries of the sorted match list
    """
    db_statements = get_not_disabled_statement_as_query().filter_by(issue_uid=issue).all()
    return_array = []

    for stat in db_statements:
        # compare by value: `is` tests object identity, which is not reliable for integers
        if stat.uid == oem_value_uid:
            continue

        db_tv = DBDiscussionSession.query(TextVersion).filter_by(statement_uid=stat.uid).order_by(TextVersion.uid.asc()).first()
        if value.lower() in db_tv.content.lower():
            rd = __get_fuzzy_string_dict(current_text=value, return_text=db_tv.content, uid=db_tv.statement_uid)  # TODO #432
            return_array.append(rd)

    return_array = __sort_array(return_array)

    return return_array[:list_length]
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def __get_fuzzy_string_dict(index=0, current_text='', return_text='', uid=0):
    """
    Builds one result entry with the keys index, distance, text and statement_uid.
    The distance is computed on the lower-cased texts.

    :param index: int
    :param current_text: string the user searched for
    :param return_text: string that matched and is returned as 'text'
    :param uid: int, stored as 'statement_uid'
    :return: dict()
    """
    entry = dict()
    entry['index'] = index
    entry['distance'] = get_distance(current_text.lower(), return_text.lower())
    entry['text'] = return_text
    entry['statement_uid'] = uid
    return entry
项目:dbas    作者:hhucn    | 项目源码 | 文件源码
def get_lev_distance(a, b):
    """
    Computes the distance between two strings after stripping and lower-casing
    them and returns it as a string zero-padded to max_count_zeros characters

    :param a: first string
    :param b: second string
    :return: zero-padded distance between a and b
    """
    left = a.strip().lower()
    right = b.strip().lower()
    return str(distance(left, right)).zfill(max_count_zeros)