我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用pymysql.Error()。
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def ExecuteScalar(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_insert = 'begin; ' for r in p_block.Rows: v_insert = v_insert + 'insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + self.Mogrify(r, v_fields) + '; ' v_insert = v_insert + 'commit;' self.Execute(v_insert) except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def ExecuteScalar(self, p_sql): try: if self.v_con is None: raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.') else: self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def GetFields(self, p_sql): try: if self.v_con is None: raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.') else: v_fields = [] self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1') r = self.v_cur.fetchone() if r != None: k = 0 for c in self.v_cur.description: v_fields.append(DataField(c[0], p_type=type(r[k]), p_dbtype=type(r[k]))) k = k + 1 else: k = 0 for c in self.v_cur.description: v_fields.append(DataField(c[0], p_type=type(None), p_dbtype=type(None))) k = k + 1 return v_fields except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Open(self, p_autocommit=True): try: self.v_con = psycopg2.connect( self.GetConnectionString(), cursor_factory=psycopg2.extras.DictCursor ) self.v_con.autocommit = p_autocommit self.v_cur = self.v_con.cursor() self.v_start = True # PostgreSQL types self.v_cur.execute('select oid, typname from pg_type') self.v_types = dict([(r['oid'], r['typname']) for r in self.v_cur.fetchall()]) if not p_autocommit: self.v_con.commit() self.v_con.notices = DataList() except Spartacus.Database.Exception as exc: raise exc except psycopg2.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except psycopg2.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def ExecuteScalar(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except psycopg2.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except psycopg2.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except pymysql.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except pymysql.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def ExecuteScalar(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except pymysql.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except fdb.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except fdb.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except cx_Oracle.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def ExecuteScalar(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except cx_Oracle.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except cx_Oracle.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except pymssql.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except pymssql.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Execute(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) except Spartacus.Database.Exception as exc: raise exc except ibm_db_dbi.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def ExecuteScalar(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) r = self.v_cur.fetchone() if r != None: s = r[0] else: s = None return s except Spartacus.Database.Exception as exc: raise exc except ibm_db_dbi.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def InsertBlock(self, p_block, p_tablename, p_fields=None): try: v_columnames = [] if p_fields is None: v_fields = [] for c in p_block.Columns: v_columnames.append(c) v_fields.append(DataField(c)) else: v_fields = p_fields for p in v_fields: v_columnames.append(p.v_name) v_values = [] for r in p_block.Rows: v_values.append(self.Mogrify(r, v_fields)) self.Execute('insert into ' + p_tablename + '(' + ','.join(v_columnames) + ') values ' + ','.join(v_values) + '') except Spartacus.Database.Exception as exc: raise exc except ibm_db_dbi.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def setUp(self): _skip_if_no_pymysql() import pymysql try: # Try Travis defaults. # No real user should allow root access with a blank password. self.conn = pymysql.connect(host='localhost', user='root', passwd='', db='pandas_nosetest') except: pass else: return try: self.conn = pymysql.connect(read_default_group='pandas') except pymysql.ProgrammingError: raise nose.SkipTest( "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ") except pymysql.Error: raise nose.SkipTest( "Cannot connect to database. " "Create a group of connection parameters under the heading " "[pandas] in your system's mysql default file, " "typically located at ~/.my.cnf or /etc/.my.cnf. ")
def __connect( self, host, port, user, password, charset = 'utf8' ): ''' ?????????MySQL?? :param host:???IP :param port:?? :param user:??? :param password:?? :param db:??? :param charset:??? :return: ''' try: self.__conn = pymysql.connect( host = host, port = port, user = user, password = password, charset = charset ) except pymysql.Error as e: print( 'MySQL?????%d?%s' % (e.args[ 0 ], e.args[ 1 ]) ) self.__cur = self.__conn.cursor( )
def insert(self, tablename=None, values=None, letCommit=True): assert (tablename is not None), "Error: no tablename given" assert (values is not None), "Error: no values given" if len(values) > 0: query = "insert into %s (" % tablename qvalues = [] for key in values: query += "%s, " % key qvalues.append(values.get(key)) query = query[:-2] query += ") values (" for v in qvalues: query += "'%s', " % v query = query[:-2] query += ")" self.select(query=query, letCommit=letCommit)
def getData(self, section=None, key=None, default=None, type=None): assert (section is not None), "Error: no section given" assert (key is not None), "Error: no key given" val = None if self.conf.has_section(section): try: val = self.conf.get(section, key) except cParser.NoOptionError: pass # return default if val is None and default is not None else val if type is None else Helper.safe_cast(val, type) if val is not None and len(val) > 0: if type is None: return val else: return Helper.safe_cast(val, type) elif default is not None: return default else: return None
def __init__(self, log=None, dbHost=None, dbUser=None, dbPassword=None, dbName=None, dbTableName=None, nameMapping=None, charset="utf8"): assert (dbHost is not None), "Error: database host ip is mandatory" assert (dbUser is not None), "Error: database user is mandatory" assert (dbPassword is not None), "Error: database password is mandatory" assert (dbName is not None), "Error: database name is mandatory" assert (dbTableName is not None), "Error: database table name is mandatory" self.log = log self.host = dbHost self.user = dbUser self.password = dbPassword self.dbName = dbName self.dbTableName = dbTableName self.nameMapping = nameMapping self.charset = charset self.mydb = None self.connect()
def __init__(self, environ): """ Constructor for object """ assert environ self._db = None try: if 'MYSQL_DB_HOST' in environ: self._db = mdb.connect(environ['MYSQL_DB_HOST'], environ['MYSQL_DB_USERNAME'], environ['MYSQL_DB_PASSWORD'], 'odrs', use_unicode=True, charset='utf8') else: self._db = mdb.connect('localhost', 'test', 'test', 'odrs', use_unicode=True, charset='utf8') self._db.autocommit(True) except mdb.Error as e: print("Error %d: %s" % (e.args[0], e.args[1])) assert self._db
def review_modify(self, review): """ Modifies a review """ try: cur = self._db.cursor() cur.execute("UPDATE reviews SET version = %s, " "distro = %s, locale = %s, " "summary = %s, description = %s, " "user_display = %s, reported = %s, " "user_hash = %s, date_deleted = %s " "WHERE review_id = %s;", (review.version, review.distro, review.locale, review.summary, review.description, review.user_display, review.reported, review.user_hash, review.date_deleted or 0, review.review_id,)) except mdb.Error as e: raise CursorError(cur, e) return True
def review_add(self, review, user_addr): """ Add a review to the database """ try: cur = self._db.cursor() cur.execute("INSERT INTO reviews (app_id, locale, summary, " "description, user_hash, user_display, version, " "distro, rating, user_addr) " "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s);", (review.app_id, review.locale, review.summary, review.description, review.user_hash, review.user_display, review.version, review.distro, review.rating, user_addr,)) except mdb.Error as e: raise CursorError(cur, e)
def vote_add(self, review_id, val, user_hash): """ Votes on a specific review and add to the votes database """ try: cur = self._db.cursor() if val == -5: cur.execute("UPDATE reviews SET reported = reported + 1 " "WHERE review_id = %s;", (review_id,)) elif val == 1: cur.execute("UPDATE reviews SET karma_up = karma_up + 1 " "WHERE review_id = %s;", (review_id,)) elif val == -1: cur.execute("UPDATE reviews SET karma_down = karma_down + 1 " "WHERE review_id = %s;", (review_id,)) cur.execute("INSERT INTO votes (user_hash, review_id, val) " "VALUES (%s, %s, %s);", (user_hash, review_id, val,)) except mdb.Error as e: raise CursorError(cur, e)
def review_get_all(self): """ Gets all non-removed reviews from the server for all applications """ try: cur = self._db.cursor() cur.execute("SELECT review_id, date_created, app_id, locale, summary, " "description, version, distro, karma_up, karma_down, " "user_hash, user_display, rating, date_deleted, reported " "FROM reviews ORDER BY date_created DESC;") except mdb.Error as e: raise CursorError(cur, e) res = cur.fetchall() if not res: return [] reviews = [] for e in res: reviews.append(_create_review(e)) return reviews
def user_get_all(self): """ Get all the users on the system """ try: cur = self._db.cursor() cur.execute("SELECT user_id, date_created, " "user_hash, karma, is_banned " "FROM users ORDER BY user_id DESC;") except mdb.Error as e: raise CursorError(cur, e) res = cur.fetchall() if not res: return [] users = [] for e in res: users.append(_create_user(e)) return users
def get_users_by_karma(self, best=True): """ Returns interesting statistics for the webapp """ try: cur = self._db.cursor() if best: cur.execute("SELECT user_id, date_created, " "user_hash, karma, is_banned FROM users " "WHERE karma != 0 ORDER BY karma DESC LIMIT 10;") else: cur.execute("SELECT user_id, date_created, " "user_hash, karma, is_banned FROM users " "WHERE karma != 0 ORDER BY karma ASC LIMIT 10;") except mdb.Error as e: raise CursorError(cur, e) results = cur.fetchall() data = [] for res in results: data.append(_create_user(res)) return data
def user_update_karma(self, user_hash, val): """ Update the request time for a specific user ID """ # if not existing, create it user = self.user_get_by_hash(user_hash) if not user: self.user_add(user_hash) return # update the karma value try: cur = self._db.cursor() cur.execute("UPDATE users SET karma = karma + %s " "WHERE user_hash = %s;", (val, user_hash,)) except mdb.Error as e: raise CursorError(cur, e)
def get_stats_by_interval(self, size, interval, msg): """ Gets stats data """ cnt = [] now = datetime.date.today() # yes, there's probably a way to do this in one query cur = self._db.cursor() for i in range(size): start = now - datetime.timedelta((i * interval) + interval - 1) end = now - datetime.timedelta((i * interval) - 1) try: cur.execute("SELECT COUNT(*) FROM eventlog " "WHERE message = %s AND date_created BETWEEN %s " "AND %s", (msg, start, end,)) except mdb.Error as e: raise CursorError(cur, e) res = cur.fetchone() cnt.append(int(res[0])) return cnt
def get_analytics_by_interval(self, size, interval): """ Gets analytics data """ array = [] now = datetime.date.today() # yes, there's probably a way to do this in one query cur = self._db.cursor() for i in range(size): start = _get_datestr_from_dt(now - datetime.timedelta((i * interval) + interval - 1)) end = _get_datestr_from_dt(now - datetime.timedelta((i * interval) - 1)) try: cur.execute("SELECT fetch_cnt FROM analytics WHERE " "datestr BETWEEN %s " "AND %s", (start, end,)) except mdb.Error as e: raise CursorError(cur, e) res = cur.fetchall() # add all these up tmp = 0 for r in res: tmp = tmp + int(r[0]) array.append(tmp) return array
def mysql_query(host, user, password, port, charset, database, sql): try: connection = pymysql.connect(host=host, user=user, password=password, database=database, port=port, charset=charset, cursorclass=pymysql.cursors.DictCursor, connect_timeout=5) except pymysql.Error as e: # raise RuntimeError("Can't connect to MySQL server") print e.message or e.args sys.exit(1) try: with connection.cursor() as cursor: cursor.execute(sql) connection.commit() finally: connection.close() if cursor is not None: return cursor else: sys.exit(1)
def mdb_query(sql, host, port, user, passwd, db='', dictType = False): ''' ??mysql?????????? ''' conn = None rows = None results = None field_names = None # Connect to the cluster database try: conn = pymysql.connect(host=host, port=port, user=user, passwd=passwd, db=db, charset='utf8mb4') except pymysql.Error as e: print("Mysql Error %d: %s" % (e.args[0], e.args[1])) return None if dictType: try: with conn.cursor(pymysql.cursors.DictCursor) as cursor: rows = cursor.execute(sql) field_names = [i[0] for i in cursor.description] results = cursor.fetchall() except pymysql.Error as e: print("Mysql Error %d: %s" % (e.args[0], e.args[1])) finally: conn.close() else: try: with conn.cursor() as cursor: rows = cursor.execute(sql) field_names = [i[0] for i in cursor.description] _results = cursor.fetchall() results = list(map(list, _results)) except pymysql.Error as e: print("Mysql Error %d: %s" % (e.args[0], e.args[1])) finally: conn.close() return field_names, results
def Open(self, p_autocommit=True): try: self.v_con = sqlite3.connect(self.v_service, self.v_timeout) #self.v_con.row_factory = sqlite3.Row self.v_cur = self.v_con.cursor() if self.v_foreignkeys: self.v_cur.execute('PRAGMA foreign_keys = ON') self.v_start = True except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Query(self, p_sql, p_alltypesstr=False): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True self.v_cur.execute(p_sql) v_table = DataTable() if self.v_cur.description: for c in self.v_cur.description: v_table.Columns.append(c[0]) v_row = self.v_cur.fetchone() while v_row is not None: if p_alltypesstr: v_rowtmp = list(v_row) for j in range(0, len(v_table.Columns)): if v_rowtmp[j] != None: v_rowtmp[j] = str(v_rowtmp[j]) else: v_rowtmp[j] = '' v_row = tuple(v_rowtmp) v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row))) v_row = self.v_cur.fetchone() return v_table except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def Close(self): try: if self.v_con: self.v_con.commit() if self.v_cur: self.v_cur.close() self.v_cur = None self.v_con.close() self.v_con = None except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def GetFields(self, p_sql): try: v_keep = None if self.v_con is None: self.Open() v_keep = False else: v_keep = True v_fields = [] self.v_cur.execute('select * from ( ' + p_sql + ' ) t limit 1') r = self.v_cur.fetchone() if r != None: k = 0 for c in self.v_cur.description: v_fields.append(DataField(c[0], p_type=type(r[k]), p_dbtype=type(r[k]))) k = k + 1 else: k = 0 for c in self.v_cur.description: v_fields.append(DataField(c[0], p_type=type(None), p_dbtype=type(None))) k = k + 1 return v_fields except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc)) finally: if not v_keep: self.Close()
def Query(self, p_sql, p_alltypesstr=False): try: if self.v_con is None: raise Spartacus.Database.Exception('This method should be called in the middle of Open() and Close() calls.') else: self.v_cur.execute(p_sql) v_table = DataTable() if self.v_cur.description: for c in self.v_cur.description: v_table.Columns.append(c[0]) v_row = self.v_cur.fetchone() while v_row is not None: if p_alltypesstr: v_rowtmp = list(v_row) for j in range(0, len(v_table.Columns)): if v_rowtmp[j] != None: v_rowtmp[j] = str(v_rowtmp[j]) else: v_rowtmp[j] = '' v_row = tuple(v_rowtmp) v_table.Rows.append(OrderedDict(zip(v_table.Columns, v_row))) v_row = self.v_cur.fetchone() return v_table except Spartacus.Database.Exception as exc: raise exc except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))
def Cancel(self): try: if self.v_con: self.v_con.cancel() if self.v_cur: self.v_cur.close() self.v_cur = None self.v_con.close() self.v_con = None except sqlite3.Error as exc: raise Spartacus.Database.Exception(str(exc)) except Exception as exc: raise Spartacus.Database.Exception(str(exc))