The following 50 code examples, extracted from open-source Python projects, illustrate how to use sqlalchemy.sql.and_().
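Before the project examples, a minimal self-contained sketch of the call itself: and_() accepts any number of column expressions and joins them into a single SQL AND condition. The users table and its columns below are made up purely for illustration.

from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.sql import and_

metadata = MetaData()
# Hypothetical table, used only to demonstrate the call.
users = Table('users', metadata,
              Column('id', Integer, primary_key=True),
              Column('name', String(50)),
              Column('age', Integer))

# Renders as: ... WHERE users.name = :name_1 AND users.age > :age_1
stmt = select([users]).where(and_(users.c.name == 'alice', users.c.age > 18))
print(stmt)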
def delete_obj(self, objects, uow):
    """called by a UnitOfWork object to delete objects, which involves a
    DELETE statement for each table used by this mapper, for each object
    in the list."""
    for table in self.tables:
        if not self._has_pks(table):
            continue
        delete = []
        for obj in objects:
            params = {}
            if not hasattr(obj, "_instance_key"):
                continue
            else:
                delete.append(params)
            for col in self.pks_by_table[table]:
                params[col.key] = self._getattrbycolumn(obj, col)
            self.extension.before_delete(self, obj)
        if len(delete):
            clause = sql.and_()
            for col in self.pks_by_table[table]:
                clause.clauses.append(col == sql.bindparam(col.key))
            statement = table.delete(clause)
            c = statement.execute(*delete)
            if table.engine.supports_sane_rowcount() and c.rowcount != len(delete):
                # raising a string is invalid Python; raise a proper exception instead
                raise Exception("ConcurrencyError - updated rowcount %d does not match "
                                "number of objects updated %d" % (c.rowcount, len(delete)))
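The empty sql.and_() plus clause.clauses.append(...) seen above is a very old SQLAlchemy idiom; in current releases the usual way to build the same conjunction is to unpack a list of comparisons into and_(). A sketch with a hypothetical composite-key table:

from sqlalchemy import Column, Integer, MetaData, Table, bindparam
from sqlalchemy.sql import and_

metadata = MetaData()
# Hypothetical two-column primary key, just to show the pattern.
items = Table('items', metadata,
              Column('id1', Integer, primary_key=True),
              Column('id2', Integer, primary_key=True))

# One comparison per primary-key column, unpacked into a single AND.
clause = and_(*[col == bindparam(col.key) for col in items.primary_key])
print(clause)  # items.id1 = :id1 AND items.id2 = :id2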
def setup_loader(self, instance):
    def lazyload():
        clause = sql.and_()
        try:
            pk = self.parent.pks_by_table[self.columns[0].table]
        except KeyError:
            pk = self.columns[0].table.primary_key
        for primary_key in pk:
            attr = self.parent._getattrbycolumn(instance, primary_key)
            if not attr:
                return None
            clause.clauses.append(primary_key == attr)
        if self.group is not None:
            groupcols = [p for p in self.parent.props.values()
                         if isinstance(p, DeferredColumnProperty) and p.group == self.group]
            row = sql.select([g.columns[0] for g in groupcols],
                             clause, use_labels=True).execute().fetchone()
            for prop in groupcols:
                if prop is self:
                    continue
                instance.__dict__[prop.key] = row[prop.columns[0]]
                objectstore.global_attributes.create_history(instance, prop.key, uselist=False)
            return row[self.columns[0]]
        else:
            return sql.select([self.columns[0]], clause, use_labels=True).scalar()
    return lazyload
def create_lazy_clause(table, primaryjoin, secondaryjoin, foreignkey):
    binds = {}

    def visit_binary(binary):
        circular = (isinstance(binary.left, schema.Column)
                    and isinstance(binary.right, schema.Column)
                    and binary.left.table is binary.right.table)
        if isinstance(binary.left, schema.Column) and (
                (not circular and binary.left.table is table)
                or (circular and binary.right is foreignkey)):
            binary.left = binds.setdefault(binary.left,
                sql.BindParamClause(binary.right.table.name + "_" + binary.right.name,
                                    None, shortname=binary.left.name))
            binary.swap()
        if isinstance(binary.right, schema.Column) and (
                (not circular and binary.right.table is table)
                or (circular and binary.left is foreignkey)):
            binary.right = binds.setdefault(binary.right,
                sql.BindParamClause(binary.left.table.name + "_" + binary.left.name,
                                    None, shortname=binary.right.name))

    if secondaryjoin is not None:
        lazywhere = sql.and_(primaryjoin, secondaryjoin)
    else:
        lazywhere = primaryjoin
    lazywhere = lazywhere.copy_container()
    li = BinaryVisitor(visit_binary)
    lazywhere.accept_visitor(li)
    return (lazywhere, binds)
def insertDb(self, ip_list):
    print '[+]', 'insert db'
    save_pool = []
    # print ip_list
    for ip in ip_list:
        # skip addresses already stored for this ip/port pair
        old_ip = self.session.query(IpPool).filter(
            and_(IpPool.ip == ip['ip'], IpPool.port == ip['port'])).first()
        if old_ip:
            continue
        # flush in batches of ~100 rows; the original put the append in an
        # else branch, which silently dropped the record that triggered a flush
        if len(save_pool) > 100:
            self.session.add_all(save_pool)
            self.session.commit()
            print '[+] ', "session commit"
            save_pool = []
        ip_obj = IpPool(ip=ip['ip'], port=ip['port'], location=ip['location'],
                        iptype=ip['iptype'], protocol=ip['protocol'])
        save_pool.append(ip_obj)
    if save_pool:
        self.session.add_all(save_pool)
        self.session.commit()
    # self.session.close()
async def find_author(ain):
    async with engine.acquire() as conn:
        author = model.Author.__table__
        if ain.get('first_name'):
            where = and_(author.c.first_name == ain['first_name'],
                         author.c.last_name == ain['last_name'])
        else:
            where = and_(author.c.last_name == ain['last_name'],
                         author.c.first_name == None)
        res = await conn.execute(
            select([author.c.id, author.c.first_name, author.c.last_name]).where(where))
        a = await res.fetchone()
        if a:
            ao = {'id': a[0], 'last_name': a[2]}
            if a[1]:
                ao['first_name'] = a[1]
            return ao
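A side note on the example above: author.c.first_name == None does render as first_name IS NULL, but SQLAlchemy also provides the more explicit .is_(None), which produces the same SQL without the == None comparison that linters flag. The same where could be written as:

# equivalent IS NULL test using the explicit operator
where = and_(author.c.last_name == ain['last_name'],
             author.c.first_name.is_(None))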
async def get_ebooks_ids_for_object(object_name, id):
    async with engine.acquire() as conn:
        if object_name.lower() == 'author':
            q = select([model.ebook_authors.c.ebook_id])\
                .where(model.ebook_authors.c.author_id == id)
        elif object_name.lower() == 'series':
            ebook = model.Ebook.__table__
            q = select([ebook.c.id]).where(ebook.c.series_id == id)
        elif object_name.lower() == 'bookshelf':
            bookshelf_item = model.BookshelfItem.__table__
            q = select([bookshelf_item.c.ebook_id])\
                .where(and_(bookshelf_item.c.ebook_id != None,
                            bookshelf_item.c.bookshelf_id == id)).distinct()
        else:
            raise ValueError('Invalid object_name')
        res = await conn.execute(q)
        res = await res.fetchall()
        return list(map(lambda x: x[0], res))
async def get_conversion_candidate(ebook_id, to_format):
    to_format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        format = model.Format.__table__
        res = await conn.execute(
            select([source.c.id, format.c.extension])
            .where(and_(source.c.ebook_id == ebook_id,
                        source.c.format_id == to_format_id,
                        source.c.format_id == format.c.id))
            .order_by(nullslast(desc(source.c.quality))))
        res = await res.first()
        if res:
            return res.as_tuple()
        # TODO: Consider optimal selection of the source
        # in previous version we first selected format (from available convertable in ebook)
        # and then one with best quality - so actually the other way around
        q = select([source.c.id, format.c.extension])\
            .where(and_(source.c.format_id == format.c.id,
                        source.c.ebook_id == ebook_id))\
            .order_by(nullslast(desc(source.c.quality)))
        async for row in conn.execute(q):
            if row.extension in settings.CONVERTABLE_TYPES:
                return row.id, row.extension
        return None, None
def delete_report_data(report_id):
    PlotData.query.filter(PlotData.report_id == report_id).delete()
    db.session.commit()
    PlotCategory.query.filter(PlotCategory.plot_category_id.in_(
        db.session.query(PlotCategory.plot_category_id).outerjoin(PlotData)
        .filter(PlotData.plot_data_id == None))).delete(synchronize_session='fetch')
    db.session.commit()
    db.session.execute(user_plotconfig_map.delete().where(
        user_plotconfig_map.c.plot_config_id.in_(
            db.session.query(PlotConfig.config_id).outerjoin(PlotData)
            .filter(PlotData.plot_data_id == None))))
    db.session.commit()
    #user_plotconfig_map.query.filter(user_plotconfig_map.plot_config_id.in_(db.session.query(PlotConfig.plot_config_id).outerjoin(PlotData).filter(PlotData.plot_data_id==None))).delete(synchronize_session='fetch')
    PlotConfig.query.filter(PlotConfig.config_id.in_(
        db.session.query(PlotConfig.config_id).outerjoin(PlotData)
        .outerjoin(PlotCategory, PlotCategory.config_id == PlotConfig.config_id)
        .filter(and_(PlotData.plot_data_id == None,
                     PlotCategory.plot_category_id == None)))).delete(synchronize_session='fetch')
    db.session.commit()
    SampleData.query.filter(SampleData.report_id == report_id).delete()
    db.session.commit()
    SampleDataType.query.filter(SampleDataType.sample_data_type_id.in_(
        db.session.query(SampleDataType.sample_data_type_id).outerjoin(SampleData)
        .filter(SampleData.sample_data_id == None))).delete(synchronize_session='fetch')
    db.session.commit()
    ReportMeta.query.filter(ReportMeta.report_id == report_id).delete()
    db.session.commit()
    Sample.query.filter(Sample.report_id == report_id).delete()
    db.session.commit()
    Report.query.filter(Report.report_id == report_id).delete()
    db.session.commit()
def fixed_ip_get_by_instance(context, instance_uuid):
    if not uuidutils.is_uuid_like(instance_uuid):
        raise exception.InvalidUUID(uuid=instance_uuid)

    vif_and = and_(models.VirtualInterface.id == models.FixedIp.virtual_interface_id,
                   models.VirtualInterface.deleted == 0)
    result = model_query(context, models.FixedIp, read_deleted="no").\
        filter_by(instance_uuid=instance_uuid).\
        outerjoin(models.VirtualInterface, vif_and).\
        options(contains_eager("virtual_interface")).\
        options(joinedload('network')).\
        options(joinedload('floating_ips')).\
        order_by(asc(models.VirtualInterface.created_at),
                 asc(models.VirtualInterface.id)).\
        all()

    if not result:
        raise exception.FixedIpNotFoundForInstance(instance_uuid=instance_uuid)
    return result
def network_get_all_by_host(context, host):
    fixed_host_filter = or_(models.FixedIp.host == host,
                            and_(models.FixedIp.instance_uuid != null(),
                                 models.Instance.host == host))
    fixed_ip_query = model_query(context, models.FixedIp,
                                 (models.FixedIp.network_id,)).\
        outerjoin((models.Instance,
                   models.Instance.uuid == models.FixedIp.instance_uuid)).\
        filter(fixed_host_filter)
    # NOTE(vish): return networks that have host set
    #             or that have a fixed ip with host set
    #             or that have an instance with host set
    host_filter = or_(models.Network.host == host,
                      models.Network.id.in_(fixed_ip_query.subquery()))
    return _network_get_query(context).filter(host_filter).all()
def _get_nonansi_join_whereclause(self, froms):
    clauses = []

    def visit_join(join):
        if join.isouter:
            def visit_binary(binary):
                if binary.operator == sql_operators.eq:
                    if join.right.is_derived_from(binary.left.table):
                        binary.left = _OuterJoinColumn(binary.left)
                    elif join.right.is_derived_from(binary.right.table):
                        binary.right = _OuterJoinColumn(binary.right)
            clauses.append(visitors.cloned_traverse(
                join.onclause, {}, {'binary': visit_binary}))
        else:
            clauses.append(join.onclause)

        for j in join.left, join.right:
            if isinstance(j, expression.Join):
                visit_join(j)
            elif isinstance(j, expression.FromGrouping):
                visit_join(j.element)

    for f in froms:
        if isinstance(f, expression.Join):
            visit_join(f)

    if not clauses:
        return None
    else:
        return sql.and_(*clauses)
def generate_query_from_keywords(self, model, fulltextsearch=None, **kwargs):
    clauses = [_entity_descriptor(model, key) == value
               for key, value in kwargs.items()
               if key != 'info' and key != 'fav_user_ids']
    queries = []
    headlines = []
    order_by_ranks = []
    if 'info' in kwargs.keys():
        #clauses = clauses + self.handle_info_json(model, kwargs['info'],
        #                                          fulltextsearch)
        queries, headlines, order_by_ranks = self.handle_info_json(
            model, kwargs['info'], fulltextsearch)
        clauses = clauses + queries
    if len(clauses) != 1:
        return and_(*clauses), queries, headlines, order_by_ranks
    else:
        return (and_(*clauses), ), queries, headlines, order_by_ranks
def _compile(self, whereclause=None, **kwargs):
    order_by = kwargs.pop('order_by', False)
    if order_by is False:
        order_by = self.order_by
    if order_by is False:
        if self.table.default_order_by() is not None:
            order_by = self.table.default_order_by()

    if self._should_nest(**kwargs):
        s2 = sql.select(self.table.primary_key, whereclause, use_labels=True, **kwargs)
        if not kwargs.get('distinct', False) and order_by:
            s2.order_by(*util.to_list(order_by))
        s3 = s2.alias('rowcount')
        crit = []
        for i in range(0, len(self.table.primary_key)):
            crit.append(s3.primary_key[i] == self.table.primary_key[i])
        statement = sql.select([], sql.and_(*crit), from_obj=[self.table], use_labels=True)
        if order_by:
            statement.order_by(*util.to_list(order_by))
    else:
        statement = sql.select([], whereclause, from_obj=[self.table],
                               use_labels=True, **kwargs)
        if order_by:
            statement.order_by(*util.to_list(order_by))
        # for a DISTINCT query, you need the columns explicitly specified in order
        # to use it in "order_by". insure they are in the column criterion (particularly oid).
        # TODO: this should be done at the SQL level not the mapper level
        if kwargs.get('distinct', False) and order_by:
            statement.append_column(*util.to_list(order_by))
    # plugin point
    # give all the attached properties a chance to modify the query
    for key, value in self.props.iteritems():
        value.setup(key, statement, **kwargs)
    return statement
def _loadAttributes(self):
    for row in self._connection.execute(
            ex.select([md.InventoryClasses.c.class_namespace,
                       md.InventoryClasses.c.class_name,
                       md.InventoryClassAttributes])
            .select_from(ex.join(md.InventoryClassAttributes, md.InventoryClasses,
                                 md.InventoryClassAttributes.c.class_id ==
                                 md.InventoryClasses.c.class_id))
            .where(and_(md.InventoryClasses.c.class_namespace == self._namespace,
                        md.InventoryClasses.c.class_name == self._class_name))):
        self._classId = row["class_id"]
        self._attributes[row["attr_key"]] = {}
        for i in ["attr_name", "attr_type", "attr_default", "attr_mandatory"]:
            self._attributes[row["attr_key"]][i] = row[i]
def getObjectIdByName(self, object_name, object_subname=None):
    andList = [
        md.InventoryObjects.c.class_id == self._classId,
        md.InventoryObjects.c.object_name == object_name
    ]
    if object_subname is not None:
        andList.append(md.InventoryObjects.c.object_subname == object_subname)
    object_id = None
    i = 0
    for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
        i = i + 1
        object_id = row["object_id"]
    if i > 1:
        raise LookupException("Too many objects were found")
    if i == 0:
        raise EmptyLookupException("No objects were found")
    return object_id
def search(self, object_id=None, object_name=None, object_subname=None, **kwargs):
    andList = [
        md.InventoryObjects.c.class_id == self._classId
    ]
    if object_id is not None:
        andList.append(md.InventoryObjects.c.object_id == object_id)
    if object_name is not None:
        andList.append(md.InventoryObjects.c.object_name.like(object_name))
    if object_subname is not None:
        andList.append(md.InventoryObjects.c.object_subname.like(object_subname))
    # append attributes subqueries
    for k in kwargs:
        if k in self._attributes:
            andList.append(md.InventoryObjects.c.object_id.in_(
                ex.select([md.InventoryObjectAttributes.c.object_id])
                .select_from(md.InventoryObjectAttributes)
                .where(and_(
                    md.InventoryObjectAttributes.c.class_id == self._classId,
                    md.InventoryObjectAttributes.c.attr_key == k,
                    md.InventoryObjectAttributes.c.attr_value.like(kwargs[k])
                ))
            ))
    data = []
    for row in self._connection.execute(md.InventoryObjects.select().where(and_(*andList))):
        data.append({
            "object_id": row["object_id"],
            self._objectName: row["object_name"],
            self._objectSubName: row["object_subname"]
        })
    return data
def delete(self, object_id=None, object_name=None, object_subname=None):
    assert not (object_id is None and object_name is None and object_subname is None), \
        "At least one identifier must be set"
    if object_id is None:
        if object_subname is None:
            object_id = self.getObjectIdByName(object_name)
        else:
            object_id = self.getObjectIdByName(object_name, object_subname)
    else:
        self.getObjectId(object_id)
    result = self._connection.execute(md.InventoryObjects.delete().where(
        and_(md.InventoryObjects.c.class_id == self._classId,
             md.InventoryObjects.c.object_id == object_id)))
    return result.rowcount
def attributeExists(self, object_id, attribute_name):
    assert not (object_id is None), "At least one identifier must be set"
    for count in self._connection.execute(
            ex.select([func.count()]).select_from(md.InventoryObjectAttributes)
            .where(and_(md.InventoryObjectAttributes.c.class_id == self._classId,
                        md.InventoryObjectAttributes.c.object_id == object_id,
                        md.InventoryObjectAttributes.c.attr_key == attribute_name))):
        count = count[0]
    if count == 0:
        return False
    else:
        return True
def updateAttributes(self, object_id=None, object_name=None, object_subname=None, **kwargs):
    assert not (object_id is None and object_name is None and object_subname is None), \
        "At least one identifier must be set"
    if object_id is None:
        if object_subname is None:
            object_id = self.getObjectIdByName(object_name)
        else:
            object_id = self.getObjectIdByName(object_name, object_subname)
    else:
        self.getObjectId(object_id)
    self._validateAttributes(kwargs, checkMandatoryAttrs=False)
    for k in kwargs:
        if self.attributeExists(object_id, k):
            self._connection.execute(md.InventoryObjectAttributes.update().where(
                and_(md.InventoryObjectAttributes.c.class_id == self._classId,
                     md.InventoryObjectAttributes.c.object_id == object_id,
                     md.InventoryObjectAttributes.c.attr_key == k))
                .values(attr_value=str(kwargs[k])))
        else:
            self._connection.execute(md.InventoryObjectAttributes.insert().values(
                object_id=object_id, class_id=self._classId,
                attr_key=str(k), attr_value=str(kwargs[k])))
def _lookupAttribtue(self, index):
    if isinstance(index, dict):
        if "object_id" in index:
            index["attributes"] = {}
            for row in self._connection.execute(
                    md.InventoryObjectAttributes.select().where(
                        and_(md.InventoryObjectAttributes.c.object_id == index["object_id"]))):
                index["attributes"][row["attr_key"]] = row["attr_value"]
        return index
    else:
        d = {}
        for row in self._connection.execute(
                md.InventoryObjectAttributes.select().where(
                    and_(md.InventoryObjectAttributes.c.object_id == index))):
            d[row["attr_key"]] = row["attr_value"]
        return d
def get_relationships(self, with_package=None, type=None, active=True,
                      direction='both'):
    '''Returns relationships this package has.
    Keeps stored type/ordering (not from pov of self).'''
    assert direction in ('both', 'forward', 'reverse')
    if with_package:
        assert isinstance(with_package, Package)
    from package_relationship import PackageRelationship
    forward_filters = [PackageRelationship.subject == self]
    reverse_filters = [PackageRelationship.object == self]
    if with_package:
        forward_filters.append(PackageRelationship.object == with_package)
        reverse_filters.append(PackageRelationship.subject == with_package)
    if active:
        forward_filters.append(PackageRelationship.state == core.State.ACTIVE)
        reverse_filters.append(PackageRelationship.state == core.State.ACTIVE)
    if type:
        forward_filters.append(PackageRelationship.type == type)
        reverse_type = PackageRelationship.reverse_type(type)
        reverse_filters.append(PackageRelationship.type == reverse_type)
    q = meta.Session.query(PackageRelationship)
    if direction == 'both':
        q = q.filter(or_(
            and_(*forward_filters),
            and_(*reverse_filters),
        ))
    elif direction == 'forward':
        q = q.filter(and_(*forward_filters))
    elif direction == 'reverse':
        q = q.filter(and_(*reverse_filters))
    return q.all()
def upgrade(migrate_engine):
    '''#1066 Change Visitor role on System from "reader" to "anon_editor".'''
    metadata = MetaData(migrate_engine)

    # get visitor ID
    user = Table('user', metadata, autoload=True)
    s = select([user.c.id, user.c.name], user.c.name == u'visitor')
    results = migrate_engine.execute(s).fetchall()
    if len(results) == 0:
        log.debug('No visitor on the system - obviously init hasn\'t been run yet '
                  'and that will init visitor to an anon_editor')
        return
    visitor_id, visitor_name = results[0]

    # find visitor role as reader on system
    uor = Table('user_object_role', metadata, autoload=True)
    visitor_system_condition = and_(uor.c.context == u'System',
                                    uor.c.user_id == visitor_id)
    s = select([uor.c.context, uor.c.user_id, uor.c.role],
               visitor_system_condition)
    results = migrate_engine.execute(s).fetchall()
    if len(results) != 1:
        log.warn('Could not find a Right for a Visitor on the System')
        return
    context, user_id, role = results[0]
    if role != 'reader':
        log.info('Visitor right for the System is not "reader", so not upgrading it to anon_editor.')
        return

    # change visitor role to anon_editor
    log.info('Visitor is a "reader" on the System, so upgrading it to "anon_editor".')
    sql = uor.update().where(visitor_system_condition).\
        values(role=u'anon_editor')
    migrate_engine.execute(sql)
def _get_nonansi_join_whereclause(self, froms):
    clauses = []

    def visit_join(join):
        if join.isouter:
            def visit_binary(binary):
                if binary.operator == sql_operators.eq:
                    if binary.left.table is join.right:
                        binary.left = _OuterJoinColumn(binary.left)
                    elif binary.right.table is join.right:
                        binary.right = _OuterJoinColumn(binary.right)
            clauses.append(visitors.cloned_traverse(join.onclause, {},
                                                    {'binary': visit_binary}))
        else:
            clauses.append(join.onclause)

        for j in join.left, join.right:
            if isinstance(j, expression.Join):
                visit_join(j)

    for f in froms:
        if isinstance(f, expression.Join):
            visit_join(f)

    if not clauses:
        return None
    else:
        return sql.and_(*clauses)
def update_repository_table(self, startver, endver):
    """Update version_table with new information"""
    update = self.table.update(and_(self.table.c.version == int(startver),
                                    self.table.c.repository_id == str(self.repository.id)))
    self.engine.execute(update, version=int(endver))
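For reference, later SQLAlchemy releases deprecate passing the where-clause positionally to update(); the generative .where() takes the same and_() expression. An equivalent sketch of the statement above:

update = self.table.update().where(
    and_(self.table.c.version == int(startver),
         self.table.c.repository_id == str(self.repository.id)))
self.engine.execute(update, version=int(endver))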
async def find_synonym(name, what):
    async with engine.acquire() as conn:
        synonym = model.Synonym.__table__
        res = await conn.execute(
            select([synonym.c.our_name])
            .where(and_(func.lower(synonym.c.other_name) == name.lower(),
                        synonym.c.category == what)))
        s = await res.fetchone()
        if s:
            return s[0]
async def get_conversion_id(source_id, user_id, format):
    async with engine.acquire() as conn:
        conversion = model.Conversion.__table__
        format_id = await get_format_id(format)
        res = await conn.execute(
            select([conversion.c.id])
            .where(and_(conversion.c.source_id == source_id,
                        conversion.c.created_by_id == user_id,
                        conversion.c.format_id == format_id)))
        res = await res.fetchone()
        if res:
            return res[0]
async def get_existing_conversion(ebook_id, user_id, to_format):
    format_id = await get_format_id(to_format)
    async with engine.acquire() as conn:
        source = model.Source.__table__
        conversion = model.Conversion.__table__
        res = await conn.execute(
            select([conversion.c.id]).select_from(conversion.join(source))
            .where(and_(source.c.ebook_id == ebook_id,
                        conversion.c.created_by_id == user_id,
                        conversion.c.format_id == format_id))
            .order_by(nullslast(desc(source.c.quality))))
        return await res.scalar()
def in_grid(cls, grid):
    # check if a point is within the boundaries of the grid
    return or_(and_(cls.lon_min > grid.lon_min, cls.lon_min < grid.lon_max,
                    cls.lat_min > grid.lat_min, cls.lat_min < grid.lat_max),
               and_(cls.lon_min > grid.lon_min, cls.lon_min < grid.lon_max,
                    cls.lat_max > grid.lat_min, cls.lat_max < grid.lat_max),
               and_(cls.lon_max > grid.lon_min, cls.lon_max < grid.lon_max,
                    cls.lat_min > grid.lat_min, cls.lat_min < grid.lat_max),
               and_(cls.lon_max > grid.lon_min, cls.lon_max < grid.lon_max,
                    cls.lat_max > grid.lat_min, cls.lat_max < grid.lat_max),
               and_(cls.lon_min < grid.lon_min, cls.lon_max > grid.lon_min,
                    cls.lat_min < grid.lat_min, cls.lat_max > grid.lat_min),
               and_(cls.lon_min < grid.lon_min, cls.lon_max > grid.lon_min,
                    cls.lat_min < grid.lat_max, cls.lat_max > grid.lat_max),
               and_(cls.lon_min < grid.lon_max, cls.lon_max > grid.lon_max,
                    cls.lat_min < grid.lat_min, cls.lat_max > grid.lat_min),
               and_(cls.lon_min < grid.lon_max, cls.lon_max > grid.lon_max,
                    cls.lat_min < grid.lat_max, cls.lat_max > grid.lat_max))
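The eight and_() branches above enumerate box overlap corner-by-corner. For comparison, the standard interval-overlap test expresses box intersection as a single conjunction: two axis-aligned boxes intersect exactly when their ranges overlap on both axes. A sketch under the same column names (not the original project's code, keeping the original's strict inequalities):

@classmethod
def overlaps_grid(cls, grid):
    # boxes intersect iff they overlap on the longitude axis AND the latitude axis
    return and_(cls.lon_min < grid.lon_max, cls.lon_max > grid.lon_min,
                cls.lat_min < grid.lat_max, cls.lat_max > grid.lat_min)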
def point_inside(cls, point):
    return and_(cls.lat_min <= point.lat, cls.lat_max >= point.lat,
                cls.lon_min <= point.lon, cls.lon_max >= point.lon)
def old_maps(self):
    """
    Returns 0 for false and an integer count of old shakemaps for true
    """
    stmt = (select([ShakeMap.__table__.c.shakecast_id])
            .where(and_(ShakeMap.__table__.c.shakemap_id == self.shakemap_id,
                        ShakeMap.__table__.c.shakemap_version < self.shakemap_version)))
    result = engine.execute(stmt)
    old_shakemaps = [row for row in result]
    return len(old_shakemaps)
def is_new(self):
    stmt = (select([ShakeMap.__table__.c.shakecast_id])
            .where(and_(ShakeMap.__table__.c.shakemap_id == self.shakemap_id,
                        ShakeMap.__table__.c.shakemap_version == self.shakemap_version)))
    result = engine.execute(stmt)
    shakemaps = [row for row in result]
    if shakemaps:
        return False
    else:
        return True
def toolbar_icon_clicked(self, widget, movie):
    #
    # remove unused posters
    #
    session = self.db.Session()
    delete_posters = delete(posters_table)
    delete_posters = delete_posters.where(not_(exists(
        [movies_table.c.movie_id],
        and_(posters_table.c.md5sum == movies_table.c.poster_md5)).correlate(posters_table)))
    log.debug(delete_posters)
    session.execute(delete_posters)
    session.commit()
    #
    # compressing sqlite databases
    #
    if self.app.config.get('type', 'sqlite', section='database') == 'sqlite':
        databasefilename = "%s.db" % os.path.join(
            self.app.locations['home'],
            self.app.config.get('name', section='database'))
        pagesize = gutils.get_filesystem_pagesize(databasefilename)
        # works since sqlite 3.5.8
        # python 2.5 doesn't include 3.x but perhaps in future versions
        # another way is the installation of pysqlite2 with 2.5.6/2.6.0 or higher
        try:
            from pysqlite2 import dbapi2 as sqlite3
            con = sqlite3.connect(databasefilename)
            try:
                con.isolation_level = None
                cur = con.cursor()
                cur.execute('PRAGMA page_size=' + str(pagesize))
                cur.execute('VACUUM;')
            finally:
                con.close()
        except:
            log.error('fallback to default driver')
            self.app.db.engine.execute('PRAGMA page_size=' + str(pagesize))
            self.app.db.engine.execute('VACUUM;')
    gutils.info(_("Finished"))
def has_table(self, connection, table_name, schema=None):
    if schema is None:
        schema = self.default_schema_name
    stmt = select([column('tablename')],
                  from_obj=[text('dbc.tablesvx')]).where(
        and_(text('DatabaseName=:schema'),
             text('TableName=:table_name')))
    res = connection.execute(stmt, schema=schema, table_name=table_name).fetchone()
    return res is not None
def get_columns(self, connection, table_name, schema=None, **kw):
    helpView = False
    if schema is None:
        schema = self.default_schema_name

    if int(self.server_version_info.split('.')[0]) < 16:
        dbc_columninfo = 'dbc.ColumnsV'
        # Check if the object is a view
        stmt = select([column('tablekind')],
                      from_obj=[text('dbc.tablesV')]).where(
            and_(text('DatabaseName=:schema'),
                 text('TableName=:table_name'),
                 text("tablekind='V'")))
        res = connection.execute(stmt, schema=schema, table_name=table_name).rowcount
        helpView = (res == 1)
    else:
        dbc_columninfo = 'dbc.ColumnsQV'

    stmt = select([column('columnname'), column('columntype'),
                   column('columnlength'), column('chartype'),
                   column('decimaltotaldigits'), column('decimalfractionaldigits'),
                   column('columnformat'),
                   column('nullable'), column('defaultvalue'), column('idcoltype')],
                  from_obj=[text(dbc_columninfo)]).where(
        and_(text('DatabaseName=:schema'),
             text('TableName=:table_name')))
    res = connection.execute(stmt, schema=schema, table_name=table_name).fetchall()

    # If this is a view in pre-16 version, get types for individual columns
    if helpView:
        res = [self._get_column_help(connection, schema, table_name, r['columnname'])
               for r in res]
    return [self._get_column_info(row) for row in res]