我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.types.NULLTYPE。
def __init__(self, cursor, engine, typemap=None):
    """ResultProxy objects are constructed via the execute() method on SQLEngine.

    Populates ``self.props`` with a ``(type, index)`` record for every
    column, keyed both by lower-cased column name and by positional index,
    and ``self.keys`` with the column names in description order.

    :param cursor: object whose ``description`` attribute lists the result
        columns (``None`` when there is no result set).
    :param engine: object providing ``echo`` and ``context.rowcount``.
    :param typemap: optional mapping of column name to type; names that are
        absent fall back to ``types.NULLTYPE``.
    :raises TypeError: if ``typemap`` maps a column name to ``None``.
    """
    self.cursor = cursor
    self.engine = engine
    self.echo = engine.echo == "debug"
    self.rowcount = engine.context.rowcount
    metadata = cursor.description
    self.props = {}
    self.keys = []
    if metadata is not None:
        for i, item in enumerate(metadata):
            # sqlite possibly prepending table name to colnames so strip
            colname = item[0].split('.')[-1].lower()
            if typemap is not None:
                rec = (typemap.get(colname, types.NULLTYPE), i)
            else:
                rec = (types.NULLTYPE, i)
            if rec[0] is None:
                # Raising a bare string is itself a TypeError on any
                # interpreter that rejects string exceptions, so TypeError
                # keeps the observable exception type while actually
                # delivering the intended message.
                raise TypeError("None for metadata " + colname)
            # A second column with the same name makes name-based lookup
            # ambiguous; positional lookup below still works.
            if self.props.setdefault(colname, rec) is not rec:
                self.props[colname] = (ResultProxy.AmbiguousColumn(colname), 0)
            self.keys.append(colname)
            self.props[i] = rec
def to_constraint(self, migration_context=None):
    """Return the ExcludeConstraint represented by this object.

    If an original constraint was stored on ``self._orig_constraint`` it is
    returned as-is; otherwise a new constraint is built and attached to a
    table obtained from ``schemaobj.SchemaObjects``.

    :param migration_context: passed through to ``SchemaObjects``.
    :raises NotImplementedError: when ``util.sqla_100`` is false.
    """
    if not util.sqla_100:
        raise NotImplementedError(
            "ExcludeConstraint not supported until SQLAlchemy 1.0")

    existing = self._orig_constraint
    if existing is not None:
        return existing

    objects = schemaobj.SchemaObjects(migration_context)
    table = objects.table(self.table_name, schema=self.schema)
    constraint = ExcludeConstraint(
        *self.elements,
        name=self.constraint_name,
        where=self.where,
        **self.kw
    )
    # Only the name of each rendered expression is used: a typeless
    # column of that name is added to the table.
    for _expr, column_name, _oper in constraint._render_exprs:
        table.append_column(Column(column_name, NULLTYPE))
    table.append_constraint(constraint)
    return constraint
def _setup_referent(self, metadata, constraint):
    """Make sure the table referenced by ``constraint`` exists in ``metadata``.

    The referent is parsed from the column spec of the constraint's first
    element (``[schema.]table.column``). If the table is already present in
    ``metadata``, any referenced column it lacks is added as a NULLTYPE
    column; otherwise a new Table holding the referenced columns is created.
    Nothing is done when the referent is ``'_alembic_batch_temp'``.

    :param metadata: MetaData collection to look the referent up in.
    :param constraint: constraint whose ``elements`` expose ``_get_colspec()``.
    """
    spec = constraint.elements[0]._get_colspec()
    parts = spec.split(".")
    tname = parts[-2]
    referent_schema = parts[0] if len(parts) == 3 else None

    if tname == '_alembic_batch_temp':
        return

    colnames = [
        elem._get_colspec().split(".")[-1] for elem in constraint.elements
    ]
    key = sql_schema._get_table_key(tname, referent_schema)
    if key in metadata.tables:
        t = metadata.tables[key]
        for colname in colnames:
            # contains_column() tests for Column objects, not names; the
            # name-based membership test is ``in`` on the collection.
            if colname not in t.c:
                t.append_column(Column(colname, sqltypes.NULLTYPE))
    else:
        Table(
            tname, metadata,
            *[Column(n, sqltypes.NULLTYPE) for n in colnames],
            schema=referent_schema)
def _get_column_info(self, name, type_, nullable, autoincrement, default,
                     precision, scale, length):
    """Assemble the description dictionary for a single column.

    ``type_`` is looked up in ``self.ischema_names``; a recognised type is
    instantiated with precision/scale/length arguments depending on which
    type it is, an unrecognised one produces a warning and NULLTYPE.
    A truthy ``default`` has the text ``DEFAULT`` removed, is stripped, and
    loses one pair of enclosing single quotes; a falsy one becomes ``None``.

    :returns: dict with keys ``name``, ``type``, ``nullable``, ``default``
        and ``autoincrement``.
    """
    type_cls = self.ischema_names.get(type_, None)

    if type_cls in (NUMERIC, DECIMAL):
        type_args = (precision, scale)
    elif type_cls == FLOAT:
        type_args = (precision,)
    elif type_cls in (CHAR, VARCHAR, UNICHAR, UNIVARCHAR, NCHAR, NVARCHAR):
        type_args = (length,)
    else:
        type_args = ()

    if type_cls:
        column_type = type_cls(*type_args)
    else:
        util.warn("Did not recognize type '%s' of column '%s'" %
                  (type_, name))
        column_type = sqltypes.NULLTYPE

    if default:
        without_keyword = re.sub("DEFAULT", "", default).strip()
        default = re.sub("^'(.*)'$", lambda m: m.group(1), without_keyword)
    else:
        default = None

    return {
        "name": name,
        "type": column_type,
        "nullable": nullable,
        "default": default,
        "autoincrement": autoincrement,
    }
def _textual_index_column(table, text_):
    """Turn a string or text() construct into something an Index can use.

    A string becomes a NULLTYPE Column that is appended to ``table`` and
    returned; a TextClause is wrapped via ``_textual_index_element``.

    :raises ValueError: if ``text_`` is neither a string nor a TextClause.
    """
    if isinstance(text_, compat.string_types):
        column = Column(text_, sqltypes.NULLTYPE)
        table.append_column(column)
        return column

    if isinstance(text_, TextClause):
        return _textual_index_element(table, text_)

    raise ValueError("String or text() construct expected")
def __init__(self, table, text):
    """Store ``table`` and ``text`` and register a stand-in column.

    The key is taken from ``text.text``; a NULLTYPE Column of the same name
    is kept as ``self.fake_column`` and appended to ``table``.
    """
    self.table = table
    self.text = text
    self.key = text.text

    placeholder = schema.Column(self.text.text, sqltypes.NULLTYPE)
    self.fake_column = placeholder
    table.append_column(placeholder)
def to_column(self, migration_context=None):
    """Return the column this object refers to.

    The stored ``self._orig_column`` is returned when present; otherwise a
    NULLTYPE column named ``self.column_name`` is built through
    ``schemaobj.SchemaObjects``.

    :param migration_context: passed through to ``SchemaObjects``.
    """
    original = self._orig_column
    if original is None:
        objects = schemaobj.SchemaObjects(migration_context)
        return objects.column(self.column_name, NULLTYPE)
    return original
def primary_key_constraint(self, name, table_name, cols, schema=None):
    """Build a PrimaryKeyConstraint on a throwaway table.

    A Table named ``table_name`` is created in ``self.metadata()`` with one
    NULLTYPE column per entry of ``cols``; the constraint spans those
    columns, is appended to the table and returned.
    """
    metadata = self.metadata()
    placeholders = [sa_schema.Column(col, NULLTYPE) for col in cols]
    table = sa_schema.Table(
        table_name, metadata, *placeholders, schema=schema)

    key_columns = [table.c[col] for col in cols]
    constraint = sa_schema.PrimaryKeyConstraint(*key_columns, name=name)
    table.append_constraint(constraint)
    return constraint
def unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Build a UniqueConstraint on a throwaway table.

    A Table named ``source`` is created in ``self.metadata()`` with one
    NULLTYPE column per entry of ``local_cols``. Extra keyword arguments,
    plus ``name``, are forwarded to UniqueConstraint. The constraint is
    appended to the table and returned.
    """
    metadata = self.metadata()
    placeholders = [sa_schema.Column(col, NULLTYPE) for col in local_cols]
    table = sa_schema.Table(source, metadata, *placeholders, schema=schema)

    kw['name'] = name
    constrained = [table.c[col] for col in local_cols]
    constraint = sa_schema.UniqueConstraint(*constrained, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    table.append_constraint(constraint)
    return constraint
def _ensure_table_for_fk(self, metadata, fk):
    """Create a placeholder Table for the referent of a ForeignKey.

    Only acts when ``fk._colspec`` is a string. The spec is split on its
    last dot into table key and column name; the table is created in
    ``metadata`` if its key is missing, and the column is appended as a
    NULLTYPE column if the table does not already have it.
    """
    colspec = fk._colspec
    if not isinstance(colspec, string_types):
        return

    table_key, cname = colspec.rsplit('.', 1)
    sname, tname = self._parse_table_key(table_key)

    if table_key in metadata.tables:
        rel_t = metadata.tables[table_key]
    else:
        rel_t = sa_schema.Table(tname, metadata, schema=sname)

    if cname not in rel_t.c:
        rel_t.append_column(sa_schema.Column(cname, NULLTYPE))
def __init__(self, name, *clauses, **kwargs):
    """Initialise a named, parenthesised clause list.

    :param name: stored as ``self.name``.
    :param clauses: forwarded to ``ClauseList.__init__``.
    :param kwargs: only ``type`` is read; it defaults to NULLTYPE.
    """
    self.name = name
    if 'type' in kwargs:
        self.type = kwargs['type']
    else:
        self.type = sqltypes.NULLTYPE
    ClauseList.__init__(self, *clauses, parens=True)
def _primary_key_constraint(self, name, table_name, cols, schema=None):
    """Build a PrimaryKeyConstraint on a throwaway table.

    A fresh MetaData receives a Table named ``table_name`` holding one
    NULLTYPE column per entry of ``cols``; the constraint is built from
    those same Column objects, appended to the table and returned.
    """
    metadata = sa_schema.MetaData()
    placeholders = [sa_schema.Column(col, NULLTYPE) for col in cols]
    table = sa_schema.Table(
        table_name, metadata, *placeholders, schema=schema)

    constraint = sa_schema.PrimaryKeyConstraint(*placeholders, name=name)
    table.append_constraint(constraint)
    return constraint
def _foreign_key_constraint(self, name, source, referent,
                            local_cols, remote_cols,
                            onupdate=None, ondelete=None,
                            deferrable=None, source_schema=None,
                            referent_schema=None):
    """Build a ForeignKeyConstraint between two throwaway tables.

    Placeholder tables with NULLTYPE columns are created in a fresh
    MetaData: one for ``source`` carrying ``local_cols`` and, unless the
    constraint is self-referential, one for ``referent`` carrying
    ``remote_cols``. For a self-referential constraint the single table
    carries both column lists.

    :param name: constraint name.
    :param source: name of the table owning the constraint.
    :param referent: name of the referenced table.
    :param local_cols: list of column names in ``source``.
    :param remote_cols: list of column names in ``referent``.
    :param onupdate: forwarded to ForeignKeyConstraint.
    :param ondelete: forwarded to ForeignKeyConstraint.
    :param deferrable: forwarded to ForeignKeyConstraint.
    :param source_schema: schema of ``source``.
    :param referent_schema: schema of ``referent``.
    :returns: the constraint, already appended to the source table.
    """
    m = sa_schema.MetaData()

    # Two tables sharing a name but living in different schemas are
    # distinct, so the schema must take part in the self-reference test;
    # otherwise the referent placeholder would never be created.
    if source == referent and source_schema == referent_schema:
        candidate_cols = local_cols + remote_cols
    else:
        candidate_cols = local_cols
        sa_schema.Table(
            referent, m,
            *[sa_schema.Column(n, NULLTYPE) for n in remote_cols],
            schema=referent_schema)

    # Drop repeated names (a column may be both local and remote in a
    # self-referential key) while keeping first-seen order.
    t1_cols = []
    for n in candidate_cols:
        if n not in t1_cols:
            t1_cols.append(n)

    t1 = sa_schema.Table(
        source, m,
        *[sa_schema.Column(n, NULLTYPE) for n in t1_cols],
        schema=source_schema)

    if referent_schema:
        tname = "%s.%s" % (referent_schema, referent)
    else:
        tname = referent

    f = sa_schema.ForeignKeyConstraint(
        local_cols,
        ["%s.%s" % (tname, n) for n in remote_cols],
        name=name,
        onupdate=onupdate,
        ondelete=ondelete,
        deferrable=deferrable,
    )
    t1.append_constraint(f)
    return f
def _unique_constraint(self, name, source, local_cols, schema=None, **kw):
    """Build a UniqueConstraint on a throwaway table.

    A Table named ``source`` is created in a fresh MetaData with one
    NULLTYPE column per entry of ``local_cols``. Extra keyword arguments,
    plus ``name``, are forwarded to UniqueConstraint. The constraint is
    appended to the table and returned.
    """
    metadata = sa_schema.MetaData()
    placeholders = [sa_schema.Column(col, NULLTYPE) for col in local_cols]
    table = sa_schema.Table(source, metadata, *placeholders, schema=schema)

    kw['name'] = name
    constrained = [table.c[col] for col in local_cols]
    constraint = sa_schema.UniqueConstraint(*constrained, **kw)
    # TODO: need event tests to ensure the event
    # is fired off here
    table.append_constraint(constraint)
    return constraint
def _index(self, name, tablename, columns, schema=None, **kw):
    """Build an Index over NULLTYPE columns of a throwaway table.

    The table is named ``tablename``, or ``'no_table'`` when that value is
    falsy, and lives in a fresh MetaData. Extra keyword arguments are
    forwarded to Index.
    """
    metadata = sa_schema.MetaData()
    placeholders = [sa_schema.Column(col, NULLTYPE) for col in columns]
    table = sa_schema.Table(
        tablename or 'no_table', metadata, *placeholders, schema=schema)

    indexed = [table.c[col] for col in columns]
    return sa_schema.Index(name, *indexed, **kw)
def _get_column_info(self, name, type_, nullable, autoincrement, default,
                     precision, scale, length):
    """Assemble the description dictionary for a single column.

    ``type_`` is resolved through ``self.ischema_names``. Known types are
    instantiated with the size arguments that apply to them; unknown types
    trigger a warning and fall back to NULLTYPE. A truthy ``default`` has
    every ``DEFAULT`` substring removed, is stripped, and loses one pair of
    enclosing single quotes; a falsy one is reported as ``None``.

    :returns: dict with keys ``name``, ``type``, ``nullable``, ``default``
        and ``autoincrement``.
    """
    resolved = self.ischema_names.get(type_, None)

    if resolved in (NUMERIC, DECIMAL):
        ctor_args = (precision, scale)
    elif resolved == FLOAT:
        ctor_args = (precision,)
    elif resolved in (CHAR, VARCHAR, UNICHAR, UNIVARCHAR, NCHAR, NVARCHAR):
        ctor_args = (length,)
    else:
        ctor_args = ()

    if not resolved:
        util.warn("Did not recognize type '%s' of column '%s'" %
                  (type_, name))
        resolved_type = sqltypes.NULLTYPE
    else:
        resolved_type = resolved(*ctor_args)

    if not default:
        default = None
    else:
        trimmed = default.replace("DEFAULT", "").strip()
        default = re.sub("^'(.*)'$", lambda m: m.group(1), trimmed)

    info = {}
    info["name"] = name
    info["type"] = resolved_type
    info["nullable"] = nullable
    info["default"] = default
    info["autoincrement"] = autoincrement
    return info
def _get_column_info(self, name, type_, nullable, autoincrement, default,
                     precision, scale, length):
    """Describe one reflected column as a dictionary.

    The type name is mapped through ``self.ischema_names``; numeric, float
    and character types receive ``(precision, scale)``, ``(precision,)`` and
    ``(length,)`` respectively as constructor arguments. An unmapped type
    name is reported with ``util.warn`` and represented by NULLTYPE.
    The default text, when truthy, has ``DEFAULT`` removed, is stripped and
    has one pair of enclosing single quotes dropped.

    :returns: dict with keys ``name``, ``type``, ``nullable``, ``default``
        and ``autoincrement``.
    """
    factory = self.ischema_names.get(type_, None)

    if factory in (NUMERIC, DECIMAL):
        positional = (precision, scale)
    elif factory == FLOAT:
        positional = (precision,)
    elif factory in (CHAR, VARCHAR, UNICHAR, UNIVARCHAR, NCHAR, NVARCHAR):
        positional = (length,)
    else:
        positional = ()

    if factory:
        reflected_type = factory(*positional)
    else:
        util.warn("Did not recognize type '%s' of column '%s'" %
                  (type_, name))
        reflected_type = sqltypes.NULLTYPE

    cleaned_default = None
    if default:
        cleaned_default = re.sub("DEFAULT", "", default).strip()
        cleaned_default = re.sub(
            "^'(.*)'$", lambda m: m.group(1), cleaned_default)

    return dict(
        name=name,
        type=reflected_type,
        nullable=nullable,
        default=cleaned_default,
        autoincrement=autoincrement,
    )