我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.types.DateTime()。
def initialize(self, connection):
    """Initialize the dialect and apply server-version-specific settings.

    After delegating to the parent ``initialize``, this sets
    ``self._version_two`` from ``self.server_version_info`` and then sets
    ``self.implicit_returning``. When the server is not "version two",
    ``ischema_names`` and ``colspecs`` are replaced with variants that map
    timestamps / ``DateTime`` to ``sqltypes.DATE``.
    """
    super(FBDialect, self).initialize(connection)

    # (product marker, minimum version tuple) pairs that count as
    # "version two"; checked in order, stopping at the first match.
    modern_floors = (('firebird', (2, )), ('interbase', (6, )))
    self._version_two = any(
        product in self.server_version_info
        and self.server_version_info >= floor
        for product, floor in modern_floors
    )

    if self._version_two:
        # Keep a value already stored on the instance; default is True.
        self.implicit_returning = self.__dict__.get(
            'implicit_returning', True)
    else:
        # TODO: whatever other pre < 2.0 stuff goes here
        self.ischema_names = ischema_names.copy()
        self.ischema_names['TIMESTAMP'] = sqltypes.DATE
        self.colspecs = {sqltypes.DateTime: sqltypes.DATE}
        self.implicit_returning = False
def initialize(self, connection):
    """Initialize the dialect and apply server-version-specific settings.

    After delegating to the parent ``initialize``, this sets
    ``self._version_two`` from ``self.server_version_info`` and then sets
    ``self.implicit_returning``. When the server is not "version two",
    ``ischema_names`` and ``colspecs`` are replaced with variants that map
    timestamps / ``DateTime`` to ``sqltypes.DATE``.
    """
    super(FBDialect, self).initialize(connection)

    # (product marker, minimum version tuple) pairs that count as
    # "version two"; checked in order, stopping at the first match.
    modern_floors = (('firebird', (2, )), ('interbase', (6, )))
    self._version_two = any(
        product in self.server_version_info
        and self.server_version_info >= floor
        for product, floor in modern_floors
    )

    if self._version_two:
        # Keep a value already stored on the instance; default is True.
        self.implicit_returning = self.__dict__.get(
            'implicit_returning', True)
    else:
        # TODO: whatever other pre < 2.0 stuff goes here
        self.ischema_names = ischema_names.copy()
        self.ischema_names['TIMESTAMP'] = sqltypes.DATE
        self.colspecs = {sqltypes.DateTime: sqltypes.DATE}
        self.implicit_returning = False
def test_querying_table(metadata):
    """Build the ``Table`` object used by the querying tests.

    The table is attached to ``metadata`` and its name is suffixed with the
    value of the ``PYTEST_XDIST_WORKER`` environment variable (``'master'``
    when the variable is unset).
    """
    # Under pytest-xdist, concurrent test processes must not create the
    # same table, so every worker gets its own table name.
    xdist_worker = os.environ.get('PYTEST_XDIST_WORKER', 'master')
    table_name = 'test_querying_table_' + xdist_worker

    columns = [
        Column('id', types.Integer, autoincrement=True, primary_key=True),
        Column('t_string', types.String(60)),
        Column('t_list', types.ARRAY(types.String(60))),
        Column('t_enum', types.Enum(MyEnum)),
        Column('t_int_enum', types.Enum(MyIntEnum)),
        Column('t_datetime', types.DateTime()),
        Column('t_date', types.DateTime()),
        Column('t_interval', types.Interval()),
        Column('uniq_uuid', PG_UUID, nullable=False, unique=True,
               default=uuid4),
    ]
    return Table(table_name, metadata, *columns)
def _compare_type_affinity(self, other):
    """Return True when ``other``'s type affinity is DateTime or Date."""
    affinity = other._type_affinity
    compatible = (sqltypes.DateTime, sqltypes.Date)
    return affinity in compatible
def convert_bind_param(self, value, engine):
    """Return ``value`` passed through ``psycopg.TimestampFromMx``.

    ``engine`` is accepted but not used here.
    """
    # TODO: perform appropriate postgres1 conversion between Python
    # DateTime/MXDateTime
    # this one doesnt seem to work with the "emulation" mode
    to_timestamp = psycopg.TimestampFromMx
    return to_timestamp(value)
def convert_result_value(self, value, engine):
    """Return the result ``value`` unchanged.

    No conversion is applied yet; ``engine`` is accepted but not used.
    """
    # TODO: perform appropriate postgres1 conversion between Python
    # DateTime/MXDateTime
    return value
def convert_bind_param(self, value, engine):
    """Return ``value`` passed through ``psycopg.DateFromMx``.

    ``engine`` is accepted but not used here.
    """
    # TODO: perform appropriate postgres1 conversion between Python
    # DateTime/MXDateTime
    # this one doesnt seem to work with the "emulation" mode
    to_date = psycopg.DateFromMx
    return to_date(value)
def convert_bind_param(self, value, engine):
    """Return ``value`` passed through ``psycopg.TimeFromMx``.

    ``engine`` is accepted but not used here.
    """
    # TODO: perform appropriate postgres1 conversion between Python
    # DateTime/MXDateTime
    # this one doesnt seem to work with the "emulation" mode
    to_time = psycopg.TimeFromMx
    return to_time(value)
def upgrade():
    """Change four ``accounts`` columns to ``DateTime(timezone=True)``."""
    timestamp_columns = (
        'last_fetch',
        'last_refresh',
        'last_delete',
        'next_delete',
    )
    for name in timestamp_columns:
        tz_aware = DateTime(timezone=True)
        op.alter_column('accounts', name, type_=tz_aware)
def downgrade():
    """Change four ``accounts`` columns to ``DateTime(timezone=False)``."""
    timestamp_columns = (
        'last_fetch',
        'last_refresh',
        'last_delete',
        'next_delete',
    )
    for name in timestamp_columns:
        tz_naive = DateTime(timezone=False)
        op.alter_column('accounts', name, type_=tz_naive)
def upgrade():
    """Change ``created_at``/``updated_at`` to ``DateTime(timezone=True)``.

    Applied to both columns of each listed table, table by table.
    """
    tables = (
        'accounts',
        'oauth_tokens',
        'posts',
        'sessions',
        'twitter_archives',
        'mastodon_apps',
    )
    audit_columns = ('created_at', 'updated_at')

    # Table-major order: both columns of one table before the next table.
    targets = [(tbl, col) for tbl in tables for col in audit_columns]
    for tbl, col in targets:
        op.alter_column(tbl, col, type_=DateTime(timezone=True))
def downgrade():
    """Change ``created_at``/``updated_at`` to ``DateTime(timezone=False)``.

    Applied to both columns of each listed table. The table list must match
    the one used by the corresponding ``upgrade`` so that every column that
    was altered is altered back.
    """
    # Fixed: the first entry used to be 'account', which does not match the
    # 'accounts' table that the upgrade alters.
    tables = (
        'accounts',
        'oauth_tokens',
        'posts',
        'sessions',
        'twitter_archives',
        'mastodon_apps',
    )
    for table in tables:
        for column in ('created_at', 'updated_at'):
            op.alter_column(table, column, type_=DateTime(timezone=False))
def define_request_data_table():
    """Define the ``ckanext_requestdata_requests`` table and map it.

    Rebinds the module-level ``request_data_table`` to a new ``Table`` on
    ``metadata`` and maps ``ckanextRequestdata`` onto it.
    """
    global request_data_table

    def required_text(column_name):
        # Non-nullable UnicodeText column; several fields share this shape.
        return Column(column_name, types.UnicodeText, nullable=False)

    # Both timestamp columns default to the time of the call to now().
    current_time = datetime.datetime.now

    request_data_table = Table(
        'ckanext_requestdata_requests',
        metadata,
        Column('id', types.UnicodeText, primary_key=True,
               default=make_uuid),
        required_text('sender_name'),
        required_text('sender_user_id'),
        required_text('email_address'),
        required_text('message_content'),
        required_text('package_id'),
        Column('state', types.UnicodeText, default=u'new'),
        Column('data_shared', types.Boolean, default=False),
        Column('rejected', types.Boolean, default=False),
        Column('created_at', types.DateTime, default=current_time),
        Column('modified_at', types.DateTime, default=current_time),
        Index('ckanext_requestdata_requests_id_idx', 'id'),
    )

    mapper(ckanextRequestdata, request_data_table)
def bind_processor(self, dialect):
    """Return a callable that renders date/datetime values as strings.

    The returned function formats its argument with
    ``self._storage_format`` (a ``%``-style template with named fields
    ``year``, ``month``, ``day``, ``hour``, ``minute``, ``second`` and
    ``microsecond``). ``None`` passes through as ``None``; a plain ``date``
    is rendered with all time fields set to zero; any other type raises
    ``TypeError``. ``dialect`` is accepted but not used here.
    """
    datetime_cls = datetime.datetime
    date_cls = datetime.date
    template = self._storage_format

    def process(value):
        if value is None:
            return None

        # datetime must be tested first: it is a subclass of date.
        if isinstance(value, datetime_cls):
            clock = (value.hour, value.minute, value.second,
                     value.microsecond)
        elif isinstance(value, date_cls):
            clock = (0, 0, 0, 0)
        else:
            raise TypeError("SQLite DateTime type only accepts Python "
                            "datetime and date objects as input.")

        hour, minute, second, microsecond = clock
        fields = dict(
            year=value.year,
            month=value.month,
            day=value.day,
            hour=hour,
            minute=minute,
            second=second,
            microsecond=microsecond,
        )
        return template % fields

    return process