我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用mysql.connector()。
def create_connect_args(self, url): opts = url.translate_connect_args(username='user') opts.update(url.query) util.coerce_kw_type(opts, 'buffered', bool) util.coerce_kw_type(opts, 'raise_on_warnings', bool) # unfortunately, MySQL/connector python refuses to release a # cursor without reading fully, so non-buffered isn't an option opts.setdefault('buffered', True) # FOUND_ROWS must be set in ClientFlag to enable # supports_sane_rowcount. if self.dbapi is not None: try: from mysql.connector.constants import ClientFlag client_flags = opts.get( 'client_flags', ClientFlag.get_default()) client_flags |= ClientFlag.FOUND_ROWS opts['client_flags'] = client_flags except Exception: pass return [[], opts]
def _connect(): config = { 'user': 'root', 'password': 'mysql', 'host': '192.168.200.33', 'database': 'fastloan3', 'raise_on_warnings': True, } cnx = None try: cnx = mysql.connector.connect(**config) except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print("Something is wrong with your user name or password") elif err.errno == errorcode.ER_BAD_DB_ERROR: print("Database does not exist") else: print(err) if cnx: cnx.close() return cnx
def connect(self): if self.__mysql is None: try: cnx = mysql.connector.connect(user=self.config['user'], password=self.config['password'], host=self.config['host'], database=self.config['database']) self.__mysql = cnx except mysql.connector.Error as err: if err.errno == errorcode.ER_ACCESS_DENIED_ERROR: print "Something is wrong with your user name or password" return False elif err.errno == errorcode.ER_BAD_DB_ERROR: print "Database does not exist" return False else: print err return False return self.__mysql
def create_connect_args(self, url): opts = url.translate_connect_args(username='user') opts.update(url.query) util.coerce_kw_type(opts, 'buffered', bool) util.coerce_kw_type(opts, 'raise_on_warnings', bool) opts.setdefault('buffered', True) opts.setdefault('raise_on_warnings', True) # FOUND_ROWS must be set in ClientFlag to enable # supports_sane_rowcount. if self.dbapi is not None: try: from mysql.connector.constants import ClientFlag client_flags = opts.get( 'client_flags', ClientFlag.get_default()) client_flags |= ClientFlag.FOUND_ROWS opts['client_flags'] = client_flags except: pass return [[], opts]
def _execute_wrapper(self, method, query, args): """Wrapper around execute() and executemany()""" try: return method(query, args) except (mysql.connector.ProgrammingError) as err: six.reraise(utils.ProgrammingError, utils.ProgrammingError(err.msg), sys.exc_info()[2]) except (mysql.connector.IntegrityError) as err: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) except mysql.connector.OperationalError as err: # Map some error codes to IntegrityError, since they seem to be # misclassified and Django would prefer the more logical place. if err.args[0] in self.codes_for_integrityerror: six.reraise(utils.IntegrityError, utils.IntegrityError(err.msg), sys.exc_info()[2]) else: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2]) except mysql.connector.DatabaseError as err: six.reraise(utils.DatabaseError, utils.DatabaseError(err.msg), sys.exc_info()[2])
def create_connect_args(self, url): opts = url.translate_connect_args(username='user') opts.update(url.query) util.coerce_kw_type(opts, 'buffered', bool) util.coerce_kw_type(opts, 'raise_on_warnings', bool) opts.setdefault('buffered', True) opts.setdefault('raise_on_warnings', True) # FOUND_ROWS must be set in ClientFlag to enable # supports_sane_rowcount. if self.dbapi is not None: try: from mysql.connector.constants import ClientFlag client_flags = opts.get('client_flags', ClientFlag.get_default()) client_flags |= ClientFlag.FOUND_ROWS opts['client_flags'] = client_flags except: pass return [[], opts]
def main(): try: import mysql.connector except: import sys print "\n[IP2C: WARNING]" print 'ERROR: mysql.connector library not available. Exitting...' sys.exit(1) import os args = parse_arguments() try: csv_file = os.path.join(os.environ['tmp'], 'ip2c-results.csv') except: csv_file = os.path.join('/tmp', 'ip2c-results.csv') ##create_table("securityonion_db", "ip2c") print "\n[IP2C: Parse RIR DBs]" create_tmp_file(args.source_dir, csv_file) print "\n[IP2C: Update MySQL DB]" update_table("securityonion_db", "ip2c", csv_file) print "\n[IP2C: Update Validation]" read_table("securityonion_db", "ip2c")
def execute(self, sql=None, params=None, db=None): if sql is None: return None resultRow = [] with contextlib.\ closing(mysql.connector.connect(host=str(self.host), port=int(self.port), user=str(self.user), password=str(self.passwd), db=str(self.database) if not db else db)) as conn: conn.autocommit = True with contextlib.closing(conn.cursor(buffered=True)) as cursor: cursor.execute(sql, params) try: resultRow = cursor.fetchall() except errors.InterfaceError: # Raised on empty result - DML resultRow = [] return resultRow
def result_processor(self, dialect, coltype): """MySQL-connector already converts mysql bits, so.""" return None
def dbapi(cls): from mysql import connector return connector
def get_connection_params(self): # Django 1.6 kwargs = { 'charset': 'utf8', 'use_unicode': True, 'buffered': False, 'consume_results': True, } settings_dict = self.settings_dict if settings_dict['USER']: kwargs['user'] = settings_dict['USER'] if settings_dict['NAME']: kwargs['database'] = settings_dict['NAME'] if settings_dict['PASSWORD']: kwargs['passwd'] = settings_dict['PASSWORD'] if settings_dict['HOST'].startswith('/'): kwargs['unix_socket'] = settings_dict['HOST'] elif settings_dict['HOST']: kwargs['host'] = settings_dict['HOST'] if settings_dict['PORT']: kwargs['port'] = int(settings_dict['PORT']) # Raise exceptions for database warnings if DEBUG is on kwargs['raise_on_warnings'] = settings.DEBUG kwargs['client_flags'] = [ # Need potentially affected rows on UPDATE mysql.connector.constants.ClientFlag.FOUND_ROWS, ] try: kwargs.update(settings_dict['OPTIONS']) except KeyError: # OPTIONS missing is OK pass return kwargs
def get_new_connection(self, conn_params): # Django 1.6 if not self.use_pure: conn_params['converter_class'] = DjangoCMySQLConverter else: conn_params['converter_class'] = DjangoMySQLConverter cnx = mysql.connector.connect(**conn_params) return cnx
def mysql_version(self): config = self.get_connection_params() temp_conn = mysql.connector.connect(**config) server_version = temp_conn.get_server_version() temp_conn.close() return server_version
def create_connect_args(self, url): opts = url.translate_connect_args(username='user') opts.update(url.query) util.coerce_kw_type(opts, 'allow_local_infile', bool) util.coerce_kw_type(opts, 'autocommit', bool) util.coerce_kw_type(opts, 'buffered', bool) util.coerce_kw_type(opts, 'compress', bool) util.coerce_kw_type(opts, 'connection_timeout', int) util.coerce_kw_type(opts, 'connect_timeout', int) util.coerce_kw_type(opts, 'consume_results', bool) util.coerce_kw_type(opts, 'force_ipv6', bool) util.coerce_kw_type(opts, 'get_warnings', bool) util.coerce_kw_type(opts, 'pool_reset_session', bool) util.coerce_kw_type(opts, 'pool_size', int) util.coerce_kw_type(opts, 'raise_on_warnings', bool) util.coerce_kw_type(opts, 'raw', bool) util.coerce_kw_type(opts, 'ssl_verify_cert', bool) util.coerce_kw_type(opts, 'use_pure', bool) util.coerce_kw_type(opts, 'use_unicode', bool) # unfortunately, MySQL/connector python refuses to release a # cursor without reading fully, so non-buffered isn't an option opts.setdefault('buffered', True) # FOUND_ROWS must be set in ClientFlag to enable # supports_sane_rowcount. if self.dbapi is not None: try: from mysql.connector.constants import ClientFlag client_flags = opts.get( 'client_flags', ClientFlag.get_default()) client_flags |= ClientFlag.FOUND_ROWS opts['client_flags'] = client_flags except Exception: pass return [[], opts]