我们从 Python 开源项目中，提取了以下 35 个代码示例，用于说明如何使用 sqlalchemy.func.extract()。
def ajax_dashboard_per_hour():
    """Return the current user's scrobble counts grouped by weekday and hour.

    The request's query string may carry ``year``, ``month`` and ``artist``
    arguments.  ``'all'`` (for year/month) or an empty string (for artist)
    disables the corresponding restriction.  The result is a list of
    ``(weekday, hour + 1, count)`` tuples passed through ``dumps``.
    """
    args = request.args
    wanted_year = args.get('year', 'all')
    wanted_month = args.get('month', 'all')
    wanted_artist = args.get('artist', '')

    played_at = Scrobble.played_at
    plays = func.count(Scrobble.id).label('count')
    hour_col = func.extract('hour', played_at).label('hour')
    weekday_col = func.extract('isodow', played_at).label('weekday')
    year_col = func.extract('year', played_at).label('year')
    month_col = func.extract('month', played_at).label('month')

    # A plain ``True`` is handed to filter() when a restriction is disabled.
    year_filter = True
    if wanted_year != 'all':
        year_filter = year_col == wanted_year

    month_filter = True
    if wanted_month != 'all':
        month_filter = month_col == wanted_month

    artist_filter = True
    if wanted_artist != '':
        artist_filter = Scrobble.artist == wanted_artist

    query = db.session.query(weekday_col, hour_col, plays)
    query = query.filter(Scrobble.user_id == current_user.id)
    query = query.filter(year_filter, month_filter, artist_filter)
    rows = query.group_by('weekday', 'hour').all()

    # The extracted hour is incremented by one before serialising.
    shifted = []
    for day, hour_of_day, total in rows:
        shifted.append((day, hour_of_day + 1, total))
    return dumps(shifted)
def __init__(self, field, expr, **kwargs):
    """Build an :class:`.Extract` construct.

    Normally reached through :func:`.extract` or through ``func.extract``
    from the :data:`.func` namespace rather than called directly.

    :param field: stored unchanged on ``self.field``.
    :param expr: passed through ``_literal_as_binds`` and stored on
        ``self.expr``.
    :param kwargs: accepted but not used by this constructor.
    """
    bound_expr = _literal_as_binds(expr, None)
    self.type = type_api.INTEGERTYPE
    self.field = field
    self.expr = bound_expr
def dashboard(period=None):
    """Render the dashboard page for the requested period.

    ``period`` is looked up in ``PERIODS``; unknown or missing keys fall
    back to the ``'1w'`` entry.  The template also receives the earliest
    and latest year found in ``Scrobble.played_at``.
    """
    fallback = PERIODS['1w']
    # Each PERIODS entry unpacks to two values; only the first is used here.
    period, _days = PERIODS.get(period, fallback)

    played_year = func.extract('year', Scrobble.played_at)
    bounds = db.session.query(func.min(played_year), func.max(played_year)).first()
    first_year, last_year = bounds

    context = {
        'period': period,
        'year_min': int(first_year),
        'year_max': int(last_year),
    }
    return render_template('dashboard.html', **context)
def query_links_by_date(year, month, day, session):
    """Return all links whose post was created on the given calendar date.

    :param year: year to match against ``Post.created_at``.
    :param month: month to match against ``Post.created_at``.
    :param day: day of month to match against ``Post.created_at``.
    :param session: session object used to run the query.
    :return: list of ``models.Link`` rows.
    """
    query = session.query(models.Link).join(models.Link.post)

    created_at = models.Post.created_at
    on_that_day = (
        func.extract('year', created_at) == year,
        func.extract('month', created_at) == month,
        func.extract('day', created_at) == day,
    )
    return query.filter(*on_that_day).all()
def driver():
    """Save the YouTube id of every YouTube link posted on 2017-08-29.

    Links are fetched with ``query_links_by_date``.  Only URLs whose
    registered domain is ``youtube`` or ``youtu`` are considered, and a
    link is saved only when an id could be extracted from its URL.
    """
    session = Session()
    for link in query_links_by_date(2017, 8, 29, session):
        domain = tldextract.extract(link.url).domain
        if domain not in ('youtube', 'youtu'):
            continue

        youtube_id = youtube_id_from_url(link.url)
        if not youtube_id:
            continue

        save_youtube_id(youtube_id, link)
def top_yearly_artists():
    """Render the current user's top artists per year with rank changes.

    For every year between the earliest and latest ``played_at`` year a
    chart of up to ``stat_count`` artists, ordered by play count, is
    fetched.  For each year after the first, ``position_changes[year]``
    maps an artist to either:

    * ``previous_position - current_position`` when the artist was also in
      the previous year's chart, or
    * ``'new'`` when the artist is in no earlier year's chart.

    Artists that appeared in some earlier chart but not the previous
    year's get no entry.
    """
    stat_count = 1000  # rows fetched per yearly chart
    show_count = 100   # handed to the template unchanged

    play_count = func.count(Scrobble.artist).label('count')
    played_year = func.extract('year', Scrobble.played_at)
    bounds = db.session.query(func.min(played_year), func.max(played_year)).first()
    first_year, last_year = bounds
    first_year, last_year = int(first_year), int(last_year)

    charts = {}
    for year in range(first_year, last_year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)

        query = db.session.query(Scrobble.artist, play_count)
        query = query.filter(Scrobble.user_id == current_user.id)
        query = query.filter(Scrobble.played_at >= year_start, Scrobble.played_at <= year_end)
        query = query.group_by(Scrobble.artist).order_by(play_count.desc())
        charts[year] = query.limit(stat_count).all()

    position_changes = {}
    for year in range(first_year + 1, last_year + 1):
        # artist -> 1-based rank, for this year and for the year before
        ranks = {}
        for place, (artist, _plays) in enumerate(charts[year], 1):
            ranks[artist] = place
        last_ranks = {}
        for place, (artist, _plays) in enumerate(charts[year - 1], 1):
            last_ranks[artist] = place

        # every artist that shows up in any earlier year's chart
        seen_before = set()
        for chart_year, rows in charts.items():
            if chart_year < year:
                seen_before.update(artist for (artist, _plays) in rows)

        changes = position_changes.setdefault(year, {})
        for artist, place in ranks.items():
            if artist in last_ranks:
                changes[artist] = last_ranks[artist] - place
            elif artist not in seen_before:
                changes[artist] = 'new'

    ordered_charts = sorted(charts.items())
    return render_template(
        'charts/top_yearly_artists.html',
        charts=ordered_charts,
        position_changes=position_changes,
        show_count=show_count,
    )
def unique_monthly():
    """Render per-month scrobble, unique-artist and unique-track counts.

    Every month of every year between the earliest and latest
    ``played_at`` year is reported for the current user.  The template
    receives ``stats`` as a list of
    ``('YYYY-MM', (scrobbles, unique_artists, unique_tracks))`` pairs
    sorted by key.

    Each month is the half-open interval ``[first day, first day of next
    month)``, so a scrobble played exactly at midnight on the 1st is
    counted in one month only.
    """
    stats = {}

    col_year = func.extract('year', Scrobble.played_at)
    year_from, year_to = db.session.query(func.min(col_year), func.max(col_year)).first()
    # NOTE(review): with no scrobbles at all min/max are presumably NULL and
    # int() would raise here -- confirm whether that case can occur.
    year_from, year_to = int(year_from), int(year_to)

    for year in range(year_from, year_to + 1):
        for month in range(1, 13):
            time_from = datetime.datetime(year, month, 1)
            # First instant of the following month; used as an EXCLUSIVE bound.
            time_to = time_from + datetime.timedelta(days=calendar.monthrange(year, month)[1])

            in_month = (
                Scrobble.user_id == current_user.id,
                Scrobble.played_at >= time_from,
                Scrobble.played_at < time_to,
            )

            scrobbles = db.session.query(Scrobble).filter(*in_month).count()

            unique_artists = (
                db.session.query(Scrobble.artist)
                .filter(*in_month)
                .group_by(Scrobble.artist)
                .count()
            )

            unique_tracks = (
                db.session.query(Scrobble.artist, Scrobble.track)
                .filter(*in_month)
                .group_by(Scrobble.artist, Scrobble.track)
                .count()
            )

            key = '{:d}-{:02d}'.format(year, month)
            stats[key] = (scrobbles, unique_artists, unique_tracks)

    stats = sorted(stats.items())

    return render_template(
        'stats/unique.html',
        stats=stats
    )
def unique_yearly():
    """Render per-year scrobble, unique-artist and unique-track counts.

    Every year between the earliest and latest ``played_at`` year is
    reported for the current user.  The template receives ``stats`` as a
    list of ``(year, (scrobbles, unique_artists, unique_tracks))`` pairs
    sorted by year.
    """
    played_year = func.extract('year', Scrobble.played_at)
    bounds = db.session.query(func.min(played_year), func.max(played_year)).first()
    first_year, last_year = bounds
    first_year, last_year = int(first_year), int(last_year)

    yearly = {}
    for year in range(first_year, last_year + 1):
        year_start = datetime.datetime(year, 1, 1)
        year_end = datetime.datetime(year, 12, 31, 23, 59, 59, 999999)

        owned = Scrobble.user_id == current_user.id
        within_year = (Scrobble.played_at >= year_start, Scrobble.played_at <= year_end)

        # select extract(year from played_at) as year, count(id) from scrobbles group by year;
        total = db.session.query(Scrobble).filter(owned).filter(*within_year).count()

        # select extract(year from played_at) as year, sum(1) from scrobbles group by year, artist;
        artist_query = db.session.query(Scrobble.artist).filter(owned).filter(*within_year)
        artists = artist_query.group_by(Scrobble.artist).count()

        track_query = db.session.query(Scrobble.artist, Scrobble.track)
        track_query = track_query.filter(owned).filter(*within_year)
        tracks = track_query.group_by(Scrobble.artist, Scrobble.track).count()

        yearly[year] = (total, artists, tracks)

    return render_template('stats/unique.html', stats=sorted(yearly.items()))
def show_kintai_history_csv(message, time=None):
    """Upload one month of the user's attendance history as a CSV file.

    The CSV has one row per day of the month:
    ``[YYYY-MM-DD, clock-in time, clock-out time]``.  A time is left empty
    when no matching record exists; when a day has several records of the
    same kind, the latest one is used.

    :param message: incoming message object; ``message.body['user']`` and
        ``message.body['channel']`` are read and ``message.send`` is used
        for the error reply.
    :param str time: year and month separated by ``/`` (e.g. ``2016/1``).
        Defaults to the current year and month.
    """
    user_id = message.body['user']
    if time:
        year_str, month_str = time.split('/')
    else:
        now = datetime.datetime.now()
        year_str, month_str = now.strftime('%Y'), now.strftime('%m')
    year, month = int(year_str), int(month_str)

    if not 1 <= month <= 12:
        # NOTE(review): this user-facing text looks mis-encoded in the source;
        # it is left untouched here -- restore it from the upstream project.
        message.send('??????????????')
        return

    s = Session()
    qs = (s.query(KintaiHistory)
          .filter(KintaiHistory.user_id == user_id)
          .filter(func.extract('year', KintaiHistory.registered_at) == year)
          .filter(func.extract('month', KintaiHistory.registered_at) == month))

    # date string -> list of (is_workon, 'HH:MM:SS') records for that day
    kintai = defaultdict(list)
    for q in qs:
        registered_at = q.registered_at.strftime('%Y-%m-%d')
        # 24-hour clock (%H): the 12-hour %I has no AM/PM marker, so 18:00
        # would read "06:00:00" and the string sort below would not be
        # chronological.
        kintai[registered_at].append((q.is_workon,
                                      '{:%H:%M:%S}'.format(q.registered_at)))

    rows = []
    for day in range(1, monthrange(year, month)[1] + 1):
        aligin_date = '{}-{:02d}-{:02d}'.format(year, month, day)
        workon, workoff = '', ''
        # Sorted ascending, so the last record of each kind wins.
        for d in sorted(kintai[aligin_date]):
            if d[0]:
                workon = d[1]
            else:
                workoff = d[1]
        rows.append([aligin_date, workon, workoff])

    output = StringIO()
    w = csv.writer(output)
    w.writerows(rows)

    param = {
        'token': settings.API_TOKEN,
        'channels': message.body['channel'],
        'title': '????'
    }
    requests.post(settings.FILE_UPLOAD_URL,
                  params=param,
                  files={'file': output.getvalue()})
def generate_daily_reports(date):
    """Compute and store month-to-date billing for every project.

    Can be invoked as ``flask generate_daily_reports --date 2017/01/31``,
    which computes the billings for January 2017 up to the 31st.

    When the cut-off falls on the first day of a month, the still-open bill
    of the previous month is closed out for each project first.  Then, for
    each project, compute and storage costs from the start of the month to
    the cut-off are calculated and written to that month's bill, which is
    created if it does not exist yet.

    :param date: cut-off day as ``'YYYY/MM/DD'``.  When missing or not
        parsable, today's UTC midnight is used instead.
    """
    # Need to pass app context around because of how flask works
    try:
        timeend = datetime.strptime(date, '%Y/%m/%d').replace(tzinfo=pytz.UTC)
    except (TypeError, ValueError):
        # TypeError: no date supplied; ValueError: wrong format.
        timeend = datetime.utcnow().replace(tzinfo=pytz.UTC).replace(minute=0, second=0, hour=0, microsecond=0)

    # HANDLE CLOSING OUT BILLINGS at end of month
    if timeend.day == 1:
        # Previous calendar month; January wraps to December of last year.
        if timeend.month == 1:
            prev_year, prev_month = timeend.year - 1, 12
        else:
            prev_year, prev_month = timeend.year, timeend.month - 1

        projects = get_projects_list()
        for project in projects:
            # ``.is_(False)`` builds a SQL comparison; the Python expression
            # ``column is False`` is always False and would match no rows.
            # NOTE(review): ``Billing.query()`` is used to agree with the
            # lookup further down -- confirm ``query`` is callable.
            bill = (
                Billing.query()
                .filter(func.extract('month', Billing.end_date) == prev_month)
                .filter(func.extract('year', Billing.end_date) == prev_year)
                .filter(Billing.closed_out.is_(False))
                .filter(Billing.project == project)
                .first()
            )
            if bill:
                bill.update(end_date=timeend, closed_out=True)

    monthstart = timeend.replace(day=1)
    projects = get_projects_list()
    seconds_into_month = (timeend - monthstart).total_seconds()
    daysinmonth = calendar.monthrange(timeend.year, timeend.month)[1]
    seconds_in_month = daysinmonth * 3600 * 24
    portion_of_month = Decimal(seconds_into_month) / Decimal(seconds_in_month)

    for project in projects:
        print(project)
        file_size = get_previous_file_sizes(monthstart, project=project)
        this_months_files = get_months_uploads(project, monthstart, timeend)
        compute_cost_search = make_search_filter_query(monthstart, timeend, project)
        compute_costs = get_compute_costs(compute_cost_search)
        analysis_compute_json = create_analysis_costs_json(
            compute_cost_search['hits']['hits'], monthstart, timeend)

        all_proj_files = get_previous_file_sizes(timeend, project)['hits']['hits']
        analysis_storage_json = create_storage_costs_json(
            all_proj_files, monthstart, timeend, seconds_in_month)
        storage_costs = get_storage_costs(
            file_size, portion_of_month, this_months_files, timeend, seconds_in_month)

        # Match on month AND year so a bill from the same month of an
        # earlier year is not picked up and overwritten.
        bill = (
            Billing.query()
            .filter(Billing.project == project)
            .filter(func.extract('month', Billing.start_date) == monthstart.month)
            .filter(func.extract('year', Billing.start_date) == monthstart.year)
            .first()
        )
        itemized_costs = {
            "itemized_compute_costs": analysis_compute_json,
            "itemized_storage_costs": analysis_storage_json
        }
        try:
            if bill:
                bill.update(compute_cost=compute_costs, storage_cost=storage_costs,
                            end_date=timeend, cost_by_analysis=itemized_costs)
            else:
                Billing.create(compute_cost=compute_costs, storage_cost=storage_costs,
                               start_date=monthstart, end_date=timeend, project=project,
                               closed_out=False, cost_by_analysis=itemized_costs)
        except Exception:
            # Best effort: a failure for one project must not stop the others.
            print("IT'S GONE FAR SOUTH")
def generate_daily_reports(date):
    """Compute and store month-to-date billing for every project.

    Can be invoked as ``flask generate_daily_reports --date 2017/01/31``,
    which computes the billings for January 2017 up to the 31st.

    When the cut-off falls on the first day of a month, the still-open bill
    of the previous month is closed out for each project first.  Then, for
    each project, compute and storage costs from the start of the month to
    the cut-off are calculated and written to that month's bill, which is
    created if it does not exist yet.

    :param date: cut-off day as ``'YYYY/MM/DD'``.  When missing or not
        parsable, today's UTC midnight is used instead.
    """
    # Need to pass app context around because of how flask works
    try:
        timeend = datetime.strptime(date, '%Y/%m/%d').replace(tzinfo=pytz.UTC).replace(
            minute=0, second=0, hour=0, microsecond=0)
    except (TypeError, ValueError):
        # TypeError: no date supplied; ValueError: wrong format.
        timeend = datetime.utcnow().replace(tzinfo=pytz.UTC).replace(minute=0, second=0, hour=0, microsecond=0)

    # HANDLE CLOSING OUT BILLINGS at end of month
    if timeend.day == 1:
        closing_month = getLastMonth(timeend.month)
        # On 1 January the month being closed belongs to the previous year.
        closing_year = timeend.year - 1 if timeend.month == 1 else timeend.year

        projects = get_projects_list()
        for project in projects:
            # ``.is_(False)`` builds a SQL comparison; the Python expression
            # ``column is False`` is always False and would match no rows.
            bill = (
                Billing.query()
                .filter(func.extract('month', Billing.end_date) == closing_month)
                .filter(func.extract('year', Billing.end_date) == closing_year)
                .filter(Billing.closed_out.is_(False))
                .filter(Billing.project == project)
                .first()
            )
            if bill:
                bill.update(end_date=timeend, closed_out=True)

    monthstart = timeend.replace(day=1)
    projects = get_projects_list()
    seconds_into_month = (timeend - monthstart).total_seconds()
    daysinmonth = calendar.monthrange(timeend.year, timeend.month)[1]
    seconds_in_month = daysinmonth * 3600 * 24
    portion_of_month = Decimal(seconds_into_month) / Decimal(seconds_in_month)

    for project in projects:
        print(project)
        file_size = get_previous_file_sizes(monthstart, project=project)
        this_months_files = get_months_uploads(project, monthstart, timeend)
        compute_cost_search = make_search_filter_query(monthstart, timeend, project)
        compute_costs = get_compute_costs(compute_cost_search)
        analysis_compute_json = create_analysis_costs_json(
            compute_cost_search['hits']['hits'], monthstart, timeend)

        all_proj_files = get_previous_file_sizes(timeend, project)['hits']['hits']
        analysis_storage_json = create_storage_costs_json(
            all_proj_files, monthstart, timeend, seconds_in_month)
        storage_costs = get_storage_costs(
            file_size, portion_of_month, this_months_files, timeend, seconds_in_month)

        bill = (
            Billing.query()
            .filter(Billing.project == project)
            .filter(func.extract('month', Billing.start_date) == monthstart.month)
            .filter(func.extract('year', Billing.start_date) == monthstart.year)
            .first()
        )
        itemized_costs = {
            "itemized_compute_costs": analysis_compute_json,
            "itemized_storage_costs": analysis_storage_json
        }
        try:
            if bill:
                bill.update(compute_cost=compute_costs, storage_cost=storage_costs,
                            end_date=timeend, cost_by_analysis=itemized_costs)
            else:
                Billing.create(compute_cost=compute_costs, storage_cost=storage_costs,
                               start_date=monthstart, end_date=timeend, project=project,
                               closed_out=False, cost_by_analysis=itemized_costs)
        except Exception:
            # Best effort: a failure for one project must not stop the others.
            print("IT'S GONE FAR SOUTH")