The following 50 code examples, extracted from open source Python projects, illustrate how to use sqlalchemy.func.max().
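Before the project-sourced examples, here is a minimal, self-contained sketch of the core pattern they all share: wrapping a column in func.max() and reading the aggregate back with .scalar(). The User model, the in-memory SQLite URL, and the sample rows are invented for this illustration.

from sqlalchemy import Column, Integer, String, create_engine, func
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()

class User(Base):
    """Hypothetical model, defined only for this illustration."""
    __tablename__ = 'users'
    id = Column(Integer, primary_key=True)
    name = Column(String(64))

engine = create_engine('sqlite://')  # throwaway in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(name='alice'), User(name='bob')])
    session.commit()
    # func.max() renders as SQL MAX(); .scalar() unwraps the single
    # value and yields None when the table is empty.
    print(session.query(func.max(User.id)).scalar())  # -> 2
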
def Get(icpe, from_date=(datetime.now() - timedelta(days=7)), to_date=datetime.now()):
    power_data = SQL.session.query(PowerModel,
                                   label('low', func.min(PowerModel.low)),
                                   label('high', func.max(PowerModel.high)),
                                   label('total', func.sum(PowerModel.average)),
                                   label('date', PowerModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        filter(PowerModel.date > from_date).\
        filter(PowerModel.date < to_date).\
        group_by(PowerModel.date).all()

    if not power_data:
        return False

    ret_json = {'icpe': icpe}
    ret_json['power'] = []

    for data in power_data:
        ret_json['power'].append({'date': str(data.date),
                                  'low': data.low,
                                  'high': data.high,
                                  'total': data.total})

    return ret_json

def latest(icpe):
    heat_data = SQL.session.query(HeatModel,
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        order_by(HeatModel.date.desc()).\
        group_by(HeatModel.date).first()

    if not heat_data:
        return False

    # note: the original referenced power_data here, a copy-paste bug
    # from the power version of this function; fixed to heat_data
    return {'icpe': icpe, 'date': str(heat_data.date), 'low': heat_data.low,
            'high': heat_data.high, 'total': heat_data.total}

def get(icpe, from_date=(datetime.now() - timedelta(days=7)), to_date=datetime.now()):
    heat_data = SQL.session.query(HeatModel,
                                  label('low', func.min(HeatModel.low)),
                                  label('high', func.max(HeatModel.high)),
                                  label('total', func.sum(HeatModel.average)),
                                  label('date', HeatModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        filter(HeatModel.date > from_date).\
        filter(HeatModel.date < to_date).\
        group_by(HeatModel.date).all()

    if not heat_data:
        return False

    ret_json = {'icpe': icpe}
    ret_json['heat'] = []

    for data in heat_data:
        ret_json['heat'].append({'date': str(data.date),
                                 'low': data.low,
                                 'high': data.high,
                                 'total': data.total})

    return ret_json

def latest(icpe):
    power_data = SQL.session.query(PowerModel,
                                   label('low', func.min(PowerModel.low)),
                                   label('high', func.max(PowerModel.high)),
                                   label('total', func.sum(PowerModel.average)),
                                   label('date', PowerModel.date)).\
        join(iCPEModel).\
        filter(iCPEModel.mac_address == icpe).\
        order_by(PowerModel.date.desc()).\
        group_by(PowerModel.date).first()

    if not power_data:
        return False

    return {'icpe': icpe, 'date': str(power_data.date), 'low': power_data.low,
            'high': power_data.high, 'total': power_data.total}

def count_water_stock(message):
    """Report the current water stock and when it last changed.

    :param message: slackbot message object, used to reply in channel
    """
    s = Session()
    stock_number, latest_ctime = (
        s.query(func.sum(WaterHistory.delta),
                func.max(case(whens=((
                    WaterHistory.delta != 0, WaterHistory.ctime),),
                    else_=None))).first()
    )
    if stock_number:
        # SQLite returns the timestamp as a string rather than a datetime
        if not isinstance(latest_ctime, datetime.datetime):
            latest_ctime = datetime.datetime.strptime(latest_ctime,
                                                      '%Y-%m-%d %H:%M:%S')

        message.send('Stock: {} (as of {:%Y-%m-%d})'
                     .format(stock_number, latest_ctime))
    else:
        message.send('No water left in stock')

def get_latest_soil_humidity():
    """Gets latest info about soil humidity and predicts waterings."""
    pub = []
    records = __to_pub_list(
        HygroRecord.query.group_by(HygroRecord.sensor_uuid)
        .having(func.max(HygroRecord.timestamp)).all())

    for r in records:
        last_watering_timestamp = analytics\
            ._get_last_watering_timestamp(r['sensor_uuid'])
        if last_watering_timestamp:
            polyn = analytics._get_polynomial(
                r['sensor_uuid'], last_watering_timestamp)
            next_watering_timestamp = analytics\
                ._predict_next_watering(polyn, last_watering_timestamp)

            r['last_watering_timestamp'] = last_watering_timestamp
            r['next_watering_timestamp'] = next_watering_timestamp
        pub.append(r)
    return pub

def highest_block(cls, session):
    """Return integer result of MAX(block_num) db query.

    This does not have the same meaning as last irreversible block, ie,
    it makes no claim that all blocks lower than MAX(block_num) exist
    in the database.

    Args:
        session (sqlalchemy.orm.session.Session):

    Returns:
        int:
    """
    highest = session.query(func.max(cls.block_num)).scalar()
    if not highest:
        return 0
    else:
        return highest

def estimate_remaining_time(session, spawn_id, seen):
    first, last = get_first_last(session, spawn_id)

    if not first:
        return 90, 1800

    if seen > last:
        last = seen
    elif seen < first:
        first = seen

    if last - first > 1710:
        estimates = [
            time_until_time(x, seen)
            for x in (first + 90, last + 90, first + 1800, last + 1800)]
        return min(estimates), max(estimates)

    soonest = last + 90
    latest = first + 1800
    return time_until_time(soonest, seen), time_until_time(latest, seen)

def get_latest_runs(cls, session):
    """Returns the latest DagRun for each DAG."""
    subquery = (
        session
        .query(
            cls.dag_id,
            func.max(cls.execution_date).label('execution_date'))
        .group_by(cls.dag_id)
        .subquery()
    )
    dagruns = (
        session
        .query(cls)
        .join(subquery,
              and_(cls.dag_id == subquery.c.dag_id,
                   cls.execution_date == subquery.c.execution_date))
        .all()
    )
    return dagruns

def test_00_verify_db(self, testdb):
    b = testdb.query(Project).get(1)
    assert b is not None
    assert b.name == 'P1'
    assert b.notes == 'ProjectOne'
    assert b.is_active is True
    b = testdb.query(Project).get(2)
    assert b is not None
    assert b.name == 'P2'
    assert b.notes == 'ProjectTwo'
    assert b.is_active is True
    b = testdb.query(Project).get(3)
    assert b is not None
    assert b.name == 'P3Inactive'
    assert b.notes == 'ProjectThreeInactive'
    assert b.is_active is False
    assert testdb.query(Project).with_entities(
        func.max(Project.id)
    ).scalar() == 3
    assert testdb.query(BoMItem).with_entities(
        func.max(BoMItem.id)
    ).scalar() == 5

def max_t(self):
    """The population number of the last population.

    This is equivalent to ``n_populations - 1``.
    """
    max_t = (self._session.query(func.max(Population.t))
             .join(ABCSMC).filter(ABCSMC.id == self.id).one()[0])
    return max_t

def _cleanup(self):
    if not os.path.exists(self.base_repos_path):
        logger.info("Base repo path {} does not exist yet, nothing to do.".format(self.base_repos_path))
        return
    now = datetime.utcnow()
    with session_scope() as session:
        deletion_candidates = set(os.listdir(self.base_repos_path))
        subquery = session.query(
            m.Environment.id,
            func.max(m.DeploymentView.queued_date).label("max_queued_date")).\
            filter(m.Environment.id == m.DeploymentView.environment_id).\
            group_by(m.Environment.id).subquery()
        recently_deployed_envs = session.query(m.Environment).\
            join((subquery, subquery.c.id == m.Environment.id)).\
            filter(subquery.c.max_queued_date > now - self.max_unused_age).\
            all()
        to_keep = set(e.local_repo_directory_name for e in recently_deployed_envs)
        for path in deletion_candidates - to_keep:
            path = os.path.join(self.base_repos_path, path)
            if os.path.exists(path):
                with lock_repository_fetch(path), lock_repository_write(path):
                    rmtree(path)
                logger.info(
                    "Deleted unused directory {}".format(path)
                )

def _latest_version(cls, session, row, use_dirty=True):
    """
    :param session: a session instance to execute a select on the log table
    :param row: an instance of the user table row object
    :return: the maximum version ID recorded for the specified row or
        None if version_id has not been inserted
    :rtype: int
    """
    and_clause = \
        utils.generate_and_clause(cls, row, cls._version_col_names,
                                  use_dirty=use_dirty)
    result = session.execute(
        sa.select([func.max(cls.va_version)]).
        where(and_clause)
    ).first()
    return None if result is None else result[0]

def post_add_user_sql(name, password, email):
    """Gets current max uid + 1, and then creates new User."""
    # check if username already exists -- raise exception if it does
    q = DBS.query(User).filter_by(name=name)
    r = q.first()
    if r:
        raise ValueError('Username already exists')
    else:
        r = DBS.query(func.max(User.uid)).first()[0]
        uid = 1 if not r else r + 1  # set to 1 for the first user
        r = DBS.query(func.max(Group.gid)).first()[0]
        gid = 1 if not r else r + 1
        DBS.add(User(gid, uid, name, hash_password(password), email))
        DBS.add(Group(gid, "No Name"))
        transaction.commit()
        return uid, gid

def prob_view_get(hid, pid):
    result, ok, problem, homework = checkValid(hid, pid)
    if not ok:
        return result
    form = SubmitForm()
    plist = ['rank', 'user name', problem.name, 'Entries', 'last submit time']
    result = db.session.query(ProbUserStatic.user_id,
                              func.sum(ProbUserStatic.real_score),
                              func.max(ProbUserStatic.score),
                              func.sum(ProbUserStatic.submit_times),
                              func.max(ProbUserStatic.last_time)). \
        filter(ProbUserStatic.prob_id == problem.id, ProbUserStatic.score > 0) \
        .group_by(ProbUserStatic.user_id) \
        .order_by(func.sum(ProbUserStatic.real_score)).all()
    prank = []
    result = reversed(result)
    for i, res in enumerate(result):
        name = User.query.filter_by(id=res[0]).first().name
        prank.append([i + 1, name, res[2], res[3], res[4]])
    if hid == -1:
        return render_template('prob_view.html', problem=problem, form=form,
                               hid=-1, data=problem.data, active='problem',
                               attach=False, plist=plist, prank=prank)
    attach = os.path.exists(os.path.join(app.config['UPLOAD_FOLDER'],
                                         problem.data.attach))
    return render_template('prob_view.html', problem=problem, form=form,
                           hid=hid, data=problem.data, active='homework',
                           attach=attach, plist=plist, prank=prank)

def download_latest():
    """
    Download and save the latest submission files, possibly overwriting
    existing, presumably older files.
    :return:
    """
    q1 = select([table.c.file_name,
                 func.max(table.c.sent_time_epoch).label('sent_time_max_epoch')]) \
        .group_by('file_name')
    latest_files = engine.execute(q1).fetchall()
    permitted = list(permitted_file_names())
    for f in latest_files:
        selection = [table.c.sender_name, table.c.file_name, table.c.url]
        q2 = select(selection).where(
            and_(table.c.file_name == f['file_name'],
                 table.c.sent_time_epoch == f['sent_time_max_epoch']))
        r = engine.execute(q2).fetchone()
        file_name = r['file_name'].lower()
        sender_name = r['sender_name']
        if file_name not in permitted:
            # originally a Python 2 print statement; converted to a call
            print('Notify %(sender_name)s that "%(file_name)s" is not a valid file name' % locals())
        # download either way just in case
        dest = os.path.join(settings.csv_dir, file_name)
        content = file_transfer.download(r['url'])
        if 'MD5 token has expired' not in content:
            with open(dest, 'wb') as out:
                out.write(content)

def dashboard(period=None):
    period, days = PERIODS.get(period, PERIODS['1w'])
    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year),
                                          func.max(col_year)).first()
    year_from, year_to = int(year_from), int(year_to)
    return render_template(
        'dashboard.html',
        period=period,
        year_min=year_from,
        year_max=year_to,
    )

def run_ids():
    cols = [materials.c.run_id,
            func.max(materials.c.generation),
            func.count(materials.c.id)]
    rows = or_(materials.c.retest_passed == None,
               materials.c.retest_passed == True)
    sort = materials.c.run_id
    s = select(cols, rows).group_by(sort).order_by(asc(sort))
    print('\nrun-id\t\t\t\tgenerations\tmaterial')
    result = engine.execute(s)
    for row in result:
        print('%s\t%s\t\t%s' % (row[0], row[1], row[2]))
    result.close()

def get_lastest_dht11():
    return __to_pub_list(
        DHT11Record.query.group_by(DHT11Record.sensor_uuid)
        .having(func.max(DHT11Record.timestamp)).all())

def get_lastest_photocell():
    return __to_pub_list(
        PhotocellRecord.query.group_by(PhotocellRecord.sensor_uuid)
        .having(func.max(PhotocellRecord.timestamp)).all())

def get_lastest_rain():
    return __to_pub_list(
        RainRecord.query.group_by(RainRecord.sensor_uuid)
        .having(func.max(RainRecord.timestamp)).all())

def get_lastest_bmp():
    return __to_pub_list(
        BMPRecord.query.group_by(BMPRecord.sensor_uuid)
        .having(func.max(BMPRecord.timestamp)).all())

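The four get_lastest_* helpers above lean on GROUP BY plus HAVING MAX(...), which only behaves as "latest row per sensor" under MySQL-style permissive grouping. As a hedged alternative, here is a sketch of the portable greatest-row-per-group pattern, the same subquery-join used by get_latest_runs() earlier and get_current_station_info() later, written against the same DHT11Record model; the function name is invented, and a session from the surrounding project is assumed.

from sqlalchemy import and_, func

def get_latest_dht11_portable(session):
    # One row per sensor: the newest timestamp seen for each sensor_uuid.
    latest = (session.query(DHT11Record.sensor_uuid.label('uuid'),
                            func.max(DHT11Record.timestamp).label('ts'))
              .group_by(DHT11Record.sensor_uuid)
              .subquery())
    # Join back to the table to retrieve the full record that carries
    # that timestamp.
    return (session.query(DHT11Record)
            .join(latest, and_(DHT11Record.sensor_uuid == latest.c.uuid,
                               DHT11Record.timestamp == latest.c.ts))
            .all())
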
def get_session_stats(session):
    query = session.query(func.min(Sighting.expire_timestamp),
                          func.max(Sighting.expire_timestamp))
    if conf.REPORT_SINCE:
        query = query.filter(Sighting.expire_timestamp > SINCE_TIME)
    min_max_result = query.one()
    length_hours = (min_max_result[1] - min_max_result[0]) // 3600
    if length_hours == 0:
        length_hours = 1
    # Convert to datetime
    return {
        'start': datetime.fromtimestamp(min_max_result[0]),
        'end': datetime.fromtimestamp(min_max_result[1]),
        'length_hours': length_hours
    }

def get_first_last(session, spawn_id):
    return session.query(func.min(Mystery.first_seconds),
                         func.max(Mystery.last_seconds)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .first()

def get_widest_range(session, spawn_id):
    return session.query(func.max(Mystery.seen_range)) \
        .filter(Mystery.spawn_id == spawn_id) \
        .filter(Mystery.first_seen > conf.LAST_MIGRATION) \
        .scalar()

def set_val_user_id():
    """Set value for the next user_id after seeding database"""
    # Get the max user_id in the database
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])

    # Set the value for the next user_id to be max_id + 1
    query = "SELECT setval('users_user_id_seq', :new_id)"
    db.session.execute(query, {'new_id': max_id + 1})
    db.session.commit()

def get_max_id(cls, session):
    """Get the current max value of the ``id`` column.

    When creating and storing ORM objects in bulk, :mod:`sqlalchemy` does
    not automatically generate an incrementing primary key ``id``. To do
    this manually, one needs to know the current max ``id``.

    For ORM object classes that are derived from other ORM object classes,
    the max ``id`` of the lowest base class is returned. This is designed
    to be used with inheritance by joining, in which derived and base
    class objects have identical ``id`` values.

    Args:
        session: database session to operate in
    """
    # sqlalchemy allows only one level of inheritance, so just check this
    # class and all its bases
    id_base = None
    for c in [cls] + list(cls.__bases__):
        for base_class in c.__bases__:
            if base_class.__name__ == 'Base':
                if id_base is None:
                    # we found our base class for determining the ID
                    id_base = c
                else:
                    raise RuntimeError("Multiple base object classes for class " + cls.__name__)  # this should never happen
    if id_base is None:
        raise RuntimeError("Error searching for base class of " + cls.__name__)
    # get its max ID
    max_id = session.query(func.max(id_base.id)).scalar()
    # if no object is present, None is returned
    if max_id is None:
        max_id = 0
    return max_id

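A possible usage of such a helper, sketched for the bulk-insert situation the docstring describes; Sample, values, and the session are made-up names for illustration.

# Hypothetical usage of get_max_id() before a bulk insert:
# bulk_save_objects() skips the usual autoincrement machinery, so ids
# are assigned manually, starting past the current max.
next_id = Sample.get_max_id(session) + 1
session.bulk_save_objects(
    [Sample(id=next_id + i, value=v) for i, v in enumerate(values)])
session.commit()
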
def truncate_to_field_length(self, field, value):
    """Truncate the value of a string field to the field's max length.

    Use this in a validator to check/truncate values before inserting
    them into the database. Copy the below example code after
    ``@validates`` to your model class and replace ``field1`` and
    ``field2`` with your field name(s).

    :Example:

        from sqlalchemy.orm import validates
        # ... omitting other imports ...

        class MyModel(base.Base):
            field1 = Column(String(128))
            field2 = Column(String(64))

            @validates('field1', 'field2')
            def truncate(self, field, value):
                return self.truncate_to_field_length(field, value)

    Args:
        field (str): field name to validate
        value (str/unicode): value to validate

    Returns:
        str/unicode: value truncated to field max length
    """
    max_len = getattr(self.__class__, field).prop.columns[0].type.length
    if value and len(value) > max_len:
        return value[:max_len]
    else:
        return value

def get_end_day():
    r = db.session.query(
        func.max(Article.crawl_date).label('end')
    ).one()
    return r.end.date()

def get_last_aid(category):
    r = db.session.query(
        func.max(Article.id).label('max_aid')
    ).filter_by(category=category).one()
    return r.max_aid

def clear_task_instances(tis, session, activate_dag_runs=True, dag=None):
    """
    Clears a set of task instances, but makes sure the running ones
    get killed.
    """
    job_ids = []
    for ti in tis:
        if ti.state == State.RUNNING:
            if ti.job_id:
                ti.state = State.SHUTDOWN
                job_ids.append(ti.job_id)
        else:
            task_id = ti.task_id
            if dag and dag.has_task(task_id):
                task = dag.get_task(task_id)
                task_retries = task.retries
                ti.max_tries = ti.try_number + task_retries
            else:
                # Ignore errors when updating max_tries if dag is None or
                # task not found in dag since database records could be
                # outdated. We make max_tries the maximum value of its
                # original max_tries or the current task try number.
                ti.max_tries = max(ti.max_tries, ti.try_number)
            ti.state = State.NONE
            session.merge(ti)

    if job_ids:
        from airflow.jobs import BaseJob as BJ
        for job in session.query(BJ).filter(BJ.id.in_(job_ids)).all():
            job.state = State.SHUTDOWN

    if activate_dag_runs and tis:
        drs = session.query(DagRun).filter(
            DagRun.dag_id.in_({ti.dag_id for ti in tis}),
            DagRun.execution_date.in_({ti.execution_date for ti in tis}),
        ).all()
        for dr in drs:
            dr.state = State.RUNNING
            dr.start_date = timezone.utcnow()

def next_retry_datetime(self):
    """
    Get datetime of the next retry if the task instance fails. For
    exponential backoff, retry_delay is used as base and will be
    converted to seconds.
    """
    delay = self.task.retry_delay
    if self.task.retry_exponential_backoff:
        min_backoff = int(delay.total_seconds() * (2 ** (self.try_number - 2)))
        # deterministic per task instance
        hash = int(hashlib.sha1("{}#{}#{}#{}".format(self.dag_id,
                                                     self.task_id,
                                                     self.execution_date,
                                                     self.try_number)
                                .encode('utf-8')).hexdigest(), 16)
        # between 0.5 * delay * (2^retry_number) and 1.0 * delay * (2^retry_number)
        modded_hash = min_backoff + hash % min_backoff
        # timedelta has a maximum representable value. The exponentiation
        # here means this value can be exceeded after a certain number
        # of tries (around 50 if the initial delay is 1s, even fewer if
        # the delay is larger). Cap the value here before creating a
        # timedelta object so the operation doesn't fail.
        delay_backoff_in_seconds = min(
            modded_hash,
            timedelta.max.total_seconds() - 1
        )
        delay = timedelta(seconds=delay_backoff_in_seconds)
        if self.task.max_retry_delay:
            delay = min(self.task.max_retry_delay, delay)
    return self.end_date + delay

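To make the bounds in the comment above concrete, here is a stripped-down, standalone sketch of just the jitter computation; the function name and key format are illustrative, not part of the original API.

import hashlib
from datetime import timedelta

def backoff_seconds(key, base_delay, try_number):
    """Deterministic jittered backoff: the result falls between 0.5 and
    1.0 times base_delay * 2**(try_number - 1) seconds."""
    min_backoff = int(base_delay.total_seconds() * (2 ** (try_number - 2)))
    # The SHA-1 digest acts as a deterministic pseudo-random source.
    digest = int(hashlib.sha1(key.encode('utf-8')).hexdigest(), 16)
    return min(min_backoff + digest % min_backoff,
               timedelta.max.total_seconds() - 1)

# e.g. third try with a 5-minute base delay: somewhere in [600, 1200)
print(backoff_seconds('dag#task#2020-01-01#3', timedelta(minutes=5), 3))
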
def latest_execution_date(self, session=None):
    """Returns the latest date for which at least one dag run exists"""
    execution_date = session.query(func.max(DagRun.execution_date)).filter(
        DagRun.dag_id == self.dag_id
    ).scalar()
    return execution_date

def add_task(self, task):
    """
    Add a task to the DAG

    :param task: the task you want to add
    :type task: task
    """
    if not self.start_date and not task.start_date:
        raise AirflowException("Task is missing the start_date parameter")
    # if the task has no start date, assign it the same as the DAG
    elif not task.start_date:
        task.start_date = self.start_date
    # otherwise, the task will start on the later of its own start date and
    # the DAG's start date
    elif self.start_date:
        task.start_date = max(task.start_date, self.start_date)

    # if the task has no end date, assign it the same as the dag
    if not task.end_date:
        task.end_date = self.end_date
    # otherwise, the task will end on the earlier of its own end date and
    # the DAG's end date
    elif task.end_date and self.end_date:
        task.end_date = min(task.end_date, self.end_date)

    if task.task_id in self.task_dict:
        # TODO: raise an error in Airflow 2.0
        warnings.warn(
            'The requested task could not be added to the DAG because a '
            'task with task_id {} is already in the DAG. Starting in '
            'Airflow 2.0, trying to overwrite a task will raise an '
            'exception.'.format(task.task_id),
            category=PendingDeprecationWarning)
    else:
        self.tasks.append(task)
        self.task_dict[task.task_id] = task
        task.dag = self

    self.task_count = len(self.tasks)

def single_stat_request(player, code, stat):
    """Actually retrieves the stat and returns the stat info in an embed"""
    session = Session()
    message = ""
    if code == 1:
        (table, col, message) = stat_dict[stat]
        columns = [col]
        res = session.query(*(getattr(table, column) for column in columns),
                            Account.display_name).join(Account) \
            .filter(Account.display_name == player).first()
        # Returns a tuple containing the stat, but only the first element
        # is defined for some reason.
        num = truncate_decimals(res[0])
        name = res[1]
    elif code == 2 or code == 3:
        (table, col, message) = medal_dict[stat]
        columns = [col]
        res = session.query(func.sum(*(getattr(table, column)
                                       for column in columns))) \
            .join(Account).filter(Account.display_name == player) \
            .group_by(AccountMedals.id).first()
        num = float(res[0])
        name = player
        if message != "Activities Entered" and message != "Total Number of Medals" and message != "Total Medal Score":
            message = f"Total {message} Medals"
        if code == 3:
            denominator = session.query(PvPAggregate.activitiesEntered) \
                .join(Account).filter(Account.display_name == player).first()
            act = denominator[0]
            num = num/act
            if message != "Activities Entered" and message != "Total Number of Medals" and message != "Total Medal Score":
                message = f"{message} Medals per Game"
    elif code == 4:
        (table, col, message) = character_dict[stat]
        columns = [col]
        res = session.query(func.max(*(getattr(table, column)
                                       for column in columns)),
                            Account.display_name).join(Account) \
            .filter(Account.display_name == player).first()
        # Returns a tuple containing the stat, but only the first element
        # is defined for some reason.
        num = truncate_decimals(res[0])
        name = res[1]
    #em = discord.Embed(title = f"{author}{message}{result}", colour=0xADD8E6)
    em = f"```{message} for {name}: {num}```"
    return em

def test_04_verify_db(self, testdb):
    b = testdb.query(Project).get(1)
    assert b is not None
    assert b.name == 'P1'
    assert b.notes == 'ProjectOne'
    assert b.is_active is True
    b = testdb.query(Project).get(2)
    assert b is not None
    assert b.name == 'P2'
    assert b.notes == 'ProjectTwo'
    assert b.is_active is True
    b = testdb.query(Project).get(3)
    assert b is not None
    assert b.name == 'P3Inactive'
    assert b.notes == 'ProjectThreeInactive'
    assert b.is_active is False
    b = testdb.query(Project).get(4)
    assert b is not None
    assert b.name == 'NewP'
    assert b.notes == 'My New Project'
    assert b.is_active is True
    assert testdb.query(Project).with_entities(
        func.max(Project.id)
    ).scalar() == 4
    assert testdb.query(BoMItem).with_entities(
        func.max(BoMItem.id)
    ).scalar() == 5

def test_06_verify_db(self, testdb):
    b = testdb.query(Project).get(1)
    assert b is not None
    assert b.name == 'P1'
    assert b.notes == 'ProjectOne'
    assert b.is_active is False
    b = testdb.query(Project).get(2)
    assert b is not None
    assert b.name == 'P2'
    assert b.notes == 'ProjectTwo'
    assert b.is_active is True
    b = testdb.query(Project).get(3)
    assert b is not None
    assert b.name == 'P3Inactive'
    assert b.notes == 'ProjectThreeInactive'
    assert b.is_active is False
    b = testdb.query(Project).get(4)
    assert b is not None
    assert b.name == 'NewP'
    assert b.notes == 'My New Project'
    assert b.is_active is True
    assert testdb.query(Project).with_entities(
        func.max(Project.id)
    ).scalar() == 4
    assert testdb.query(BoMItem).with_entities(
        func.max(BoMItem.id)
    ).scalar() == 5

def test_08_verify_db(self, testdb):
    b = testdb.query(Project).get(1)
    assert b is not None
    assert b.name == 'P1'
    assert b.notes == 'ProjectOne'
    assert b.is_active is False
    b = testdb.query(Project).get(2)
    assert b is not None
    assert b.name == 'P2'
    assert b.notes == 'ProjectTwo'
    assert b.is_active is True
    b = testdb.query(Project).get(3)
    assert b is not None
    assert b.name == 'P3Inactive'
    assert b.notes == 'ProjectThreeInactive'
    assert b.is_active is True
    b = testdb.query(Project).get(4)
    assert b is not None
    assert b.name == 'NewP'
    assert b.notes == 'My New Project'
    assert b.is_active is True
    assert testdb.query(Project).with_entities(
        func.max(Project.id)
    ).scalar() == 4
    assert testdb.query(BoMItem).with_entities(
        func.max(BoMItem.id)
    ).scalar() == 5

def test_0_verify_db(self, testdb):
    assert testdb.query(OFXStatement).with_entities(
        func.max(OFXStatement.id)
    ).scalar() == 9
    assert testdb.query(OFXTransaction).with_entities(
        func.max(OFXTransaction.statement_id)
    ).scalar() == 9
    assert len(testdb.query(OFXTransaction).all()) == 33

def minmax_tasks(self):
    """Find tasks minimum and maximum
    @return: unix timestamps of minimum and maximum
    """
    session = self.Session()
    try:
        _min = session.query(func.min(Task.started_on).label("min")).first()
        _max = session.query(func.max(Task.completed_on).label("max")).first()
        return int(_min[0].strftime("%s")), int(_max[0].strftime("%s"))
    except SQLAlchemyError as e:
        log.debug("Database error counting tasks: {0}".format(e))
        # return a pair here too, so callers can always unpack the result
        return 0, 0
    finally:
        session.close()

def load_users(seed_file_path):
    """Add users to database from text file.

    Data file is pipe separated:
    user_id | username | email | password
    """
    with open(seed_file_path) as seed_file:
        for line in seed_file:
            user_id, username, email, password = line.rstrip().split("|")
            user_id = int(user_id)
            username = username.strip()
            email = email.strip()
            password = password.strip()

            user = User(
                user_id=user_id,
                username=username,
                email=email,
                password=password
            )
            db.session.add(user)
    db.session.commit()

    # Reset seq/counter
    result = db.session.query(func.max(User.user_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE users_user_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id + 1})
    db.session.commit()

def load_galleries(seed_file_path):
    """Add sample galleries to database from text file.

    Data file is pipe separated:
    gallery_id | gallery_name | curator_id
    """
    with open(seed_file_path) as seed_file:
        for line in seed_file:
            gallery_id, gallery_name, curator_id = line.rstrip().split("|")
            # Currently gallery_id is discarded
            gallery_name = gallery_name.strip()
            curator_id = int(curator_id)

            gallery = Gallery(
                gallery_id=gallery_id,
                gallery_name=gallery_name,
                curator_id=curator_id,
            )
            db.session.add(gallery)
    db.session.commit()

    result = db.session.query(func.max(Gallery.gallery_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE galleries_gallery_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id + 1})
    db.session.commit()

def load_walls(seed_file_path):
    """Add sample walls to database from text file.

    Data file is pipe separated:
    wall_id | gallery_id | wall_width | wall_height | saved
    """
    with open(seed_file_path) as seed_file:
        for line in seed_file:
            wall_id, gallery_id, wall_width, wall_height, saved = \
                line.rstrip().split("|")
            wall_id = int(wall_id)
            gallery_id = int(gallery_id)
            wall_width = int(wall_width)
            wall_height = int(wall_height)
            saved = saved.strip().lower() == 'saved'

            wall = Wall(wall_id=wall_id, gallery_id=gallery_id,
                        wall_width=wall_width, wall_height=wall_height,
                        saved=saved)
            db.session.add(wall)
    db.session.commit()

    # Reset counter
    result = db.session.query(func.max(Wall.wall_id)).one()
    max_id = int(result[0])
    query = "ALTER SEQUENCE walls_wall_id_seq RESTART WITH :next_id"
    db.session.execute(query, {'next_id': max_id + 1})
    db.session.commit()

def last_update(self):
    return max(imap(lambda e: e.import_time, self.entries))

def last_update(self):
    return (
        select(func.max(JournalEntry.import_time))
        .where(JournalEntry.journal_id == self.id)
        .label("last_update")
    )

def latest_execution_date(self):
    """Returns the latest date for which at least one task instance exists"""
    TI = TaskInstance
    session = settings.Session()
    execution_date = session.query(func.max(TI.execution_date)).filter(
        TI.dag_id == self.dag_id,
        TI.task_id.in_(self.task_ids)
    ).scalar()
    session.commit()
    session.close()
    return execution_date

def get_apps(request: Request, app_type: str, page: int):
    """Fetch a page of apps.

    URI parameters: app type (all/iOS/android) - app_type: str,
    page number (starting at 1) - page: int;
    query parameters: timestamp in seconds - t: int.
    :param request:
    :return:
    """
    time = Date.time2datetime(request.args.get('t'))
    if not time:
        raise BadRequest('')

    if page <= 0:
        log.debug('page must be greater than zero')
        raise BadRequest('')

    kw = request.args.get('kw')

    session = Session()
    query = session.query(AppModel,
                          AppVersionModel.version_code,
                          AppVersionModel.version_name,
                          func.max(AppVersionModel.create_at).label('_update_at')) \
        .join(AppVersionModel, AppModel.id == AppVersionModel.app_id) \
        .filter(AppModel.create_at <= time)
    if app_type != 'all':  # filter by app type unless 'all' was requested
        query = query.filter(AppModel.type == app_type)
    if kw:
        query = query.filter(AppModel.name.like('%{}%'.format(kw)))
    result = query.order_by(desc(AppModel.create_at)) \
        .group_by(AppModel.short_chain_uri_) \
        .offset((page - 1) * Config.apps_limit) \
        .limit(Config.apps_limit) \
        .all()

    datas = []
    for app, version_code, version_name, _ in result:
        app.version_code = version_code
        app.version_name = version_name
        datas.append(app)
    return JsonResult.ok(datas).response_json()

def last_updated(self):
    """this method is used in the scraper to return the last updated
    station. this lets us pull only updated data."""
    try:
        return max(self.station_usage,
                   key=lambda x: x.last_update).dt_last_update
    except ValueError:
        return datetime.fromtimestamp(0)

def get_current_station_info(cls, dbsession):
    """as the method name suggests this returns the up to date station
    information."""
    sub = dbsession.query(UsageData.station_id,
                          func.max(UsageData.id).label('max_update')) \
        .group_by(UsageData.station_id).subquery()
    return dbsession.query(
        UsageData.last_update,
        UsageData.available_bike_stands,
        UsageData.available_bikes).join(sub, and_(
            sub.c.max_update == UsageData.id)).all()