The following 50 code examples, extracted from open source Python projects, illustrate how to use sqlalchemy.engine().
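As orientation before the extracted examples, here is a minimal sketch of creating an engine and running a statement; the in-memory SQLite URL is chosen purely for illustration:

import sqlalchemy

# create an Engine; the URL selects the dialect and driver
engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=False)

# open a connection and run a simple statement
with engine.connect() as conn:
    result = conn.execute(sqlalchemy.text('SELECT 1'))
    print(result.scalar())  # -> 1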
def delete_obj(self, objects, uow):
    """called by a UnitOfWork object to delete objects, which involves a
    DELETE statement for each table used by this mapper, for each object
    in the list."""
    for table in self.tables:
        if not self._has_pks(table):
            continue
        delete = []
        for obj in objects:
            params = {}
            if not hasattr(obj, "_instance_key"):
                continue
            else:
                delete.append(params)
            for col in self.pks_by_table[table]:
                params[col.key] = self._getattrbycolumn(obj, col)
            self.extension.before_delete(self, obj)
        if len(delete):
            clause = sql.and_()
            for col in self.pks_by_table[table]:
                clause.clauses.append(col == sql.bindparam(col.key))
            statement = table.delete(clause)
            c = statement.execute(*delete)
            if table.engine.supports_sane_rowcount() and c.rowcount != len(delete):
                # raising a bare string is invalid in modern Python; raise a real exception
                raise Exception(
                    "ConcurrencyError - updated rowcount %d does not match "
                    "number of objects updated %d" % (c.rowcount, len(delete)))
def __init__(self, engine, statement, parameters=None, typemap=None, **kwargs):
    """constructs a new ANSICompiler object.

    engine - SQLEngine to compile against

    statement - ClauseElement to be compiled

    parameters - optional dictionary indicating a set of bind parameters
    specified with this Compiled object. These parameters are the "default"
    key/value pairs when the Compiled is executed, and also may affect the
    actual compilation, as in the case of an INSERT where the actual columns
    inserted will correspond to the keys present in the parameters."""
    sql.Compiled.__init__(self, engine, statement, parameters)
    self.binds = {}
    self.froms = {}
    self.wheres = {}
    self.strings = {}
    self.select_stack = []
    self.typemap = typemap or {}
    self.isinsert = False
def retrieve_model_id_from_hash(db_engine, model_hash):
    """Retrieves a model id from the database that matches the given hash

    Args:
        db_engine (sqlalchemy.engine) A database engine
        model_hash (str) The model hash to lookup

    Returns: (int) The model id (if found in DB), None (if not)
    """
    session = sessionmaker(bind=db_engine)()
    try:
        saved = session.query(Model)\
            .filter_by(model_hash=model_hash)\
            .one_or_none()
        return saved.model_id if saved else None
    finally:
        session.close()
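A hypothetical call, assuming a local Postgres DSN and a previously stored hash (both made up for illustration):

import sqlalchemy

engine = sqlalchemy.create_engine('postgresql:///models')  # hypothetical DSN
model_id = retrieve_model_id_from_hash(engine, 'abc123')   # hypothetical hash value
print(model_id)  # the stored id, or None if no model matches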
def save_db_objects(db_engine, db_objects):
    """Saves a collection of SQLAlchemy model objects to the database using a COPY command

    Args:
        db_engine (sqlalchemy.engine)
        db_objects (list) SQLAlchemy model objects, corresponding to a valid table
    """
    with tempfile.TemporaryFile(mode='w+') as f:
        writer = csv.writer(f, quoting=csv.QUOTE_MINIMAL)
        for db_object in db_objects:
            writer.writerow([
                getattr(db_object, col.name)
                for col in db_object.__table__.columns
            ])
        f.seek(0)
        postgres_copy.copy_from(f, type(db_objects[0]), db_engine, format='csv')
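Usage is a matter of building the model instances and handing them over in one batch; Model and its columns here are hypothetical, following the previous example:

objects = [Model(model_hash='abc'), Model(model_hash='def')]  # hypothetical instances
save_db_objects(db_engine, objects)  # one COPY round-trip instead of many INSERTs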
def build_filter(cls, engine, table, tree):
    try:
        operator, nodes = list(tree.items())[0]
    except Exception:
        raise indexer.QueryError()
    try:
        op = cls.multiple_operators[operator]
    except KeyError:
        try:
            op = cls.binary_operators[operator]
        except KeyError:
            try:
                op = cls.unary_operators[operator]
            except KeyError:
                raise indexer.QueryInvalidOperator(operator)
            return cls._handle_unary_op(engine, op, nodes)
        return cls._handle_binary_op(engine, table, op, nodes)
    return cls._handle_multiple_op(engine, table, op, nodes)
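The filter tree is a single-key dict whose key names the operator, dispatched through three lookup tables (multiple, binary, unary). A hedged illustration of the shapes this classmethod expects; QueryTransformer, engine, and table stand in for the owning class and objects, which are not shown in the snippet:

tree = {'and': [{'=': {'id': 1}},
                {'not': {'=': {'state': 'deleted'}}}]}
filter_clause = QueryTransformer.build_filter(engine, table, tree)  # hypothetical class name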
def test_innodb_tables(self):
    sa_migration.db_sync(engine=self.migrate_engine)

    total = self.migrate_engine.execute(
        "SELECT count(*) "
        "FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA = '%(database)s'" %
        {'database': self.migrate_engine.url.database})
    self.assertGreater(total.scalar(), 0, "No tables found. Wrong schema?")

    noninnodb = self.migrate_engine.execute(
        "SELECT count(*) "
        "FROM information_schema.TABLES "
        "WHERE TABLE_SCHEMA='%(database)s' "
        "AND ENGINE != 'InnoDB' "
        "AND TABLE_NAME != 'migrate_version'" %
        {'database': self.migrate_engine.url.database})
    count = noninnodb.scalar()
    self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
def _check_002(self, engine, data):
    for column in ['created_at', 'updated_at', 'id', 'instance_uuid',
                   'cell_id', 'project_id']:
        self.assertColumnExists(engine, 'instance_mappings', column)

    for index in ['instance_uuid_idx', 'project_id_idx']:
        self.assertIndexExists(engine, 'instance_mappings', index)

    self.assertUniqueConstraintExists(engine, 'instance_mappings',
                                      ['instance_uuid'])

    inspector = reflection.Inspector.from_engine(engine)
    # There should only be one foreign key here
    fk = inspector.get_foreign_keys('instance_mappings')[0]
    self.assertEqual('cell_mappings', fk['referred_table'])
    self.assertEqual(['id'], fk['referred_columns'])
    self.assertEqual(['cell_id'], fk['constrained_columns'])
def _check_006(self, engine, data):
    for column in ['id', 'request_spec_id', 'project_id', 'user_id',
                   'display_name', 'instance_metadata', 'progress',
                   'vm_state', 'image_ref', 'access_ip_v4', 'access_ip_v6',
                   'info_cache', 'security_groups', 'config_drive',
                   'key_name', 'locked_by']:
        self.assertColumnExists(engine, 'build_requests', column)

    self.assertIndexExists(engine, 'build_requests',
                           'build_requests_project_id_idx')
    self.assertUniqueConstraintExists(engine, 'build_requests',
                                      ['request_spec_id'])

    inspector = reflection.Inspector.from_engine(engine)
    # There should only be one foreign key here
    fk = inspector.get_foreign_keys('build_requests')[0]
    self.assertEqual('request_specs', fk['referred_table'])
    self.assertEqual(['id'], fk['referred_columns'])
    self.assertEqual(['request_spec_id'], fk['constrained_columns'])
def _check_275(self, engine, data):
    self.assertColumnExists(engine, 'key_pairs', 'type')
    self.assertColumnExists(engine, 'shadow_key_pairs', 'type')

    key_pairs = oslodbutils.get_table(engine, 'key_pairs')
    shadow_key_pairs = oslodbutils.get_table(engine, 'shadow_key_pairs')
    self.assertIsInstance(key_pairs.c.type.type, sqlalchemy.types.String)
    self.assertIsInstance(shadow_key_pairs.c.type.type,
                          sqlalchemy.types.String)

    # Make sure the keypair entry will have the type 'ssh'
    key_pairs = oslodbutils.get_table(engine, 'key_pairs')
    keypair = key_pairs.select(
        key_pairs.c.name == 'test-migr').execute().first()
    self.assertEqual('ssh', keypair.type)
def _check_313(self, engine, data):
    self.assertColumnExists(engine, 'pci_devices', 'parent_addr')
    self.assertColumnExists(engine, 'shadow_pci_devices', 'parent_addr')

    pci_devices = oslodbutils.get_table(engine, 'pci_devices')
    shadow_pci_devices = oslodbutils.get_table(engine, 'shadow_pci_devices')
    self.assertIsInstance(pci_devices.c.parent_addr.type,
                          sqlalchemy.types.String)
    self.assertTrue(pci_devices.c.parent_addr.nullable)
    self.assertIsInstance(shadow_pci_devices.c.parent_addr.type,
                          sqlalchemy.types.String)
    self.assertTrue(shadow_pci_devices.c.parent_addr.nullable)

    self.assertIndexMembers(engine, 'pci_devices',
                            'ix_pci_devices_compute_node_id_parent_addr_deleted',
                            ['compute_node_id', 'parent_addr', 'deleted'])
def convert_bind_param(self, value, engine):
    # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
    # this one doesn't seem to work with the "emulation" mode
    return psycopg.TimestampFromMx(value)

def convert_result_value(self, value, engine):
    # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
    return value

def convert_bind_param(self, value, engine):
    # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
    # this one doesn't seem to work with the "emulation" mode
    return psycopg.DateFromMx(value)

def convert_bind_param(self, value, engine):
    # TODO: perform appropriate postgres1 conversion between Python DateTime/MXDateTime
    # this one doesn't seem to work with the "emulation" mode
    return psycopg.TimeFromMx(value)
def engine(opts, **params):
    return PGSQLEngine(opts, **params)

def engine(opts, **params):
    return MySQLEngine(opts, **params)
def _cvt(self, value, engine, fmt):
    if value is None:
        return None
    try:
        (value, microsecond) = value.split('.')
        microsecond = int(microsecond)
    except ValueError:
        (value, microsecond) = (value, 0)
    return time.strptime(value, fmt)[0:6] + (microsecond,)
def convert_result_value(self, value, engine):
    tup = self._cvt(value, engine, "%Y-%m-%d %H:%M:%S")
    return tup and datetime.datetime(*tup)

def convert_result_value(self, value, engine):
    tup = self._cvt(value, engine, "%Y-%m-%d")
    return tup and datetime.date(*tup[0:3])
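To see what _cvt produces, here is the same split-and-strptime logic as a standalone sketch on a literal timestamp string; note the fractional part is taken as an integer microsecond count:

import time
import datetime

value = '2017-01-02 03:04:05.678'
try:
    value, microsecond = value.split('.')
    microsecond = int(microsecond)  # '678' becomes 678 microseconds, not milliseconds
except ValueError:
    microsecond = 0
tup = time.strptime(value, "%Y-%m-%d %H:%M:%S")[0:6] + (microsecond,)
print(datetime.datetime(*tup))  # 2017-01-02 03:04:05.000678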
def engine(opts, **params):
    return SQLiteSQLEngine(opts, **params)
def select_text(self, text, **params):
    t = sql.text(text, engine=self.primarytable.engine)
    return self.instances(t.execute(**params))
def engine(**params):
    return ANSISQLEngine(**params)
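The four engine() factories above (PGSQLEngine, MySQLEngine, SQLiteSQLEngine, ANSISQLEngine) follow the same early-SQLAlchemy convention: each dialect module exposes a module-level engine() function, and create_engine resolves a module by name and delegates to it. A minimal sketch of that dispatch, with the module path and signature assumed for illustration:

import importlib

def create_engine(name, opts=None, **params):
    # look up the dialect module by name and delegate to its engine() factory
    module = importlib.import_module('sqlalchemy.databases.%s' % name)
    return module.engine(opts or {}, **params)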
def get_params(self, **params):
    """returns a structure of bind parameters for this compiled object.
    This includes bind parameters that might be compiled in via the "values"
    argument of an Insert or Update statement object, and also the given
    **params. The keys inside of **params can be any key that matches the
    BindParameterClause objects compiled within this object.

    The output is dependent on the paramstyle of the DBAPI being used; if a
    named style, the return result will be a dictionary with keynames
    matching the compiled statement. If a positional style, the output will
    be a list corresponding to the bind positions in the compiled statement.

    For an executemany style of call, this method should be called for each
    element in the list of parameter groups that will ultimately be executed.
    """
    if self.parameters is not None:
        bindparams = self.parameters.copy()
    else:
        bindparams = {}
    bindparams.update(params)

    if self.engine.positional:
        d = OrderedDict()
        for k in self.positiontup:
            b = self.binds[k]
            d[k] = b.typeprocess(b.value, self.engine)
    else:
        d = {}
        for b in self.binds.values():
            d[b.key] = b.typeprocess(b.value, self.engine)

    for key, value in bindparams.items():
        try:
            b = self.binds[key]
        except KeyError:
            continue
        d[b.key] = b.typeprocess(value, self.engine)

    if self.engine.positional:
        return list(d.values())
    else:
        return d
def get_named_params(self, parameters):
    """given the results of the get_params method, returns the parameters
    in dictionary format. For a named paramstyle, this just returns the
    same dictionary. For a positional paramstyle, the given parameters are
    assumed to be in list format and are converted back to a dictionary.
    """
    # return parameters
    if self.engine.positional:
        p = {}
        for i in range(0, len(self.positiontup)):
            p[self.positiontup[i]] = parameters[i]
        return p
    else:
        return parameters
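The positional branch, illustrated on plain data (positiontup and the parameter list are made-up values):

positiontup = ['name', 'value']
parameters = ['x', 1]
named = {positiontup[i]: parameters[i] for i in range(len(positiontup))}
# named == {'name': 'x', 'value': 1}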
def bindparam_string(self, name):
    return self.engine.bindtemplate % name
def get_column_default_string(self, column):
    if isinstance(column.default, schema.PassiveDefault):
        if isinstance(column.default.arg, str):
            return repr(column.default.arg)
        else:
            return str(column.default.arg.compile(self.engine))
    else:
        return None
def __init__(self, connection=None, echo=False):
    """
    :param str connection: SQLAlchemy connection string
    :param bool echo: True or False for SQL output of SQLAlchemy engine
    """
    log.setLevel(logging.INFO)

    handler = logging.FileHandler(
        os.path.join(PYUNIPROT_DIR, defaults.TABLE_PREFIX + 'database.log'))
    handler.setLevel(logging.INFO)

    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    log.addHandler(handler)

    try:
        self.connection = get_connection_string(connection)
        self.engine = sqlalchemy.create_engine(self.connection, echo=echo)
        self.inspector = reflection.Inspector.from_engine(self.engine)
        self.sessionmaker = sessionmaker(
            bind=self.engine,
            autoflush=False,
            autocommit=False,
            expire_on_commit=True
        )
        self.session = scoped_session(self.sessionmaker)
    except Exception:
        log.warning('No valid database connection. Execute `pyuniprot connection` on command line')
def _create_tables(self, checkfirst=True):
    """creates all tables from models in your database

    :param bool checkfirst: if True, only create tables that do not already exist
    """
    log.info('create tables in {}'.format(self.engine.url))
    models.Base.metadata.create_all(self.engine, checkfirst=checkfirst)
def _drop_tables(self):
    """drops all tables in the database"""
    log.info('drop tables in {}'.format(self.engine.url))
    self.session.commit()
    models.Base.metadata.drop_all(self.engine)
    self.session.commit()
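Both helpers delegate to MetaData. The underlying calls against a bare engine look like this, as a minimal self-contained sketch (SQLAlchemy 1.4+ declarative_base, in-memory SQLite, and the Example model are all made up for illustration):

import sqlalchemy
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Example(Base):
    __tablename__ = 'example'
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)

engine = sqlalchemy.create_engine('sqlite:///:memory:')
Base.metadata.create_all(engine, checkfirst=True)  # CREATE TABLE only if missing
Base.metadata.drop_all(engine)                     # DROP TABLE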
def test_basic_query(self, engine, conn):
    rows = conn.execute('SELECT * FROM one_row').fetchall()
    self.assertEqual(len(rows), 1)
    self.assertEqual(rows[0].number_of_rows, 1)
    self.assertEqual(len(rows[0]), 1)
def test_reflect_no_such_table(self, engine, conn):
    self.assertRaises(
        NoSuchTableError,
        lambda: Table('this_does_not_exist', MetaData(bind=engine), autoload=True))
    self.assertRaises(
        NoSuchTableError,
        lambda: Table('this_does_not_exist', MetaData(bind=engine),
                      schema='also_does_not_exist', autoload=True))
def test_reflect_table(self, engine, connection):
    one_row = Table('one_row', MetaData(bind=engine), autoload=True)
    self.assertEqual(len(one_row.c), 1)
    self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_with_schema(self, engine, connection):
    one_row = Table('one_row', MetaData(bind=engine), schema=SCHEMA, autoload=True)
    self.assertEqual(len(one_row.c), 1)
    self.assertIsNotNone(one_row.c.number_of_rows)
def test_reflect_table_include_columns(self, engine, connection):
    one_row_complex = Table('one_row_complex', MetaData(bind=engine))
    engine.dialect.reflecttable(
        connection, one_row_complex,
        include_columns=['col_int'], exclude_columns=[])
    self.assertEqual(len(one_row_complex.c), 1)
    self.assertIsNotNone(one_row_complex.c.col_int)
    self.assertRaises(AttributeError, lambda: one_row_complex.c.col_tinyint)
def test_reflect_schemas(self, engine, connection):
    insp = sqlalchemy.inspect(engine)
    schemas = insp.get_schema_names()
    self.assertIn(SCHEMA, schemas)
    self.assertIn('default', schemas)
def test_get_table_names(self, engine, connection):
    meta = MetaData()
    meta.reflect(bind=engine)
    print(meta.tables)
    self.assertIn('one_row', meta.tables)
    self.assertIn('one_row_complex', meta.tables)

    insp = sqlalchemy.inspect(engine)
    self.assertIn(
        'many_rows',
        insp.get_table_names(schema=SCHEMA),
    )
def test_has_table(self, engine, connection):
    self.assertTrue(Table('one_row', MetaData(bind=engine)).exists())
    self.assertFalse(
        Table('this_table_does_not_exist', MetaData(bind=engine)).exists())
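Note that Table.exists() was deprecated in SQLAlchemy 1.4 and removed in 2.0; on current versions the same check goes through the inspector. A sketch, not part of the original test:

insp = sqlalchemy.inspect(engine)
assert insp.has_table('one_row')
assert not insp.has_table('this_table_does_not_exist')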
def test_char_length(self, engine, connection):
    one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
    result = sqlalchemy.select([
        sqlalchemy.func.char_length(one_row_complex.c.col_string)
    ]).execute().scalar()
    self.assertEqual(result, len('a string'))
def test_reflect_select(self, engine, connection):
    one_row_complex = Table('one_row_complex', MetaData(bind=engine), autoload=True)
    self.assertEqual(len(one_row_complex.c), 15)
    self.assertIsInstance(one_row_complex.c.col_string, Column)
    rows = one_row_complex.select().execute().fetchall()
    self.assertEqual(len(rows), 1)
    self.assertEqual(list(rows[0]), [
        True,
        127,
        32767,
        2147483647,
        9223372036854775807,
        0.5,
        0.25,
        'a string',
        datetime(2017, 1, 1, 0, 0, 0),
        date(2017, 1, 2),
        b'123',
        '[1, 2]',
        '{1=2, 3=4}',
        '{a=1, b=2}',
        Decimal('0.1'),
    ])
    self.assertIsInstance(one_row_complex.c.col_boolean.type, BOOLEAN)
    self.assertIsInstance(one_row_complex.c.col_tinyint.type, INTEGER)
    self.assertIsInstance(one_row_complex.c.col_smallint.type, INTEGER)
    self.assertIsInstance(one_row_complex.c.col_int.type, INTEGER)
    self.assertIsInstance(one_row_complex.c.col_bigint.type, BIGINT)
    self.assertIsInstance(one_row_complex.c.col_float.type, FLOAT)
    self.assertIsInstance(one_row_complex.c.col_double.type, FLOAT)
    self.assertIsInstance(one_row_complex.c.col_string.type, type(STRINGTYPE))
    self.assertIsInstance(one_row_complex.c.col_timestamp.type, TIMESTAMP)
    self.assertIsInstance(one_row_complex.c.col_date.type, DATE)
    self.assertIsInstance(one_row_complex.c.col_binary.type, BINARY)
    self.assertIsInstance(one_row_complex.c.col_array.type, type(STRINGTYPE))
    self.assertIsInstance(one_row_complex.c.col_map.type, type(STRINGTYPE))
    self.assertIsInstance(one_row_complex.c.col_struct.type, type(STRINGTYPE))
    self.assertIsInstance(one_row_complex.c.col_decimal.type, DECIMAL)
def patch():
    if getattr(sqlalchemy.engine, '__datadog_patch', False):
        return
    setattr(sqlalchemy.engine, '__datadog_patch', True)

    # patch the engine creation function
    _w('sqlalchemy', 'create_engine', _wrap_create_engine)
    _w('sqlalchemy.engine', 'create_engine', _wrap_create_engine)
def unpatch():
    # unpatch sqlalchemy
    if getattr(sqlalchemy.engine, '__datadog_patch', False):
        setattr(sqlalchemy.engine, '__datadog_patch', False)
        unwrap(sqlalchemy, 'create_engine')
        unwrap(sqlalchemy.engine, 'create_engine')
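The pair is meant to bracket engine creation: patch() must run before any engine is built so that _wrap_create_engine can intercept it. A sketch of the intended call order (the tracing behavior itself comes from _wrap_create_engine, which is not shown here):

patch()  # create_engine in both sqlalchemy and sqlalchemy.engine is now wrapped
engine = sqlalchemy.create_engine('sqlite:///:memory:')  # engines created from here on are traced
unpatch()  # restores the unwrapped create_engine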
def init(settings, webapp=False, db_info=None):
    global db_url, db_engine, session_maker

    if db_info:
        db_url = db_info.url
    else:
        db_url = _dbUrl(web.CONFIG)

    settings["sqlalchemy.url"] = db_url
    web.CONFIG.set("mishmash", "sqlalchemy.url", settings["sqlalchemy.url"])
    settings["sqlalchemy.convert_unicode"] = \
        web.CONFIG.get("mishmash", "sqlalchemy.convert_unicode")
    settings["sqlalchemy.encoding"] = web.CONFIG.get("mishmash", "sqlalchemy.encoding")

    if not db_info:
        config = Namespace()
        config.db_url = db_url
        config.various_artists_name = web.CONFIG.get("mishmash", "various_artists_name")
        db_info = dbinit(config.db_url)

    db_engine = db_info.engine
    session_maker = db_info.SessionMaker

    initAlembic(db_url)