我们从 Python 开源项目中提取了以下 7 个代码示例,用于说明如何使用 cx_Oracle.LOB。
def _fetchall(self):
    """Fetch all remaining rows, reading LOB values when a CLOB column exists.

    If any column in ``self.cursor.description`` has the type code
    ``cx_Oracle.CLOB``, the cursor is iterated row by row and every value
    whose exact type is ``cx_Oracle.LOB`` is replaced by the result of its
    ``read()``; each row is returned as a tuple.  Otherwise the call is
    delegated unchanged to ``self.cursor.fetchall()``.
    """
    cursor_has_clob = False
    for column in self.cursor.description:
        # Index 1 of a description entry is compared against the CLOB type.
        if column[1] == cx_Oracle.CLOB:
            cursor_has_clob = True
            break

    if not cursor_has_clob:
        return self.cursor.fetchall()

    rows = []
    for record in self.cursor:
        values = []
        for value in record:
            # LOB values of a row are read before the next row is fetched.
            if type(value) == cx_Oracle.LOB:
                value = value.read()
            values.append(value)
        rows.append(tuple(values))
    return rows
def lastrowid(self, table):
    """Return the current value of the sequence attached to *table*.

    Runs ``SELECT <sequence>.currval FROM dual;`` through ``self.execute``,
    using ``table._sequence_name`` as the sequence, and converts the first
    column of the fetched row with ``long``.
    """
    statement = 'SELECT %s.currval FROM dual;' % table._sequence_name
    self.execute(statement)
    first_row = self.cursor.fetchone()
    return long(first_row[0])

#def parse_value(self, value, field_type, blob_decode=True):
#    if blob_decode and isinstance(value, cx_Oracle.LOB):
#        try:
#            value = value.read()
#        except self.driver.ProgrammingError:
#            # After a subsequent fetch the LOB value is not valid anymore
#            pass
#    return BaseAdapter.parse_value(self, value, field_type, blob_decode)
def __cover_clob_to_str(self, datas):
    """Replace LOB and datetime cells with their ``str()`` form, in place.

    Every row of *datas* is rebuilt as a new list in which values that are
    instances of ``cx_Oracle.LOB`` or ``datetime.datetime`` are passed
    through ``str``; all other values are kept as they are.  The rebuilt
    rows are stored back into *datas*, which is also returned.
    """
    for position, row in enumerate(datas):
        datas[position] = [
            str(cell)
            if isinstance(cell, (cx_Oracle.LOB, datetime.datetime))
            else cell
            for cell in row
        ]
    return datas
def lastrowid(self, table):
    """Look up the sequence ``currval`` for *table* and return it via ``long``.

    The sequence name is read from ``table._sequence_name`` and substituted
    into ``SELECT <name>.currval FROM dual;``, which is run with
    ``self.execute``.  The first column of the row returned by
    ``self.cursor.fetchone()`` is the result.
    """
    self.execute('SELECT %s.currval FROM dual;' % (table._sequence_name,))
    (current_value, *_rest) = self.cursor.fetchone()[:1] or (None,)
    return long(current_value) if current_value is not None or True else None

# def parse_value(self, value, field_type, blob_decode=True):
#     if blob_decode and isinstance(value, cx_Oracle.LOB):
#         try:
#             value = value.read()
#         except self.driver.ProgrammingError:
#             # After a subsequent fetch the LOB value is not valid anymore
#             pass
#     return BaseAdapter.parse_value(self, value, field_type, blob_decode)
def query(self, _sql):
    """Run *_sql* on the current connection and return the rows as lists.

    Raises ``Exception`` when ``self.connection`` is ``None``.  Before
    executing, an informational line containing the statement is printed.

    When ``self.db_type`` is ``DB_POSTGRESQL`` the statement is prepared with
    ``connection.prepare`` and called; otherwise a cursor is used.  In both
    cases ``self.field_names`` and ``self.field_types`` are updated from the
    result metadata (for the prepared statement only when ``column_names``
    is not ``None``).

    For ``DB_ORACLE``, if ``cx_Oracle.BLOB`` is among ``self.field_types``,
    every ``cx_Oracle.LOB`` cell is replaced by the result of its ``read()``.
    """
    if self.connection is None:
        raise Exception("Error: dal.query() is called without a database connection. Query:\n" + _sql)

    print("Info: dal.query() at "+ str(self) + "/" + str(self.connection) + " running the following SQL:\n" + _sql)

    if self.db_type == DB_POSTGRESQL:
        # py-postgres doesn't use the DB-API, as it doesn't work well-
        statement = self.connection.prepare(_sql)
        fetched = statement()
        if statement.column_names is not None:
            self.field_names = statement.column_names
            self.field_types = []
            self.field_types.extend(
                map(python_type_to_sql_type, statement.column_types)
            )
    else:
        cursor = self.connection.cursor()
        cursor.execute(_sql)
        self.field_names, self.field_types = parse_description(cursor.description, self.db_type)
        fetched = cursor.fetchall()

    # Untuple. TODO: This might need to be optimised, perhaps by working with the same array.
    rows = [list(record) for record in fetched]

    if self.db_type == DB_ORACLE:
        import cx_Oracle
        if cx_Oracle.BLOB in self.field_types:
            # Walk every cell and read the data behind each LOB value.
            for row in rows:
                for position, cell in enumerate(row):
                    if isinstance(cell, cx_Oracle.LOB):
                        row[position] = cell.read()

    return rows
def DBGetCursorData(self, sql, id_th):
    """Execute *sql* on a pooled connection and return all rows as lists.

    A connection is acquired from ``self.Voracle_connect``, the statement is
    executed, and every row is copied into a list in which ``cx_Oracle.LOB``
    values are replaced by the result of their ``read()``.

    Args:
        sql: Statement passed to ``cursor.execute``.
        id_th: When ``>= 0``, progress is reported through
            ``self.WinThUpdate(id_th, <TH_* status>)``; negative values
            disable the reporting.

    Returns:
        A list of rows (each a list) on success.  ``-1`` when an attempt
        failed and either ``self.DBpingconn`` returned non-zero or
        ``self.DBreconnect`` returned a negative value.
        NOTE: when the retry after a successful reconnect fails as well and
        the reconnect succeeds again, the loop ends and an empty list is
        returned.

    Raises:
        Whatever ``WinThUpdate``, ``DBpingconn`` or ``DBreconnect`` raise
        while a failure is being handled; the connection lock is released
        before the exception propagates.
    """
    result = []
    tryconn = 0
    while tryconn < 2:
        # Keep a reference to the pool the connection is taken from so the
        # cleanup below hands it back to that same object, whatever
        # DBreconnect() does to self.Voracle_connect (not visible here).
        pool = self.Voracle_connect
        connection = None
        cursor = None
        try:
            connection = pool.acquire()
            cursor = connection.cursor()
            if id_th >= 0:
                self.WinThUpdate(id_th, self.TH_SQL_RUNNING)
            cursor.execute(sql)
            # NOTE(review): arraysize is assigned after execute(); confirm
            # against the driver documentation whether it has to be set
            # before execute() to affect this query.
            cursor.arraysize = 1500
            if id_th >= 0:
                self.WinThUpdate(id_th, self.TH_ROW_FETCHING)
            for row in cursor:  # fetch rows
                result.append([
                    # read lob locator
                    value.read() if isinstance(value, cx_Oracle.LOB) else value
                    for value in row
                ])
            cursor.close()
            cursor = None
            pool.release(connection)
            connection = None
            break
        except Exception:
            result = []
            # Best-effort cleanup: the attempt failed, so closing/releasing
            # may fail too; those secondary errors are deliberately ignored
            # so the ping/reconnect handling below still runs.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception:
                    pass
            if connection is not None:
                try:
                    pool.release(connection)
                except Exception:
                    pass
            if id_th >= 0:
                self.WinThUpdate(id_th, self.TH_REQUEST_DB_CONN)
            self.lockdbconn.acquire()
            try:
                if self.DBpingconn(self.Voracle_connect) == 0:
                    # global connection handle status broken
                    if id_th >= 0:
                        self.WinThUpdate(id_th, self.TH_DB_RECONNECTING)
                    oconn = self.DBreconnect()
                    if oconn < 0:  # error on reconnecting
                        tryconn = 2  # exit try
                        result = -1
                    else:
                        tryconn += 1
                else:
                    tryconn = 2  # exit try
                    result = -1
            finally:
                # Always release, otherwise an exception above would leave
                # the lock held and block every later caller.
                self.lockdbconn.release()
    return result
def _fetchall(self):
    """Return every remaining row, with LOB values read if a CLOB column exists.

    Without a column of type code ``cx_Oracle.CLOB`` in
    ``self.cursor.description`` this is just ``self.cursor.fetchall()``.
    With one, the cursor is iterated and each row is returned as a tuple in
    which values of exact type ``cx_Oracle.LOB`` are replaced by the result
    of ``read()``.
    """
    description = self.cursor.description
    if not any(column[1] == cx_Oracle.CLOB for column in description):
        return self.cursor.fetchall()

    def materialize(value):
        # Only values whose type is exactly cx_Oracle.LOB are read.
        return value.read() if type(value) == cx_Oracle.LOB else value

    return [tuple(map(materialize, record)) for record in self.cursor]