我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用sqlalchemy.Date()。
def register_worktime(user_id, is_workon=True):
    """Record a clock-in/clock-out event for a Slack user in the DB.

    :param str user_id: Slack user_id of the person clocking in/out
    :param bool is_workon: True (default) records a clock-in ("work on"),
        False records a clock-out
    """
    today = datetime.date.today()
    s = Session()
    # Cast registered_at to Date so the day-equality comparison works on
    # SQLite; original (garbled) note said MySQL behaviour was still to be
    # debugged -- TODO confirm on MySQL.
    record = (s.query(KintaiHistory)
              .filter(cast(KintaiHistory.registered_at, Date) == today)
              .filter(KintaiHistory.user_id == user_id)
              .filter(KintaiHistory.is_workon.is_(is_workon))
              .one_or_none())
    if record:
        # Same-day duplicate: bump the timestamp instead of adding a new row.
        record.registered_at = datetime.datetime.now()
    else:
        s.add(KintaiHistory(user_id=user_id, is_workon=is_workon))
    s.commit()
def ml_regression_build_prediction_test_window(self, req, num_units, rds, dbs):
    """Build the trailing test window for a regression run.

    Takes the last ``num_units`` rows of ``req["SourceDF"]`` and normalises
    the "Date"/"FDate" columns (when present) to pandas datetimes.

    :param req: request dict; only ``"SourceDF"`` (a DataFrame) is read here
    :param num_units: number of trailing rows to keep
    :param rds: unused here (kept for the shared handler signature)
    :param dbs: unused here (kept for the shared handler signature)
    :return: new DataFrame holding the test window
    """
    import pandas as pd
    source_df = req["SourceDF"]
    # .copy() so the date conversions below never write into a view of the
    # caller's frame (the original assigned into an .iloc slice, which
    # triggers SettingWithCopyWarning and may silently not propagate).
    new_df = source_df.iloc[-1 * int(num_units):].copy()
    if "Date" in str(source_df.columns):
        new_df["Date"] = pd.to_datetime(new_df["Date"], format='%Y-%m-%d')
    if "FDate" in str(source_df.columns):
        # "FDate" is presumably the future/forecast date column -- TODO confirm
        new_df["FDate"] = pd.to_datetime(new_df["FDate"], format='%Y-%m-%d')
    # Dead locals from the original (ml_type, target columns, filter mask,
    # last_row) were never used and have been removed.
    return new_df
# end of ml_regression_build_prediction_test_window
def ml_return_prediction_dates(self, simulated_date, ahead_set, ahead_type, rds, dbs, debug=False):
    """Map each look-ahead unit to its predicted future timestamp.

    :param simulated_date: base timestamp to offset from
    :param ahead_set: iterable of look-ahead counts (ints or int-like)
    :param ahead_type: only ``"Days"`` is handled; anything else yields {}
    :param rds: unused here (kept for the shared handler signature)
    :param dbs: unused here (kept for the shared handler signature)
    :param debug: unused here
    :return: dict mapping str(n) -> pandas Timestamp, n business days ahead
    """
    import pandas as pd
    from pandas.tseries.offsets import BDay
    prediction_dates = {}
    for n in ahead_set:
        if ahead_type == "Days":
            # BDay = business-day offset, so weekends are skipped.
            # (The original also built a richer node dict here but never
            # used it; that dead code has been removed.)
            prediction_dates[str(n)] = simulated_date + BDay(int(n))
    # end of all units in ahead_set
    return prediction_dates
# end of ml_return_prediction_dates
def get_url_stage():
    """Return the ``url_stage`` staging table bound to the shared metadata."""
    _, metadata = setup_db()
    columns = [
        Column('id', Integer, primary_key=True),
        Column('ocd_division_id', String),
        Column('event', String),
        Column('event_date', Date),
        Column('url', String),
        Column('url_hash', String),
        Column('category', String),
        # Row-creation time; callable default is evaluated per insert.
        Column('created_at', DateTime, default=datetime.datetime.now),
    ]
    return Table('url_stage', metadata, *columns)
def get_url_stage_hist():
    """Return the ``url_stage_hist`` history table bound to the shared metadata."""
    _, metadata = setup_db()
    columns = [
        Column('id', Integer, primary_key=True),
        Column('event', String),
        Column('event_date', Date),
        Column('url', String),
        Column('url_hash', String),
        Column('category', String),
        # Row-creation time; callable default is evaluated per insert.
        Column('created_at', DateTime, default=datetime.datetime.now),
    ]
    return Table('url_stage_hist', metadata, *columns)
def get_event():
    """Return the ``event`` table, with a FK to the place table."""
    _, metadata = setup_db()
    place = get_place()
    columns = [
        Column('id', Integer, primary_key=True),
        # Every event must belong to a place; indexed for joins.
        Column('place_id', Integer, ForeignKey(place.c.id),
               nullable=False, index=True),
        Column('name', String),
        Column('scraped_datetime', DateTime, default=datetime.datetime.now),
        Column('record_date', Date),
        Column('source', String),
        Column('source_url', String),
        Column('meeting_type', String),
    ]
    return Table('event', metadata, *columns)
def upgrade():
    """Create the ``event`` table and the ``enrolments`` association table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table('event',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('name', sa.String(length=120), nullable=True),
        sa.Column('description', sa.String(length=120), nullable=True),
        sa.Column('date', sa.Date(), nullable=True),
        sa.Column('location', sa.String(length=120), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    # Composite PK (event_id, person_id): at most one enrolment per
    # person per event.
    op.create_table('enrolments',
        sa.Column('event_id', sa.Integer(), nullable=False),
        sa.Column('person_id', sa.Integer(), nullable=False),
        sa.ForeignKeyConstraint(['event_id'], ['event.id'], ),
        sa.ForeignKeyConstraint(['person_id'], ['person.id'], ),
        sa.PrimaryKeyConstraint('event_id', 'person_id')
    )
    # ### end Alembic commands ###
def table():
    """Build and return the standalone ``post`` table definition."""
    meta = sa.MetaData()
    spec = [
        sa.Column('id', sa.Integer, nullable=False),
        sa.Column('title', sa.String(200), nullable=False),
        sa.Column('body', sa.Text, nullable=False),
        sa.Column('views', sa.Integer, nullable=False),
        sa.Column('average_note', sa.Float, nullable=False),
        sa.Column('pictures', postgresql.JSON, server_default='{}'),
        sa.Column('published_at', sa.Date, nullable=False),
        sa.Column('tags', postgresql.ARRAY(sa.Integer), server_default='[]'),
        # Indexes #
        sa.PrimaryKeyConstraint('id', name='post_id_pkey'),
    ]
    return sa.Table('post', meta, *spec)
def get_user(user_id):
    """Authenticate a user via the Authorization header and return its dump.

    Returns the serialized user (with mission/koin counters) on success,
    or a 401 tuple when id+secret do not match.
    """
    secret = request.headers.get('Authorization')
    user = (db_session.query(api.models.User)
            .filter(api.models.User.id == user_id)
            .filter(api.models.User.secret == secret)
            .one_or_none())
    # get koin_count, mission_count_today and mission_count
    valid_solutions = (db_session.query(api.models.Solution)
                       .filter(api.models.Solution.user_id == user_id)
                       .filter(api.models.Solution.valid))
    mission_count = valid_solutions.count()
    # Cast the creation timestamp to Date for a whole-day comparison.
    mission_count_today = (valid_solutions
                           .filter(cast(api.models.Solution.create_date, Date) == date.today())
                           .count())
    koin_count = (db_session.query(func.sum(api.models.Solution.koin_count))
                  .filter(api.models.Solution.user_id == user_id)
                  .scalar())
    logger.debug('user '+str(user_id)+' logged in')
    if user:
        return user.dump(mission_count=mission_count,
                         mission_count_today=mission_count_today,
                         koin_count=koin_count)
    return 'Unauthorized', 401
def upgrade():
    """Create the ``book`` and ``users`` tables plus unique user indexes."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table('book',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('title', sa.String(length=255), nullable=False),
        sa.Column('posted_on', sa.Date(), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    op.create_table('users',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('username', sa.String(length=64), nullable=True),
        sa.Column('password_hash', sa.String(length=128), nullable=True),
        sa.Column('email', sa.String(length=64), nullable=True),
        sa.PrimaryKeyConstraint('id')
    )
    # Unique lookups by email and username.
    op.create_index(op.f('ix_users_email'), 'users', ['email'], unique=True)
    op.create_index(op.f('ix_users_username'), 'users', ['username'], unique=True)
    ### end Alembic commands ###
def is_auto_assigned_date_column(column):
    """
    Returns whether or not given SQLAlchemy Column object's is auto assigned
    DateTime or Date.

    :param column: SQLAlchemy Column object
    """
    # A single isinstance() with a tuple replaces the chained checks; the
    # column counts as auto-assigned when any default/onupdate hook is set.
    return (
        isinstance(column.type, (sa.Date, sa.DateTime))
        and (
            column.default
            or column.server_default
            or column.onupdate
            or column.server_onupdate
        )
    )
def upgrade():
    """Create the ``fda_dap`` table (FDA drug-approval-package records)."""
    op.create_table('fda_dap',
        # Meta
        sa.Column('meta_id', sa.Text, unique=True),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
        # General
        sa.Column('id', sa.Text, unique=True),
        sa.Column('documents', JSONB),
        sa.Column('approval_type', sa.Text),
        sa.Column('supplement_number', sa.Integer),
        sa.Column('action_date', sa.Date),
        sa.Column('fda_application_num', sa.Text),
        sa.Column('notes', sa.Text),
    )
def upgrade():
    """Create the ``icdcm`` table (ICD-CM codes keyed by name)."""
    op.create_table('icdcm',
        # Meta
        sa.Column('meta_id', sa.Text, unique=True),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
        # General
        sa.Column('name', sa.Text, primary_key=True),
        sa.Column('desc', sa.Text),
        sa.Column('terms', ARRAY(sa.Text)),
        sa.Column('version', sa.Text),
        sa.Column('last_updated', sa.Date),
    )
def upgrade():
    """Create the ``icdpcs`` table (ICD-PCS codes keyed by code)."""
    op.create_table('icdpcs',
        # Meta
        sa.Column('meta_id', sa.Text, unique=True),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
        # General
        sa.Column('code', sa.Text, primary_key=True),
        sa.Column('is_header', sa.Boolean),
        sa.Column('short_description', sa.Text),
        sa.Column('long_description', sa.Text),
        sa.Column('version', sa.Text),
        sa.Column('last_updated', sa.Date),
    )
def convert_date_string_to_date(self, date_str, optional_format="%Y-%m-%dT%H:%M:%S.%fZ"):
    """Parse ``date_str`` with the given strptime format.

    :param date_str: string (or stringifiable value) holding the date
    :param optional_format: strptime format, default ISO-8601 with ms + 'Z'
    :return: ``datetime.datetime`` on success, ``None`` on any parse failure
    """
    date_to_return = None
    try:
        import datetime
        date_to_return = datetime.datetime.strptime(str(date_str), optional_format)
    # Fixed: 'except Exception,f:' is Python-2-only and a SyntaxError on py3.
    except Exception as f:
        self.lg("ERROR: Failed Converting Date(" + str(date_str) + ") with Format(" + str(optional_format) + ")", 0)
    # end of tries to read this string as a valid date...
    return date_to_return
# end of convert_date_string_to_date
def pd_convert_df_dates_to_list(self, df_list, date_format_str="%Y-%m-%d"):
    """Format the "Date" column of ``df_list`` and return it as a list of strings."""
    formatted_dates = [cur_date.strftime(date_format_str)
                       for cur_date in df_list["Date"]]
    return formatted_dates
# end of pd_convert_df_dates_to_list
def pd_json_to_df(self, data_json, sorted_by_key="Date", in_ascending=True):
    """Deserialize ``data_json`` into a DataFrame sorted on ``sorted_by_key``."""
    import pandas as pd
    parsed_df = pd.read_json(data_json)
    return parsed_df.sort_values(by=sorted_by_key, ascending=in_ascending)
# end of pd_json_to_df
def ml_regression_build_prediction_results_df(self, pred_df, source_df, merge_on_column_name):
    """Inner-merge predictions with their source rows on one shared column.

    NOTE: reset_index(inplace=True) intentionally mutates both input frames,
    matching the original behaviour.
    """
    import pandas as pd
    pred_df.reset_index(inplace=True)
    source_df.reset_index(inplace=True)
    merged_preds_df = pd.merge(pred_df,
                               source_df,
                               how="inner",
                               left_on=merge_on_column_name,
                               right_on=merge_on_column_name)
    # Normalise known date columns back to pandas datetimes after the merge.
    for date_col in ("Date", "FDate"):
        if date_col in str(merged_preds_df.columns):
            merged_preds_df[date_col] = pd.to_datetime(merged_preds_df[date_col],
                                                       format='%Y-%m-%d')
    return merged_preds_df
# end of ml_regression_build_prediction_results_df
def convert_date_string_to_date(date_str, optional_format="%Y-%m-%dT%H:%M:%S.%fZ"):
    """Parse ``date_str`` with the given strptime format.

    :param date_str: string (or stringifiable value) holding the date
    :param optional_format: strptime format, default ISO-8601 with ms + 'Z'
    :return: ``datetime.datetime`` on success, ``None`` on any parse failure
    """
    date_to_return = None
    try:
        import datetime
        date_to_return = datetime.datetime.strptime(str(date_str), optional_format)
    # Fixed two defects: 'except Exception,f:' is a SyntaxError on Python 3,
    # and the original error path called self.lg() although this is a
    # module-level function with no self (guaranteed NameError).
    except Exception:
        import logging
        logging.getLogger(__name__).error(
            "ERROR: Failed Converting Date(" + str(date_str) +
            ") with Format(" + str(optional_format) + ")")
    # end of tries to read this string as a valid date...
    return date_to_return
# end of convert_date_string_to_date
def upgrade():
    """Add indexed ``date_search`` to popular_term; make the term index non-unique."""
    ### commands auto generated by Alembic - please adjust! ###
    op.add_column('popular_term', sa.Column('date_search', sa.Date(), nullable=True))
    op.create_index(op.f('ix_popular_term_date_search'), 'popular_term', ['date_search'], unique=False)
    # Recreate the term index under the same name, now non-unique.
    op.drop_index('ix_popular_term_term', table_name='popular_term')
    op.create_index(op.f('ix_popular_term_term'), 'popular_term', ['term'], unique=False)
    ### end Alembic commands ###
def upgrade():
    """Add a ``year`` Date column to tv_serie and a unique constraint on id."""
    ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): a full Date column named 'year' is odd -- presumably only
    # the year component is meaningful; confirm against the model.
    op.add_column('tv_serie', sa.Column('year', sa.Date(), nullable=True))
    op.create_unique_constraint(None, 'tv_serie', ['id'])
    ### end Alembic commands ###
def index():
    """Render the landing page: film total, top keywords, today's new-hash sum."""
    conn, curr = sphinx_conn()
    totalsql = 'select count(*) from film'
    curr.execute(totalsql)
    totalcounts = curr.fetchall()
    total = int(totalcounts[0]['count(*)'])
    sphinx_close(curr, conn)
    keywords = Search_Keywords.query.order_by(Search_Keywords.order).limit(6)
    form = SearchForm()
    # Whole-day comparison: cast the report date column to Date.
    today = (db.session.query(func.sum(Search_Statusreport.new_hashes))
             .filter(cast(Search_Statusreport.date, Date) == datetime.date.today())
             .scalar())
    return render_template('index.html', form=form, keywords=keywords,
                           total=total, today=today, sitename=sitename)
def bulk_insert(self, table, rows):
    """Issue a "bulk insert" operation using the current migration context.

    This provides a means of representing an INSERT of multiple rows
    which works equally well in the context of executing on a live
    connection as well as that of generating a SQL script.   In the
    case of a SQL script, the values are rendered inline into the
    statement.

    e.g.::

        from alembic import op
        from datetime import date
        from sqlalchemy.sql import table, column
        from sqlalchemy import String, Integer, Date

        # Create an ad-hoc table to use for the insert statement.
        accounts_table = table('account',
            column('id', Integer),
            column('name', String),
            column('create_date', Date)
        )

        op.bulk_insert(accounts_table,
            [
                {'id':1, 'name':'John Smith',
                        'create_date':date(2010, 10, 5)},
                {'id':2, 'name':'Ed Williams',
                        'create_date':date(2007, 5, 27)},
                {'id':3, 'name':'Wendy Jones',
                        'create_date':date(2008, 8, 15)},
            ]
        )

    :param table: a table object or ad-hoc ``sqlalchemy.sql.table`` to
        insert into.
    :param rows: list of dicts, one per row, mapping column names to values.
    """
    self.impl.bulk_insert(table, rows)
def is_date_field(model, fieldname):
    """Returns ``True`` if and only if the field of `model` with the specified
    name corresponds to either a :class:`datetime.date` object or a
    :class:`datetime.datetime` object.

    """
    fieldtype = get_field_type(model, fieldname)
    # Single isinstance() with a tuple replaces the chained `or` of two calls.
    return isinstance(fieldtype, (Date, DateTime))
def strings_to_dates(model, dictionary):
    """Return a copy of `dictionary` with date/interval strings converted.

    Keys are field names of `model`; values are the values to set.  For
    fields typed as :class:`sqlalchemy.types.Date`,
    :class:`sqlalchemy.types.DateTime`, or :class:`sqlalchemy.Interval`, the
    returned mapping carries the corresponding :class:`datetime.datetime`
    or :class:`datetime.timedelta` object instead of the incoming string.

    The argument is not modified; a new dictionary is returned.
    """
    result = {}
    for fieldname, value in dictionary.items():
        if is_date_field(model, fieldname) and value is not None:
            stripped = value.strip()
            if stripped == '':
                # Blank string means "clear the field".
                result[fieldname] = None
            elif value in CURRENT_TIME_MARKERS:
                # e.g. CURRENT_TIMESTAMP -> the matching SQL function call.
                result[fieldname] = getattr(func, value.lower())()
            else:
                result[fieldname] = parse_datetime(value)
        elif (is_interval_field(model, fieldname) and value is not None
              and isinstance(value, int)):
            # Integers on interval fields are interpreted as seconds.
            result[fieldname] = datetime.timedelta(seconds=value)
        else:
            result[fieldname] = value
    return result
def upgrade():
    """Create the ``menu_entries`` table of scraped mensa menu items."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table('menu_entries',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('time_scraped', sa.DateTime(), nullable=False),
        sa.Column('date_valid', sa.Date(), nullable=False),
        sa.Column('mensa', sa.String(length=64), nullable=False),
        sa.Column('category', sa.String(length=64), nullable=False),
        sa.Column('description', sa.String(length=500), nullable=False),
        sa.PrimaryKeyConstraint('id')
    )
    ### end Alembic commands ###
def upgrade():
    """Add ``accept_dues_until`` to settings, defaulting to the migration date."""
    # ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): datetime.now() is evaluated when this migration runs, so
    # the server default is frozen to that day's date -- confirm intended.
    op.add_column('settings', sa.Column('accept_dues_until', sa.Date(), server_default=datetime.now().strftime("%Y-%m-%d"), nullable=True))
    # ### end Alembic commands ###
def upgrade():
    """Add non-null ``date`` to major_projects, defaulting to the migration date."""
    ### commands auto generated by Alembic - please adjust! ###
    # NOTE(review): datetime.now() is evaluated when this migration runs, so
    # the server default is frozen to that day's date -- confirm intended.
    op.add_column('major_projects', sa.Column('date', sa.Date(), server_default=datetime.now().strftime("%Y-%m-%d"), nullable=False))
    ### end Alembic commands ###
def upgrade():
    """Add nullable ``vote_day`` Date column to the proposal table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column('proposal', sa.Column('vote_day', sa.Date(), nullable=True))
    # ### end Alembic commands ###
def __str__(self):
    '''Pretty-print every stored field of the post, one "Label: value" per line.'''
    labelled_fields = [
        ('Content: ', self.content),
        ('Author: ', self.author),
        ('Posting Date: ', self.posting_date),
        ('To: ', self.to),
        ('From: ', self.frm),
        ('Time: ', self.time),
        ('Date: ', self.date),
        ('PostId: ', self.post_id),
    ]
    return '\n'.join(label + str(value) for label, value in labelled_fields)
def prepare_queryset(query, model, key, value):
    """Filter rows whose ``key`` column (cast to Date) equals the given ISO timestamp's date."""
    target_date = datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date()
    column_as_date = cast(getattr(model, key), Date)
    return query.filter(column_as_date == target_date)
def prepare_queryset(query, model, key, value):
    """Filter rows whose ``key`` column (cast to Date) is on/after the given ISO timestamp's date."""
    lower_bound = datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date()
    column_as_date = cast(getattr(model, key), Date)
    return query.filter(column_as_date >= lower_bound)
def prepare_queryset(query, model, key, value):
    """Filter rows whose ``key`` column (cast to Date) is on/before the given ISO timestamp's date."""
    upper_bound = datetime.strptime(value[0], '%Y-%m-%dT%H:%M:%S.%fZ').date()
    column_as_date = cast(getattr(model, key), Date)
    return query.filter(column_as_date <= upper_bound)
def prepare_queryset(query, model, key, values):
    """Filter rows whose ``key`` column (cast to Date) lies between two ISO timestamps.

    ``values`` may be two items, or a single comma-separated "start,end" string.
    """
    if len(values) == 1:
        values = values[0].split(',')
    start_date = datetime.strptime(values[0], '%Y-%m-%dT%H:%M:%S.%fZ').date()
    end_date = datetime.strptime(values[1], '%Y-%m-%dT%H:%M:%S.%fZ').date()
    column_as_date = cast(getattr(model, key), Date)
    return query.filter(column_as_date.between(start_date, end_date))
def upgrade():
    """Create the ``member`` table and the dependent ``homework`` table."""
    ### commands auto generated by Alembic - please adjust! ###
    op.create_table('member',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('first_name', sa.String(length=64), nullable=True),
        sa.Column('last_name', sa.String(length=64), nullable=True),
        sa.Column('email', sa.String(length=128), nullable=True),
        sa.Column('password', sa.String(length=64), nullable=True),
        sa.Column('points', sa.Integer(), nullable=True),
        sa.Column('class_nb', sa.Integer(), nullable=True),
        sa.Column('section', sa.String(length=64), nullable=True),
        sa.Column('second_lang', sa.String(length=64), nullable=True),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('email')
    )
    # homework.member_id -> member.id; filenames must be globally unique.
    op.create_table('homework',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('member_id', sa.Integer(), nullable=False),
        sa.Column('subject', sa.String(length=128), nullable=True),
        sa.Column('section', sa.String(length=64), nullable=True),
        sa.Column('description', sa.String(length=256), nullable=True),
        sa.Column('end_date', sa.Date(), nullable=True),
        sa.Column('filename', sa.String(length=128), nullable=True),
        sa.Column('class_nb', sa.Integer(), nullable=True),
        sa.Column('is_public', sa.Boolean(), nullable=True),
        sa.ForeignKeyConstraint(['member_id'], ['member.id'], ),
        sa.PrimaryKeyConstraint('id'),
        sa.UniqueConstraint('filename')
    )
    ### end Alembic commands ###
def upgrade():
    """Create the ``pfizer`` clinical-trials table."""
    op.create_table('pfizer',
        # Meta
        sa.Column('meta_uuid', sa.Text),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
        # General
        sa.Column('title', sa.Text),
        # Description
        sa.Column('study_type', sa.Text),
        sa.Column('organization_id', sa.Text),
        sa.Column('nct_id', sa.Text),
        sa.Column('status', sa.Text),
        sa.Column('study_start_date', sa.Date),
        sa.Column('study_end_date', sa.Date),
        # Eligibility
        sa.Column('eligibility_criteria', sa.Text),
        sa.Column('gender', sa.Text),
        sa.Column('age_range', sa.Text),
        sa.Column('healthy_volunteers_allowed', sa.Boolean),
    )
def upgrade():
    """Add ``results_exemption_date`` Date column to the nct table."""
    op.add_column('nct', sa.Column('results_exemption_date', sa.Date))
def upgrade():
    """Create the ``ictrp`` table (WHO trial registry records).

    Composite primary key (register, main_id): a trial id is unique only
    within its source register.
    """
    op.create_table('ictrp',
        # Meta
        sa.Column('meta_uuid', sa.Text),
        sa.Column('meta_source', sa.Text),
        sa.Column('meta_created', sa.DateTime(timezone=True)),
        sa.Column('meta_updated', sa.DateTime(timezone=True)),
        # Main
        sa.Column('register', sa.Text, primary_key=True),
        sa.Column('last_refreshed_on', sa.Date),
        sa.Column('main_id', sa.Text, primary_key=True),
        sa.Column('date_of_registration', sa.Text),
        sa.Column('primary_sponsor', sa.Text),
        sa.Column('public_title', sa.Text),
        sa.Column('scientific_title', sa.Text),
        sa.Column('date_of_first_enrollment', sa.Text),
        sa.Column('target_sample_size', sa.Integer),
        sa.Column('recruitment_status', sa.Text),
        sa.Column('url', sa.Text),
        sa.Column('study_type', sa.Text),
        sa.Column('study_design', sa.Text),
        sa.Column('study_phase', sa.Text),
        # Additional
        sa.Column('countries_of_recruitment', ARRAY(sa.Text)),
        sa.Column('contacts', JSONB),
        sa.Column('key_inclusion_exclusion_criteria', sa.Text),
        sa.Column('health_conditions_or_problems_studied', ARRAY(sa.Text)),
        sa.Column('interventions', ARRAY(sa.Text)),
        sa.Column('primary_outcomes', ARRAY(sa.Text)),
        sa.Column('secondary_outcomes', ARRAY(sa.Text)),
        sa.Column('secondary_ids', ARRAY(sa.Text)),
        sa.Column('sources_of_monetary_support', ARRAY(sa.Text)),
        sa.Column('secondary_sponsors', ARRAY(sa.Text)),
    )
def __init__(self, formats, **params):
    """Store the accepted date format(s) on the instance.

    ``formats`` may be a single format string or a list/tuple of them; a
    bare string is wrapped in a one-element list before being stored.
    Remaining keyword arguments are forwarded to the base class.
    """
    super(Date, self).__init__(**params)
    if isinstance(formats, (list, tuple)):
        self.__formats = formats
    else:
        self.__formats = [formats]