Python sqlalchemy.func 模块,max() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.func.max()

项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def Get(icpe, from_date = (datetime.now() - timedelta(days=7)), to_date =
        datetime.now()):
    power_data = SQL.session.query(PowerModel, \
                                  label('low', func.min(PowerModel.low)),
                                  label('high', func.max(PowerModel.high)),
                                  label('total', func.sum(PowerModel.average)),
                                  label('date', PowerModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(PowerModel.date > from_date).\
            filter(PowerModel.date < to_date).\
            group_by(PowerModel.date).all()

    if not power_data:
        return False

    ret_json = {'icpe' : icpe}
    ret_json['power'] = []
    for data in power_data:
        ret_json['power'].append({'date' : str(data.date), 'low' : data.low, 'high' : data.high,
                                    'total' : data.total})
    return ret_json
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def latest(icpe):
    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            order_by(HeatModel.date.desc()).\
            group_by(HeatModel.date).first()

    if not heat_data:
        return False

    return {'icpe' : icpe, 'date' : str(heat_data.date), 'low' : power_data.low,\
            'high' : heat_data.high, 'total' : power_data.total}
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def get(icpe, from_date = (datetime.now() - timedelta(days=7)), to_date =
        datetime.now()):
    heat_data = SQL.session.query(HeatModel, \
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            filter(HeatModel.date > from_date).\
            filter(HeatModel.date < to_date).\
            group_by(HeatModel.date).all()

    if not heat_data:
        return False

    ret_json = {'icpe' : icpe}
    ret_json['heat'] = []
    for data in heat_data:
        ret_json['heat'].append({'date' : str(data.date), 'low' : data.low, 'high' : data.high,
                                    'total' : data.total})
    return ret_json
项目:NodeDefender    作者:CTSNE    | 项目源码 | 文件源码
def latest(icpe):
    power_data = SQL.session.query(PowerModel, \
                                  label('low', func.min(PowerModel.low)),
                                  label('high', func.max(PowerModel.high)),
                                  label('total', func.sum(PowerModel.average)),
                                  label('date', PowerModel.date)).\
            join(iCPEModel).\
            filter(iCPEModel.mac_address == icpe).\
            order_by(PowerModel.date.desc()).\
            group_by(PowerModel.date).first()

    if not power_data:
        return False

    return {'icpe' : icpe, 'date' : str(power_data.date), 'low' : power_data.low,\
            'high' : power_data.high, 'total' : power_data.total}
项目:beproudbot    作者:beproud    | 项目源码 | 文件源码
def count_water_stock(message):
    """????????????????

    :param message: slackbot?????????????class
    """
    s = Session()
    stock_number, latest_ctime = (
        s.query(func.sum(WaterHistory.delta),
                func.max(case(whens=((
                    WaterHistory.delta != 0,
                    WaterHistory.ctime),), else_=None))).first()
    )

    if stock_number:
        # SQLite????????????????
        if not isinstance(latest_ctime, datetime.datetime):
            latest_ctime = datetime.datetime.strptime(latest_ctime,
                                                      '%Y-%m-%d %H:%M:%S')
        message.send('??: {}? ({:%Y?%m?%d?} ??)'
                     .format(stock_number, latest_ctime))
    else:
        message.send('??????????')
项目:iot    作者:jdupl    | 项目源码 | 文件源码
def get_latest_soil_humidity():
    """Gets latest info about soil humdity and predict waterings.
    """

    pub = []
    records = __to_pub_list(
        HygroRecord.query.group_by(HygroRecord.sensor_uuid)
        .having(func.max(HygroRecord.timestamp)).all())

    for r in records:
        last_watering_timestamp = analytics\
            ._get_last_watering_timestamp(r['sensor_uuid'])

        if last_watering_timestamp:
            polyn = analytics._get_polynomial(
                r['sensor_uuid'], last_watering_timestamp)

            next_watering_timestamp = analytics\
                ._predict_next_watering(polyn, last_watering_timestamp)

            r['last_watering_timestamp'] = last_watering_timestamp
            r['next_watering_timestamp'] = next_watering_timestamp
        pub.append(r)

    return pub
项目:sbds    作者:steemit    | 项目源码 | 文件源码
def highest_block(cls, session):
        """
        Return integer result of MAX(block_num) db query.

        This does not have the same meaning as last irreversible block, ie, it
        makes no claim that the all blocks lower than the MAX(block_num) exist
        in the database.

        Args:
            session (sqlalchemy.orm.session.Session):

        Returns:
            int:
        """
        highest = session.query(func.max(cls.block_num)).scalar()
        if not highest:
            return 0
        else:
            return highest
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def estimate_remaining_time(session, spawn_id, seen):
    first, last = get_first_last(session, spawn_id)

    if not first:
        return 90, 1800

    if seen > last:
        last = seen
    elif seen < first:
        first = seen

    if last - first > 1710:
        estimates = [
            time_until_time(x, seen)
            for x in (first + 90, last + 90, first + 1800, last + 1800)]
        return min(estimates), max(estimates)

    soonest = last + 90
    latest = first + 1800
    return time_until_time(soonest, seen), time_until_time(latest, seen)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def get_latest_runs(cls, session):
        """Returns the latest DagRun for each DAG. """
        subquery = (
            session
            .query(
                cls.dag_id,
                func.max(cls.execution_date).label('execution_date'))
            .group_by(cls.dag_id)
            .subquery()
        )
        dagruns = (
            session
            .query(cls)
            .join(subquery,
                  and_(cls.dag_id == subquery.c.dag_id,
                       cls.execution_date == subquery.c.execution_date))
            .all()
        )
        return dagruns
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_00_verify_db(self, testdb):
        b = testdb.query(Project).get(1)
        assert b is not None
        assert b.name == 'P1'
        assert b.notes == 'ProjectOne'
        assert b.is_active is True
        b = testdb.query(Project).get(2)
        assert b is not None
        assert b.name == 'P2'
        assert b.notes == 'ProjectTwo'
        assert b.is_active is True
        b = testdb.query(Project).get(3)
        assert b is not None
        assert b.name == 'P3Inactive'
        assert b.notes == 'ProjectThreeInactive'
        assert b.is_active is False
        assert testdb.query(Project).with_entities(
            func.max(Project.id)
        ).scalar() == 3
        assert testdb.query(BoMItem).with_entities(
            func.max(BoMItem.id)
        ).scalar() == 5
项目:pyabc    作者:neuralyzer    | 项目源码 | 文件源码
def max_t(self):
        """
        The population number of the last populations.
        This is equivalent to ``n_populations - 1``.
        """
        max_t = (self._session.query(func.max(Population.t))
                 .join(ABCSMC).filter(ABCSMC.id == self.id).one()[0])
        return max_t
项目:nokia-deployer    作者:nokia    | 项目源码 | 文件源码
def _cleanup(self):
        if not os.path.exists(self.base_repos_path):
            logger.info("Base repo path {} does not exist yet, nothing to do.".format(self.base_repos_path))
            return
        now = datetime.utcnow()
        with session_scope() as session:
            deletion_candidates = set(os.listdir(self.base_repos_path))
            subquery = session.query(m.Environment.id, func.max(m.DeploymentView.queued_date).label("max_queued_date")).\
                filter(m.Environment.id == m.DeploymentView.environment_id).\
                group_by(m.Environment.id).subquery()
            recently_deployed_envs = session.query(m.Environment).\
                join((subquery, subquery.c.id == m.Environment.id)).\
                filter(subquery.c.max_queued_date > now - self.max_unused_age).\
                all()
            to_keep = set(e.local_repo_directory_name for e in recently_deployed_envs)
            for path in deletion_candidates - to_keep:
                path = os.path.join(self.base_repos_path, path)
                if os.path.exists(path):
                    with lock_repository_fetch(path), lock_repository_write(path):
                        rmtree(path)
                    logger.info(
                        "Deleted unused directory {}".format(path)
                    )
项目:versionalchemy    作者:NerdWalletOSS    | 项目源码 | 文件源码
def _latest_version(cls, session, row, use_dirty=True):
        """
        :param session: a session instance to execute a select on the log table
        :param row: an instance of the user table row object

        :return: the maximum version ID recorded for the specified row or None if version_id
        has not been inserted
        :rtype: int
        """
        and_clause = \
            utils.generate_and_clause(cls, row, cls._version_col_names, use_dirty=use_dirty)
        result = session.execute(
            sa.select([func.max(cls.va_version)]).
            where(and_clause)
        ).first()
        return None if result is None else result[0]
项目:thunderingplains    作者:lyctc    | 项目源码 | 文件源码
def post_add_user_sql(name, password, email):
    """Gets current max uid + 1, and then creates new User."""
    # check if username already exists -- raise exception if it does

    q = DBS.query(User).filter_by(name=name)
    r = q.first()

    if r:
        raise ValueError('Username already exists')
    else:
        r = DBS.query(func.max(User.uid)).first()[0]
        uid = 1 if not r else r + 1  # set to 1 for the first user
        r = DBS.query(func.max(Group.gid)).first()[0]
        gid = 1 if not r else r + 1
        DBS.add(User(gid, uid, name, hash_password(password), email))
        DBS.add(Group(gid, "No Name"))
        transaction.commit()
        return uid, gid
项目:xiaoxiang-oj    作者:hanfei19910905    | 项目源码 | 文件源码
def prob_view_get(hid, pid):
    result, ok, problem, homework = checkValid(hid, pid)
    if not ok:
        return result
    form = SubmitForm()
    plist = ['rank', 'user name', problem.name, 'Entries', 'last submit time']
    result = db.session.query(ProbUserStatic.user_id, func.sum(ProbUserStatic.real_score), func.max(ProbUserStatic.score), func.sum(ProbUserStatic.submit_times), func.max(ProbUserStatic.last_time)). \
        filter(ProbUserStatic.prob_id == problem.id, ProbUserStatic.score > 0) \
        .group_by(ProbUserStatic.user_id).order_by(func.sum(ProbUserStatic.real_score)).all()
    prank = []
    result = reversed(result)
    for i, res in enumerate(result):
        name = User.query.filter_by(id=res[0]).first().name
        prank.append([i + 1, name, res[2], res[3], res[4]])
    if hid == -1:
        return render_template('prob_view.html', problem=problem, form=form, hid=-1, data=problem.data, active='problem', attach = False, plist = plist, prank = prank)
    attach = os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'], problem.data.attach))
    return render_template('prob_view.html', problem=problem, form=form, hid=hid, data=problem.data, active='homework', attach =attach, plist = plist, prank = prank)
项目:pmi_sprint_reporter    作者:cumc-dbmi    | 项目源码 | 文件源码
def download_latest():
    """
    Download and save the latest submission files possibly overwriting existing, presumably older files
    :return:
    """
    q1 = select([table.c.file_name, func.max(table.c.sent_time_epoch).label('sent_time_max_epoch')]).group_by('file_name')
    latest_files = engine.execute(q1).fetchall()
    permitted = list(permitted_file_names())
    for f in latest_files:
        selection = [table.c.sender_name, table.c.file_name, table.c.url]
        q2 = select(selection).where(
            and_(table.c.file_name == f['file_name'], table.c.sent_time_epoch == f['sent_time_max_epoch']))
        r = engine.execute(q2).fetchone()
        file_name = r['file_name'].lower()
        sender_name = r['sender_name']
        if file_name not in permitted:
            print 'Notify %(sender_name)s that "%(file_name)s" is not a valid file name' % locals()
        # download either way just in case
        dest = os.path.join(settings.csv_dir, file_name)
        content = file_transfer.download(r['url'])
        if 'MD5 token has expired' not in content:
            with open(dest, 'wb') as out:
                out.write(content)
项目:scrobbler    作者:hatarist    | 项目源码 | 文件源码
def dashboard(period=None):
    period, days = PERIODS.get(period, PERIODS['1w'])

    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year), func.max(col_year)).first()
    year_from, year_to = int(year_from), int(year_to)

    return render_template(
        'dashboard.html',
        period=period,
        year_min=year_from,
        year_max=year_to,
    )
项目:HTSOHM-dev    作者:akaija    | 项目源码 | 文件源码
def run_ids():
    cols = [materials.c.run_id,
            func.max(materials.c.generation),
            func.count(materials.c.id)]
    rows = or_(materials.c.retest_passed == None, materials.c.retest_passed == True)
    sort = materials.c.run_id
    s = select(cols, rows).group_by(sort).order_by(asc(sort))
    print('\nrun-id\t\t\t\tgenerations\tmaterial')
    result = engine.execute(s)
    for row in result:
        print('%s\t%s\t\t%s' % (row[0], row[1], row[2]))
    result.close()
项目:iot    作者:jdupl    | 项目源码 | 文件源码
def get_lastest_dht11():
    return __to_pub_list(
        DHT11Record.query.group_by(DHT11Record.sensor_uuid)
        .having(func.max(DHT11Record.timestamp)).all())
项目:iot    作者:jdupl    | 项目源码 | 文件源码
def get_lastest_photocell():
    return __to_pub_list(
        PhotocellRecord.query.group_by(PhotocellRecord.sensor_uuid)
        .having(func.max(PhotocellRecord.timestamp)).all())
项目:iot    作者:jdupl    | 项目源码 | 文件源码
def get_lastest_rain():
    return __to_pub_list(
        RainRecord.query.group_by(RainRecord.sensor_uuid)
        .having(func.max(RainRecord.timestamp)).all())
项目:iot    作者:jdupl    | 项目源码 | 文件源码
def get_lastest_bmp():
    return __to_pub_list(
        BMPRecord.query.group_by(BMPRecord.sensor_uuid)
        .having(func.max(BMPRecord.timestamp)).all())
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def get_session_stats(session):
    query = session.query(func.min(Sighting.expire_timestamp),
        func.max(Sighting.expire_timestamp))
    if conf.REPORT_SINCE:
        query = query.filter(Sighting.expire_timestamp > SINCE_TIME)
    min_max_result = query.one()
    length_hours = (min_max_result[1] - min_max_result[0]) // 3600
    if length_hours == 0:
        length_hours = 1
    # Convert to datetime
    return {
        'start': datetime.fromtimestamp(min_max_result[0]),
        'end': datetime.fromtimestamp(min_max_result[1]),
        'length_hours': length_hours
    }
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def get_first_last(session, spawn_id):
    return session.query(func.min(Mystery.first_seconds), func.max(Mystery.last_seconds)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .first()
项目:Monocle    作者:Noctem    | 项目源码 | 文件源码
def get_widest_range(session, spawn_id):
    return session.query(func.max(Mystery.seen_range)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .scalar()
项目:game_recommendations    作者:ceorourke    | 项目源码 | 文件源码
def set_val_user_id():
    """Set value for the next user_id after seeding database"""

    # Get the Max user_id in the database
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])

    # Set the value for the next user_id to be max_id + 1
    query = "SELECT setval('users_user_id_seq', :new_id)"
    db.session.execute(query, {'new_id': max_id + 1})
    db.session.commit()


#*****************************************************************************#
项目:ozelot    作者:trycs    | 项目源码 | 文件源码
def get_max_id(cls, session):
        """Get the current max value of the ``id`` column.

        When creating and storing ORM objects in bulk, :mod:`sqlalchemy` does not automatically
        generate an incrementing primary key ``id``. To do this manually, one needs to know the
        current max ``id``. For ORM object classes that are derived from other ORM object classes,
        the max ``id`` of the lowest base class is returned. This is designed to be used with
        inheritance by joining, in which derived and base class objects have identical ``id`` values.

        Args:
            session: database session to operate in
        """
        # sqlalchemy allows only one level of inheritance, so just check this class and all its bases
        id_base = None
        for c in [cls] + list(cls.__bases__):
            for base_class in c.__bases__:
                if base_class.__name__ == 'Base':
                    if id_base is None:
                        # we found our base class for determining the ID
                        id_base = c
                    else:
                        raise RuntimeError("Multiple base object classes for class " + cls.__name__)

        # this should never happen
        if id_base is None:
            raise RuntimeError("Error searching for base class of " + cls.__name__)

        # get its max ID
        max_id = session.query(func.max(id_base.id)).scalar()

        # if no object is present, None is returned
        if max_id is None:
            max_id = 0

        return max_id
项目:ozelot    作者:trycs    | 项目源码 | 文件源码
def truncate_to_field_length(self, field, value):
        """Truncate the value of a string field to the field's max length.

        Use this in a validator to check/truncate values before inserting them into the database.
        Copy the below example code after ``@validates`` to your model class and replace ``field1`` and ``field2`` with
        your field name(s).

        :Example:

            from sqlalchemy.orm import validates
            # ... omitting other imports ...

            class MyModel(base.Base):

                field1 = Column(String(128))
                field2 = Column(String(64))

                @validates('field1', 'field2')
                def truncate(self, field, value):
                    return self.truncate_to_field_length(field, value)

        Args:
            field (str): field name to validate
            value (str/unicode): value to validate

        Returns:
            str/unicode: value truncated to field max length

        """
        max_len = getattr(self.__class__, field).prop.columns[0].type.length
        if value and len(value) > max_len:
            return value[:max_len]
        else:
            return value
项目:BlogSpider    作者:hack4code    | 项目源码 | 文件源码
def get_end_day():
    r = db.session.query(
        func.max(Article.crawl_date).label('end')
        ).one()
    return r.end.date()
项目:BlogSpider    作者:hack4code    | 项目源码 | 文件源码
def get_last_aid(category):
    r = db.session.query(
        func.max(Article.id).label('max_aid')
        ).filter_by(category=category).one()
    return r.max_aid
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
    """
    Clears a set of task instances, but makes sure the running ones
    get killed.
    """
    job_ids = []
    for ti in tis:
        if ti.state == State.RUNNING:
            if ti.job_id:
                ti.state = State.SHUTDOWN
                job_ids.append(ti.job_id)
        else:
            task_id = ti.task_id
            if dag and dag.has_task(task_id):
                task = dag.get_task(task_id)
                task_retries = task.retries
                ti.max_tries = ti.try_number + task_retries
            else:
                # Ignore errors when updating max_tries if dag is None or
                # task not found in dag since database records could be
                # outdated. We make max_tries the maximum value of its
                # original max_tries or the current task try number.
                ti.max_tries = max(ti.max_tries, ti.try_number)
            ti.state = State.NONE
            session.merge(ti)

    if job_ids:
        from airflow.jobs import BaseJob as BJ
        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
            job.state = State.SHUTDOWN

    if activate_dag_runs and tis:
        drs = session.query(DagRun).filter(
            DagRun.dag_id.in_({ti.dag_id for ti in tis}),
            DagRun.execution_date.in_({ti.execution_date for ti in tis}),
        ).all()
        for dr in drs:
            dr.state = State.RUNNING
            dr.start_date = timezone.utcnow()
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def next_retry_datetime(self):
        """
        Get datetime of the next retry if the task instance fails. For exponential
        backoff, retry_delay is used as base and will be converted to seconds.
        """
        delay = self.task.retry_delay
        if self.task.retry_exponential_backoff:
            min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
            # deterministic per task instance
            hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id, self.task_id,
                self.execution_date, self.try_number).encode('utf-8')).hexdigest(), 16)
            # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
            modded_hash = min_backoff + hash % min_backoff
            # timedelta has a maximum representable value. The exponentiation
            # here means this value can be exceeded after a certain number
            # of tries (around 50 if the initial delay is 1s, even fewer if
            # the delay is larger). Cap the value here before creating a
            # timedelta object so the operation doesn't fail.
            delay_backoff_in_seconds = min(
                modded_hash,
                timedelta.max.total_seconds() - 1
            )
            delay = timedelta(seconds=delay_backoff_in_seconds)
            if self.task.max_retry_delay:
                delay = min(self.task.max_retry_delay, delay)
        return self.end_date + delay
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def latest_execution_date(self, session=None):
        """
        Returns the latest date for which at least one dag run exists
        """
        execution_date = session.query(func.max(DagRun.execution_date)).filter(
            DagRun.dag_id == self.dag_id
        ).scalar()
        return execution_date
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def add_task(self, task):
        """
        Add a task to the DAG

        :param task: the task you want to add
        :type task: task
        """
        if not self.start_date and not task.start_date:
            raise AirflowException("Task is missing the start_date parameter")
        # if the task has no start date, assign it the same as the DAG
        elif not task.start_date:
            task.start_date = self.start_date
        # otherwise, the task will start on the later of its own start date and
        # the DAG's start date
        elif self.start_date:
            task.start_date = max(task.start_date, self.start_date)

        # if the task has no end date, assign it the same as the dag
        if not task.end_date:
            task.end_date = self.end_date
        # otherwise, the task will end on the earlier of its own end date and
        # the DAG's end date
        elif task.end_date and self.end_date:
            task.end_date = min(task.end_date, self.end_date)

        if task.task_id in self.task_dict:
            # TODO: raise an error in Airflow 2.0
            warnings.warn(
                'The requested task could not be added to the DAG because a '
                'task with task_id {} is already in the DAG. Starting in '
                'Airflow 2.0, trying to overwrite a task will raise an '
                'exception.'.format(task.task_id),
                category=PendingDeprecationWarning)
        else:
            self.tasks.append(task)
            self.task_dict[task.task_id] = task
            task.dag = self

        self.task_count = len(self.tasks)
项目:destiny-gotg    作者:adherrling    | 项目源码 | 文件源码
def single_stat_request(player, code, stat):
    """Actually retrieves the stat and returns the stat info in an embed"""
    session = Session()
    message = ""
    if code == 1:
        (table, col, message) = stat_dict[stat]
        columns = [col]
        res = session.query(*(getattr(table, column) for column in columns), Account.display_name).join(Account).filter(Account.display_name == player).first()
        #Returns a tuple containing the stat, but only the first element is defined for some reason.
        num = truncate_decimals(res[0])
        name = res[1]
    elif code == 2 or code == 3:
        (table, col, message) = medal_dict[stat]
        columns = [col]
        res = session.query(func.sum(*(getattr(table, column) for column in columns))).join(Account).filter(Account.display_name == player).group_by(AccountMedals.id).first()
        num = float(res[0])
        name = player
        if message != "Activities Entered" and message != "Total Number of Medals" and message != "Total Medal Score":
            message = f"Total {message} Medals"
        if code == 3:
            denominator = session.query(PvPAggregate.activitiesEntered).join(Account).filter(Account.display_name == player).first()
            act = denominator[0]
            num = num/act
            if message != "Activities Entered" and message != "Total Number of Medals" and message != "Total Medal Score":
                message = f"{message} Medals per Game"
    elif code == 4:
        (table, col, message) = character_dict[stat]
        columns = [col]
        res = session.query(func.max(*(getattr(table, column) for column in columns)), Account.display_name).join(Account).filter(Account.display_name == player).first()
        #Returns a tuple containing the stat, but only the first element is defined for some reason.
        num = truncate_decimals(res[0])
        name = res[1]
    #em = discord.Embed(title = f"{author}{message}{result}", colour=0xADD8E6)
    em = f"```{message} for {name}: {num}```"
    return em
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_04_verify_db(self, testdb):
        b = testdb.query(Project).get(1)
        assert b is not None
        assert b.name == 'P1'
        assert b.notes == 'ProjectOne'
        assert b.is_active is True
        b = testdb.query(Project).get(2)
        assert b is not None
        assert b.name == 'P2'
        assert b.notes == 'ProjectTwo'
        assert b.is_active is True
        b = testdb.query(Project).get(3)
        assert b is not None
        assert b.name == 'P3Inactive'
        assert b.notes == 'ProjectThreeInactive'
        assert b.is_active is False
        b = testdb.query(Project).get(4)
        assert b is not None
        assert b.name == 'NewP'
        assert b.notes == 'My New Project'
        assert b.is_active is True
        assert testdb.query(Project).with_entities(
            func.max(Project.id)
        ).scalar() == 4
        assert testdb.query(BoMItem).with_entities(
            func.max(BoMItem.id)
        ).scalar() == 5
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_06_verify_db(self, testdb):
        b = testdb.query(Project).get(1)
        assert b is not None
        assert b.name == 'P1'
        assert b.notes == 'ProjectOne'
        assert b.is_active is False
        b = testdb.query(Project).get(2)
        assert b is not None
        assert b.name == 'P2'
        assert b.notes == 'ProjectTwo'
        assert b.is_active is True
        b = testdb.query(Project).get(3)
        assert b is not None
        assert b.name == 'P3Inactive'
        assert b.notes == 'ProjectThreeInactive'
        assert b.is_active is False
        b = testdb.query(Project).get(4)
        assert b is not None
        assert b.name == 'NewP'
        assert b.notes == 'My New Project'
        assert b.is_active is True
        assert testdb.query(Project).with_entities(
            func.max(Project.id)
        ).scalar() == 4
        assert testdb.query(BoMItem).with_entities(
            func.max(BoMItem.id)
        ).scalar() == 5
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_08_verify_db(self, testdb):
        b = testdb.query(Project).get(1)
        assert b is not None
        assert b.name == 'P1'
        assert b.notes == 'ProjectOne'
        assert b.is_active is False
        b = testdb.query(Project).get(2)
        assert b is not None
        assert b.name == 'P2'
        assert b.notes == 'ProjectTwo'
        assert b.is_active is True
        b = testdb.query(Project).get(3)
        assert b is not None
        assert b.name == 'P3Inactive'
        assert b.notes == 'ProjectThreeInactive'
        assert b.is_active is True
        b = testdb.query(Project).get(4)
        assert b is not None
        assert b.name == 'NewP'
        assert b.notes == 'My New Project'
        assert b.is_active is True
        assert testdb.query(Project).with_entities(
            func.max(Project.id)
        ).scalar() == 4
        assert testdb.query(BoMItem).with_entities(
            func.max(BoMItem.id)
        ).scalar() == 5
项目:biweeklybudget    作者:jantman    | 项目源码 | 文件源码
def test_0_verify_db(self, testdb):
        assert testdb.query(OFXStatement).with_entities(
            func.max(OFXStatement.id)
        ).scalar() == 9
        assert testdb.query(OFXTransaction).with_entities(
            func.max(OFXTransaction.statement_id)
        ).scalar() == 9
        assert len(testdb.query(OFXTransaction).all()) == 33
项目:ratings    作者:kjlundsgaard    | 项目源码 | 文件源码
def set_val_user_id():
    """Set value for the next user_id after seeding database"""

    # Get the Max user_id in the database
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])

    # Set the value for the next user_id to be max_id + 1
    query = "SELECT setval('users_user_id_seq', :new_id)"
    db.session.execute(query, {'new_id': max_id + 1})
    db.session.commit()
项目:CAPE    作者:ctxis    | 项目源码 | 文件源码
def minmax_tasks(self):
         """Find tasks minimum and maximum
         @return: unix timestamps of minimum and maximum
         """
         session = self.Session()
         try:
             _min = session.query(func.min(Task.started_on).label("min")).first()
             _max = session.query(func.max(Task.completed_on).label("max")).first()
             return int(_min[0].strftime("%s")), int(_max[0].strftime("%s"))
         except SQLAlchemyError as e:
             log.debug("Database error counting tasks: {0}".format(e))
             return 0
         finally:
             session.close()
项目:gallery-wall    作者:elsabirch    | 项目源码 | 文件源码
def load_users(seed_file_path):
    """Add users to database from text file.

    Data file is pipe seperated:
    user_id | username | email | password
    """

    with open(seed_file_path) as seed_file:
        for line in seed_file:
            user_id, username, email, password = line.rstrip().split("|")

            user_id = int(user_id)
            username = username.strip()
            email = email.strip()
            password = password.strip()

            user = User(
                user_id=user_id,
                username=username,
                email=email,
                password=password
                )

            db.session.add(user)

        db.session.commit()

    # Reset seq/counter
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE users_user_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id+1})
    db.session.commit()
项目:gallery-wall    作者:elsabirch    | 项目源码 | 文件源码
def load_galleries(seed_file_path):
    """Add sample galleries to database from text file.

    Data file is pipe seperated:
    gallery_id | gallery_name | curator_id
    """

    with open(seed_file_path) as seed_file:
        for line in seed_file:
            gallery_id, gallery_name, curator_id = line.rstrip().split("|")

            # Currently gallery_id is discarded
            gallery_name = gallery_name.strip()
            curator_id = int(curator_id)

            gallery = Gallery(
                gallery_id=gallery_id,
                gallery_name=gallery_name,
                curator_id=curator_id,
                )

            db.session.add(gallery)

        db.session.commit()

    result = db.session.query(func.max(Gallery.gallery_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE galleries_gallery_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id+1})
    db.session.commit()
项目:gallery-wall    作者:elsabirch    | 项目源码 | 文件源码
def load_walls(seed_file_path):
    """Add sample walls to database from text file.

    Data file is pipe seperated:
    wall_id | gallery_id | wall_width | wall_height | saved
    """

    with open(seed_file_path) as seed_file:
        for line in seed_file:
            wall_id, gallery_id, wall_width, wall_height, saved = line.rstrip().split("|")

            # Currently gallery_id is discarded
            wall_id = int(wall_id)
            gallery_id = int(gallery_id)
            wall_width = int(wall_width)
            wall_height = int(wall_height)
            saved = saved.strip().lower() == 'saved'

            wall = Wall(wall_id=wall_id,
                        gallery_id=gallery_id,
                        wall_width=wall_width,
                        wall_height=wall_height,
                        saved=saved)

            db.session.add(wall)

        db.session.commit()

    # Reset counter
    result = db.session.query(func.max(Wall.wall_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE walls_wall_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id+1})
    db.session.commit()
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def last_update(self):
        return max(imap(lambda e: e.import_time, self.entries))
项目:pycroft    作者:agdsn    | 项目源码 | 文件源码
def last_update(self):
        return (
            select(func.max(JournalEntry.import_time))
            .where(JournalEntry.journal_id == self.id)
            .label("last_update")
        )
项目:airflow    作者:apache-airflow    | 项目源码 | 文件源码
def latest_execution_date(self):
        """
        Returns the latest date for which at least one task instance exists
        """
        TI = TaskInstance
        session = settings.Session()
        execution_date = session.query(func.max(TI.execution_date)).filter(
            TI.dag_id == self.dag_id,
            TI.task_id.in_(self.task_ids)
        ).scalar()
        session.commit()
        session.close()
        return execution_date
项目:AppServer    作者:skytoup    | 项目源码 | 文件源码
def get_apps(request: Request, app_type: str, page: int):
    """
    ??app
    - uri[app??(all/iOS/android)-app_type: str, ??(?1?)-page: int], format[??s-t: int]
    :param request:
    :return:
    """
    time = Date.time2datetime(request.args.get('t'))
    if not time:
        raise BadRequest('')

    if page <= 0:
        log.debug('page need greater zero')
        raise BadRequest('')

    kw = request.args.get('kw')

    session = Session()
    query = session.query(AppModel, AppVersionModel.version_code, AppVersionModel.version_name,
                          func.max(AppVersionModel.create_at).label('_update_at')) \
        .join(AppVersionModel, AppModel.id == AppVersionModel.app_id) \
        .filter(AppModel.create_at <= time)
    if app_type != 'all':  # ???????
        query = query.filter(AppModel.type == app_type)

    if kw:
        query = query.filter(AppModel.name.like('%{}%'.format(kw)))

    result = query.order_by(desc(AppModel.create_at)) \
        .group_by(AppModel.short_chain_uri_) \
        .offset((page - 1) * Config.apps_limit) \
        .limit(Config.apps_limit) \
        .all()

    datas = []
    for app, version_code, version_name, _ in result:
        app.version_code = version_code
        app.version_name = version_name
        datas.append(app)

    return JsonResult.ok(datas).response_json()
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def last_updated(self):
        """this method is used in the scraper to return the last updated station.
        this lets us pull only updated data."""
        try:
            return max(self.station_usage, key=lambda x: x.last_update).dt_last_update
        except ValueError:
            return datetime.fromtimestamp(0)
项目:DublinBikeApp    作者:charlawl    | 项目源码 | 文件源码
def get_current_station_info(cls, dbsession):
        """as the method name suggests this returns the up to date station information."""
        sub = dbsession.query(UsageData.station_id, func.max(UsageData.id).label('max_update')).group_by(
            UsageData.station_id).subquery()
        return dbsession.query(
            UsageData.last_update,
             UsageData.available_bike_stands, UsageData.available_bikes).join(sub, and_(
                sub.c.max_update == UsageData.id)).all()