The following 8 code examples, extracted from open-source Python projects, illustrate how to use the sqlalchemy.types module.
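Before the extracted examples, here is a minimal, self-contained sketch of what sqlalchemy.types provides; the table and column names are illustrative, not taken from the examples below.

```python
from sqlalchemy import Column, MetaData, Table
from sqlalchemy.types import DateTime, Integer, String

# Generic types from sqlalchemy.types attached to a Table definition.
metadata = MetaData()
users = Table(
    'users', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String(50)),
    Column('created_at', DateTime(timezone=True)),
)

for column in users.columns:
    # Generic types render as dialect-neutral DDL, e.g. name VARCHAR(50).
    print(column.name, column.type)
```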
```python
from datetime import date, datetime

import numpy as np
from pandas import DatetimeTZDtype


def _get_dtype(self, sqltype):
    from sqlalchemy.types import (Integer, Float, Boolean, DateTime,
                                  Date, TIMESTAMP)

    if isinstance(sqltype, Float):
        return float
    elif isinstance(sqltype, Integer):
        # TODO: Refine integer size.
        return np.dtype('int64')
    elif isinstance(sqltype, TIMESTAMP):
        # we have a timezone capable type
        if not sqltype.timezone:
            return datetime
        return DatetimeTZDtype
    elif isinstance(sqltype, DateTime):
        # Caution: np.datetime64 is also a subclass of np.number.
        return datetime
    elif isinstance(sqltype, Date):
        return date
    elif isinstance(sqltype, Boolean):
        return bool
    return object
```
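The order of the isinstance checks matters here: TIMESTAMP is a subclass of DateTime, so it must be tested first. A quick spot-check:

```python
from sqlalchemy.types import DateTime, TIMESTAMP

# TIMESTAMP subclasses DateTime, so _get_dtype tests it first;
# swapping the branches would misclassify timezone-aware columns.
print(isinstance(TIMESTAMP(), DateTime))   # True
print(TIMESTAMP(timezone=True).timezone)   # True - selects DatetimeTZDtype
```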
```python
from sqlalchemy.types import VARCHAR


def get_columns(self, connection, table_name, schema=None, **kw):
    # Probe the table with a zero-row query; only column names are
    # recoverable this way, so every column is reported as VARCHAR.
    q = "SELECT * FROM `%(table_id)s` LIMIT 0" % ({"table_id": table_name})
    columns = connection.execute(q)
    result = []
    for column_name in columns.keys():
        # TODO Handle types better
        column = {
            "name": column_name,
            "type": VARCHAR,
            "default": None,
            "autoincrement": None,
            "nullable": False,
        }
        result.append(column)
    return result
```
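Dialect-level get_columns() implementations like this one are what feed SQLAlchemy's runtime inspection API. A minimal sketch of the consumer side, using an in-memory SQLite database for illustration:

```python
from sqlalchemy import create_engine, inspect, text

# Illustrative only: any dialect's get_columns() backs inspector.get_columns().
engine = create_engine('sqlite://')
with engine.begin() as conn:
    conn.execute(text('CREATE TABLE t (a TEXT, b INTEGER)'))

inspector = inspect(engine)
for col in inspector.get_columns('t'):
    print(col['name'], col['type'], col['nullable'])
```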
```python
def __init__(self, table):
    super(Model, self).__init__()
    self.table = table
    self.schema = table.schema

    # Adapt column types to the most reasonable generic types
    # (ie. VARCHAR -> String)
    for column in table.columns:
        cls = column.type.__class__
        for supercls in cls.__mro__:
            if hasattr(supercls, '__visit_name__'):
                cls = supercls
            if supercls.__name__ != supercls.__name__.upper() and not supercls.__name__.startswith('_'):
                break

        column.type = column.type.adapt(cls)
```
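The MRO walk stops at the first superclass whose name is not all uppercase, i.e. the generic CamelCase type, and TypeEngine.adapt() then copies the constructor arguments across. A small standalone check of that behavior:

```python
from sqlalchemy.types import String, VARCHAR

# adapt() builds the target type from the source type's constructor
# arguments, so the length survives the VARCHAR -> String conversion.
generic = VARCHAR(length=50).adapt(String)
print(type(generic).__name__, generic.length)  # String 50
```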
```python
from sqlalchemy.types import Enum


def render_column_type(coltype):
    args = []
    if isinstance(coltype, Enum):
        args.extend(repr(arg) for arg in coltype.enums)
        if coltype.name is not None:
            args.append('name={0!r}'.format(coltype.name))
    else:
        # All other types
        argspec = _getargspec_init(coltype.__class__.__init__)
        defaults = dict(zip(argspec.args[-len(argspec.defaults or ()):],
                            argspec.defaults or ()))
        missing = object()
        use_kwargs = False
        for attr in argspec.args[1:]:
            # Remove annoyances like _warn_on_bytestring
            if attr.startswith('_'):
                continue

            value = getattr(coltype, attr, missing)
            default = defaults.get(attr, missing)
            if value is missing or value == default:
                use_kwargs = True
            elif use_kwargs:
                args.append('{0}={1}'.format(attr, repr(value)))
            else:
                args.append(repr(value))

    rendered = coltype.__class__.__name__
    if args:
        rendered += '({0})'.format(', '.join(args))

    return rendered
```
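The introspection this relies on can be reproduced with the standard library; the sketch below (standing in for the module-internal _getargspec_init, names illustrative) compares a type instance's attributes against its constructor defaults:

```python
import inspect

from sqlalchemy.types import Numeric

# Read the constructor signature and its defaults, then report every
# attribute that deviates from the default - the core of the renderer.
argspec = inspect.getfullargspec(Numeric.__init__)
defaults = dict(zip(argspec.args[-len(argspec.defaults or ()):],
                    argspec.defaults or ()))

coltype = Numeric(10, 2)
for attr in argspec.args[1:]:
    value = getattr(coltype, attr, None)
    if value != defaults.get(attr):
        print('{0}={1!r}'.format(attr, value))  # precision=10, scale=2
```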
```python
def LIMIT1_get_columns(self, connection, table_name, schema=None, **kw):
    # Slow (it has to execute a query) but accurate: it inspects actual
    # data, so the reported types reflect what the driver saw -- although
    # only for one record, which is not ideal.
    q = "SELECT * FROM %(table_id)s LIMIT 1" % ({"table_id": table_name})
    # q = "DESCRIBE %(table_id)s" % ({"table_id": table_name})
    cursor = connection.execute(q)
    desc = cursor.cursor.getdesc()
    result = []
    for col in desc:
        cname = col[0]
        bisnull = True
        ctype = _type_map[col[1]]
        column = {
            "name": cname,
            "type": ctype,
            "default": None,
            "autoincrement": None,
            "nullable": bisnull,
        }
        result.append(column)
    return result
```
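cursor.getdesc() here is driver-specific; with a plain DB-API driver the same probe-one-row idea would read cursor.description instead. A hypothetical helper sketching that (probe_columns and the _type_map fallback are illustrative, not from the example above):

```python
from sqlalchemy.types import VARCHAR


def probe_columns(dbapi_connection, table_name, _type_map=None):
    # Run a one-row probe and type columns from cursor.description,
    # whose entries are (name, type_code, ...) tuples.
    _type_map = _type_map or {}
    cursor = dbapi_connection.cursor()
    cursor.execute("SELECT * FROM %s LIMIT 1" % table_name)
    return [{"name": desc[0],
             "type": _type_map.get(desc[1], VARCHAR),  # fall back to VARCHAR
             "default": None,
             "autoincrement": None,
             "nullable": True}
            for desc in cursor.description]
```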
```python
def _harmonize_columns(self, parse_dates=None):
    """
    Make the DataFrame's column types align with the SQL table
    column types.
    Need to work around limited NA value support. Floats are always
    fine, ints must always be floats if there are Null values.
    Booleans are hard because converting bool column with None replaces
    all Nones with false. Therefore only convert bool if there are no
    NA values.
    Datetimes should already be converted to np.datetime64 if supported,
    but here we also force conversion if required.
    """
    # handle non-list entries for parse_dates gracefully
    if parse_dates is True or parse_dates is None or parse_dates is False:
        parse_dates = []

    if not hasattr(parse_dates, '__iter__'):
        parse_dates = [parse_dates]

    for sql_col in self.table.columns:
        col_name = sql_col.name
        try:
            df_col = self.frame[col_name]

            # the type the dataframe column should have
            col_type = self._get_dtype(sql_col.type)

            if (col_type is datetime or col_type is date or
                    col_type is DatetimeTZDtype):
                self.frame[col_name] = _handle_date_column(df_col)

            elif col_type is float:
                # floats support NA, can always convert!
                self.frame[col_name] = df_col.astype(col_type, copy=False)

            elif len(df_col) == df_col.count():
                # No NA values, can convert ints and bools
                if col_type is np.dtype('int64') or col_type is bool:
                    self.frame[col_name] = df_col.astype(
                        col_type, copy=False)

            # Handle date parsing
            if col_name in parse_dates:
                try:
                    fmt = parse_dates[col_name]
                except TypeError:
                    fmt = None
                self.frame[col_name] = _handle_date_column(
                    df_col, format=fmt)

        except KeyError:
            pass  # this column not in results
```
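The NA constraint the docstring describes is easy to see in isolation: a column read back with NULLs cannot hold np.nan in an int64 array, so it must stay float; only NA-free columns are cast.

```python
import numpy as np
import pandas as pd

# A NULL read back from SQL becomes NaN, which int64 cannot represent,
# so the column has to remain float64.
with_na = pd.Series([1.0, 2.0, np.nan])
print(with_na.dtype)                            # float64

# With no missing values the cast to int64 is safe.
no_na = pd.Series([1.0, 2.0, 3.0])
print(no_na.astype('int64', copy=False).dtype)  # int64
```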
```python
import warnings


def _sqlalchemy_type(self, col):
    dtype = self.dtype or {}
    if col.name in dtype:
        return self.dtype[col.name]

    col_type = self._get_notnull_col_dtype(col)

    from sqlalchemy.types import (BigInteger, Integer, Float,
                                  Text, Boolean, DateTime, Date, Time)

    if col_type == 'datetime64' or col_type == 'datetime':
        try:
            tz = col.tzinfo  # noqa
            return DateTime(timezone=True)
        except AttributeError:
            return DateTime

    if col_type == 'timedelta64':
        warnings.warn("the 'timedelta' type is not supported, and will be "
                      "written as integer values (ns frequency) to the "
                      "database.", UserWarning, stacklevel=8)
        return BigInteger
    elif col_type == 'floating':
        if col.dtype == 'float32':
            return Float(precision=23)
        else:
            return Float(precision=53)
    elif col_type == 'integer':
        if col.dtype == 'int32':
            return Integer
        else:
            return BigInteger
    elif col_type == 'boolean':
        return Boolean
    elif col_type == 'date':
        return Date
    elif col_type == 'time':
        return Time
    elif col_type == 'complex':
        raise ValueError('Complex datatypes not supported')

    return Text
```
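_get_notnull_col_dtype is pandas-internal, but the strings it dispatches on come from pandas' public type inference. A quick spot-check of the mapping:

```python
import pandas as pd
from pandas.api.types import infer_dtype

# The branch labels above ('integer', 'floating', 'boolean', ...)
# follow pandas' inferred dtype strings.
print(infer_dtype(pd.Series([1, 2, 3])))      # 'integer'  -> BigInteger
print(infer_dtype(pd.Series([1.5, 2.5])))     # 'floating' -> Float(53)
print(infer_dtype(pd.Series([True, False])))  # 'boolean'  -> Boolean
```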
```python
def to_sql(self, frame, name, if_exists='fail', index=True,
           index_label=None, schema=None, chunksize=None, dtype=None):
    """
    Write records stored in a DataFrame to a SQL database.

    Parameters
    ----------
    frame : DataFrame
    name : string
        Name of SQL table
    if_exists : {'fail', 'replace', 'append'}, default 'fail'
        - fail: If table exists, do nothing.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if does not exist.
    index : boolean, default True
        Write DataFrame index as a column
    index_label : string or sequence, default None
        Column label for index column(s). If None is given (default) and
        `index` is True, then the index names are used.
        A sequence should be given if the DataFrame uses MultiIndex.
    schema : string, default None
        Name of SQL schema in database to write to (if database flavor
        supports this). If specified, this overwrites the default
        schema of the SQLDatabase object.
    chunksize : int, default None
        If not None, then rows will be written in batches of this size
        at a time. If None, all rows will be written at once.
    dtype : dict of column name to SQL type, default None
        Optionally specify the datatype for columns. The SQL type should
        be a SQLAlchemy type.
    """
    if dtype is not None:
        from sqlalchemy.types import to_instance, TypeEngine
        for col, my_type in dtype.items():
            if not isinstance(to_instance(my_type), TypeEngine):
                raise ValueError('The type of %s is not a SQLAlchemy '
                                 'type ' % col)

    table = SQLTable(name, self, frame=frame, index=index,
                     if_exists=if_exists, index_label=index_label,
                     schema=schema, dtype=dtype)
    table.create()
    table.insert(chunksize)

    # check for potential case-sensitivity issues (GH7815)
    engine = self.connectable.engine
    with self.connectable.connect() as conn:
        table_names = engine.table_names(
            schema=schema or self.meta.schema,
            connection=conn,
        )
    if name not in table_names:
        warnings.warn("The provided table name '{0}' is not found exactly "
                      "as such in the database after writing the table, "
                      "possibly due to case sensitivity issues. Consider "
                      "using lower case table names.".format(name),
                      UserWarning)
```
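This method sits behind the public DataFrame.to_sql. A minimal end-to-end usage sketch, with an in-memory SQLite engine and illustrative table/column names, passing the dtype mapping it validates:

```python
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy.types import Float, Text

engine = create_engine('sqlite://')  # in-memory database
df = pd.DataFrame({'name': ['a', 'b'], 'score': [1.5, 2.5]})

# dtype values must be SQLAlchemy types; anything else raises ValueError.
df.to_sql('scores', engine, if_exists='replace', index=False,
          dtype={'name': Text, 'score': Float(precision=53)})

print(pd.read_sql('SELECT * FROM scores', engine))
```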