我们从Python开源项目中,提取了以下12个代码示例,用于说明如何使用utils.iterbetter()。
def query(self, sql_query, vars=None, processed=False, _test=False): """ Execute SQL query `sql_query` using dictionary `vars` to interpolate it. If `processed=True`, `vars` is a `reparam`-style list to use instead of interpolating. >>> db = DB(None, {}) >>> db.query("SELECT * FROM foo", _test=True) <sql: 'SELECT * FROM foo'> >>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> >>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> """ if vars is None: vars = {} if not processed and not isinstance(sql_query, SQLQuery): sql_query = reparam(sql_query, vars) if _test: return sql_query db_cursor = self._db_cursor() self._db_execute(db_cursor, sql_query) if db_cursor.description: names = [x[0] for x in db_cursor.description] def iterwrapper(): row = db_cursor.fetchone() while row: yield storage(dict(zip(names, row))) row = db_cursor.fetchone() out = iterbetter(iterwrapper()) out.__len__ = lambda: int(db_cursor.rowcount) out.list = lambda: [storage(dict(zip(names, x))) \ for x in db_cursor.fetchall()] else: out = db_cursor.rowcount if not self.ctx.transactions: self.ctx.commit() return out
def query(self, *a, **kw): out = DB.query(self, *a, **kw) if isinstance(out, iterbetter): del out.__len__ return out
def query(sql_query, vars=None, processed=False, _test=False): """ Execute SQL query `sql_query` using dictionary `vars` to interpolate it. If `processed=True`, `vars` is a `reparam`-style list to use instead of interpolating. >>> query("SELECT * FROM foo", _test=True) <sql: 'SELECT * FROM foo'> >>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> >>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True) <sql: "SELECT * FROM foo WHERE x = 'f'"> """ if vars is None: vars = {} if not processed and not isinstance(sql_query, SQLQuery): sql_query = reparam(sql_query, vars) if _test: return sql_query db_cursor = web.ctx.db_cursor() web.ctx.db_execute(db_cursor, sql_query) if db_cursor.description: names = [x[0] for x in db_cursor.description] def iterwrapper(): row = db_cursor.fetchone() while row: yield storage(dict(zip(names, row))) row = db_cursor.fetchone() out = iterbetter(iterwrapper()) if web.ctx.db_name != "sqlite": out.__len__ = lambda: int(db_cursor.rowcount) out.list = lambda: [storage(dict(zip(names, x))) \ for x in db_cursor.fetchall()] else: out = db_cursor.rowcount if not web.ctx.db_transaction: web.ctx.db.commit() return out
def query(self, *a, **kw): out = DB.query(self, *a, **kw) if isinstance(out, iterbetter): # rowcount is not provided by sqlite del out.__len__ return out # as with PostgresDB, the database is assumed to be in UTF-8. # This doesn't mean we turn byte-strings coming out of it into # Unicode objects, but we avoid trying to put Unicode objects into # it.