我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 sqlalchemy.util.defaultdict()。
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
    """Collect the foreign key constraints declared on ``table_name``.

    A single query against the ``rdb$`` catalog tables yields one row per
    constrained column; rows sharing a constraint name are folded into
    one record.

    :param connection: object whose ``execute(sql, params)`` returns an
        iterable of rows addressable by column alias (``row['cname']``).
    :param table_name: table to inspect; passed through
        ``self.denormalize_name`` before being bound into the query.
    :param schema: accepted but not used by this implementation.
    :param kw: accepted but not used by this implementation.
    :return: list of dicts, one per constraint, with the keys ``name``,
        ``constrained_columns``, ``referred_schema`` (always ``None``),
        ``referred_table`` and ``referred_columns``.
    """

    def _blank_record():
        # A fresh dict per constraint so the column lists are never shared.
        return {
            'name': None,
            'constrained_columns': [],
            'referred_schema': None,
            'referred_table': None,
            'referred_columns': [],
        }

    # The statement is generic over the constraint type; the bound
    # "FOREIGN KEY" value is what restricts it to foreign keys.
    # The fragments concatenate to a single-line statement.
    sql = (
        " SELECT rc.rdb$constraint_name AS cname,"
        " cse.rdb$field_name AS fname,"
        " ix2.rdb$relation_name AS targetrname,"
        " se.rdb$field_name AS targetfname"
        " FROM rdb$relation_constraints rc"
        " JOIN rdb$indices ix1 ON ix1.rdb$index_name=rc.rdb$index_name"
        " JOIN rdb$indices ix2 ON ix2.rdb$index_name=ix1.rdb$foreign_key"
        " JOIN rdb$index_segments cse"
        " ON cse.rdb$index_name=ix1.rdb$index_name"
        " JOIN rdb$index_segments se"
        " ON se.rdb$index_name=ix2.rdb$index_name"
        " AND se.rdb$field_position=cse.rdb$field_position"
        " WHERE rc.rdb$constraint_type=? AND rc.rdb$relation_name=?"
        " ORDER BY se.rdb$index_name, se.rdb$field_position "
    )
    bind_values = ["FOREIGN KEY", self.denormalize_name(table_name)]
    cursor = connection.execute(sql, bind_values)

    by_constraint = util.defaultdict(_blank_record)
    for row in cursor:
        constraint = self.normalize_name(row['cname'])
        record = by_constraint[constraint]
        if not record['name']:
            # First row seen for this constraint: fill the scalar fields.
            record['name'] = constraint
            record['referred_table'] = self.normalize_name(
                row['targetrname'])
        record['constrained_columns'].append(
            self.normalize_name(row['fname']))
        record['referred_columns'].append(
            self.normalize_name(row['targetfname']))
    return list(by_constraint.values())
def get_indexes(self, connection, table_name, schema=None, **kw):
    """List the indexes defined on ``table_name``.

    The query's WHERE clause skips indexes that have a foreign key set
    (``rdb$foreign_key``) and indexes that match a row in
    ``rdb$relation_constraints``. Rows come back one per indexed column,
    ordered by position within the index, and are grouped per index here.

    :param connection: object whose ``execute(sql, params)`` returns an
        iterable of rows addressable by column alias.
    :param table_name: table to inspect; passed through
        ``self.denormalize_name`` before being bound into the query.
    :param schema: accepted but not used by this implementation.
    :param kw: accepted but not used by this implementation.
    :return: list of dicts with the keys ``name``, ``column_names`` and
        ``unique``.
    """
    # The fragments concatenate to a single-line statement.
    sql = (
        " SELECT ix.rdb$index_name AS index_name,"
        " ix.rdb$unique_flag AS unique_flag,"
        " ic.rdb$field_name AS field_name"
        " FROM rdb$indices ix"
        " JOIN rdb$index_segments ic"
        " ON ix.rdb$index_name=ic.rdb$index_name"
        " LEFT OUTER JOIN rdb$relation_constraints"
        " ON rdb$relation_constraints.rdb$index_name ="
        " ic.rdb$index_name"
        " WHERE ix.rdb$relation_name=? AND ix.rdb$foreign_key IS NULL"
        " AND rdb$relation_constraints.rdb$constraint_type IS NULL"
        " ORDER BY index_name, ic.rdb$field_position "
    )
    result = connection.execute(sql, [self.denormalize_name(table_name)])

    # Keyed by the raw (un-normalized) index name as returned by the query.
    grouped = util.defaultdict(dict)
    for row in result:
        entry = grouped[row['index_name']]
        if 'name' not in entry:
            # First column of this index: set up the per-index fields.
            entry.update(
                name=self.normalize_name(row['index_name']),
                column_names=[],
                unique=bool(row['unique_flag']),
            )
        entry['column_names'].append(
            self.normalize_name(row['field_name']))
    return list(grouped.values())
def get_foreign_keys(self, connection, table_name, schema=None, **kw):
    """Return the foreign key constraints declared on ``table_name``.

    The ``rdb$`` catalog query yields one row per constrained column;
    rows that share a constraint name are merged into one record.

    :param connection: object whose ``execute(sql, params)`` returns an
        iterable of rows addressable by column alias (``row['cname']``).
    :param table_name: table to inspect; passed through
        ``self.denormalize_name`` before being bound into the query.
    :param schema: accepted but not used by this implementation.
    :param kw: accepted but not used by this implementation.
    :return: a ``list`` of dicts, one per constraint, with the keys
        ``name``, ``constrained_columns``, ``referred_schema`` (always
        ``None``), ``referred_table`` and ``referred_columns``.
    """
    # Query to extract the details of each constraint of the given table;
    # the bound "FOREIGN KEY" value narrows it to foreign keys.
    # The fragments concatenate to a single-line statement.
    fkqry = (
        " SELECT rc.rdb$constraint_name AS cname,"
        " cse.rdb$field_name AS fname,"
        " ix2.rdb$relation_name AS targetrname,"
        " se.rdb$field_name AS targetfname"
        " FROM rdb$relation_constraints rc"
        " JOIN rdb$indices ix1 ON ix1.rdb$index_name=rc.rdb$index_name"
        " JOIN rdb$indices ix2 ON ix2.rdb$index_name=ix1.rdb$foreign_key"
        " JOIN rdb$index_segments cse"
        " ON cse.rdb$index_name=ix1.rdb$index_name"
        " JOIN rdb$index_segments se"
        " ON se.rdb$index_name=ix2.rdb$index_name"
        " AND se.rdb$field_position=cse.rdb$field_position"
        " WHERE rc.rdb$constraint_type=? AND rc.rdb$relation_name=?"
        " ORDER BY se.rdb$index_name, se.rdb$field_position "
    )
    tablename = self.denormalize_name(table_name)

    c = connection.execute(fkqry, ["FOREIGN KEY", tablename])

    # The factory builds a new dict (and new lists) for every constraint.
    fks = util.defaultdict(lambda: {
        'name': None,
        'constrained_columns': [],
        'referred_schema': None,
        'referred_table': None,
        'referred_columns': [],
    })

    for row in c:
        cname = self.normalize_name(row['cname'])
        fk = fks[cname]
        if not fk['name']:
            # First row for this constraint: record its scalar fields.
            fk['name'] = cname
            fk['referred_table'] = self.normalize_name(row['targetrname'])
        fk['constrained_columns'].append(
            self.normalize_name(row['fname']))
        fk['referred_columns'].append(
            self.normalize_name(row['targetfname']))

    # Materialize as a list: on Python 3 ``dict.values()`` is a view that
    # cannot be indexed and stays tied to the local dict.
    return list(fks.values())
def get_indexes(self, connection, table_name, schema=None, **kw):
    """Return the indexes defined on ``table_name``.

    The query's WHERE clause skips indexes that have a foreign key set
    (``rdb$foreign_key``) and indexes that match a row in
    ``rdb$relation_constraints``. Rows come back one per indexed column
    and are grouped per index here.

    :param connection: object whose ``execute(sql, params)`` returns an
        iterable of rows addressable by column alias.
    :param table_name: table to inspect; passed through
        ``self.denormalize_name`` before being bound into the query.
    :param schema: accepted but not used by this implementation.
    :param kw: accepted but not used by this implementation.
    :return: a ``list`` of dicts with the keys ``name``,
        ``column_names`` (in index-segment order) and ``unique``.
    """
    # Order segments by rdb$field_position, not by field name: sorting by
    # name would list a composite index's columns alphabetically and lose
    # the order in which the index actually covers them.
    # The fragments concatenate to a single-line statement.
    qry = (
        " SELECT ix.rdb$index_name AS index_name,"
        " ix.rdb$unique_flag AS unique_flag,"
        " ic.rdb$field_name AS field_name"
        " FROM rdb$indices ix"
        " JOIN rdb$index_segments ic"
        " ON ix.rdb$index_name=ic.rdb$index_name"
        " LEFT OUTER JOIN rdb$relation_constraints"
        " ON rdb$relation_constraints.rdb$index_name ="
        " ic.rdb$index_name"
        " WHERE ix.rdb$relation_name=? AND ix.rdb$foreign_key IS NULL"
        " AND rdb$relation_constraints.rdb$constraint_type IS NULL"
        " ORDER BY index_name, ic.rdb$field_position "
    )
    c = connection.execute(qry, [self.denormalize_name(table_name)])

    # Keyed by the raw (un-normalized) index name as returned by the query.
    indexes = util.defaultdict(dict)
    for row in c:
        indexrec = indexes[row['index_name']]
        if 'name' not in indexrec:
            # First segment of this index: set up the per-index fields.
            indexrec['name'] = self.normalize_name(row['index_name'])
            indexrec['column_names'] = []
            indexrec['unique'] = bool(row['unique_flag'])

        indexrec['column_names'].append(
            self.normalize_name(row['field_name']))

    # Materialize as a list: on Python 3 ``dict.values()`` is a view that
    # cannot be indexed and stays tied to the local dict.
    return list(indexes.values())