我们从Python开源项目中，提取了以下46个代码示例，用于说明如何使用utils.storage()。
def cookies(*requireds, **defaults):
    """Build a `storage` object from the cookies sent with the request.

    `requireds` and `defaults` are forwarded to `storify`; see `storify`
    for what they mean.

    The raw `HTTP_COOKIE` value from `ctx.env` is parsed by `parse_cookies`
    and the parsed result is cached on `ctx` as `_parsed_cookies`, so later
    calls reuse it instead of parsing again.

    Passing `_unicode=True` swaps that flag for `decode_cookie` before the
    call to `storify`.

    If `storify` raises `KeyError`, `badrequest()` is called and
    `StopIteration` is raised.
    """
    # Only the literal True is replaced by the cookie decoder.
    unicode_flag = defaults.get("_unicode")
    if unicode_flag is True:
        defaults['_unicode'] = decode_cookie

    # Parse the header once and remember the result on the context.
    already_parsed = '_parsed_cookies' in ctx
    if not already_parsed:
        raw_header = ctx.env.get("HTTP_COOKIE", "")
        ctx._parsed_cookies = parse_cookies(raw_header)

    try:
        return storify(ctx._parsed_cookies, *requireds, **defaults)
    except KeyError:
        badrequest()
        raise StopIteration
def background(func):
    """
    A function decorator to run a long-running function as a background thread.

    The returned wrapper starts `func(*a, **kw)` in a new `threading.Thread`,
    records that thread in `background.threaddb` under `id(thread)`, and
    returns `seeother(changequery(_t=id(thread)))` without waiting for
    `func` to finish.
    """
    def internal(*a, **kw):
        web.data() # cache it

        # Keep the calling thread's current context object aside and give the
        # calling thread a storage copy of it instead.
        tmpctx = web._context[threading.currentThread()]
        web._context[threading.currentThread()] = utils.storage(web.ctx.copy())

        def newfunc():
            # Runs inside the new thread: currentThread() is the worker here,
            # so the worker adopts the context object that was set aside.
            web._context[threading.currentThread()] = tmpctx
            func(*a, **kw)
            # After func returns, drop every key from the worker's context
            # except 'status', 'headers' and 'output'.
            myctx = web._context[threading.currentThread()]
            for k in myctx.keys():
                if k not in ['status', 'headers', 'output']:
                    try:
                        del myctx[k]
                    except KeyError:
                        pass

        t = threading.Thread(target=newfunc)
        # Register the thread by id; the same id is put into the `_t`
        # query argument of the redirect below.
        background.threaddb[id(t)] = t
        t.start()
        # Reset the headers on the calling thread's context before redirecting.
        web.ctx.headers = []
        return seeother(changequery(_t=id(t)))
    return internal
def query(self, sql_query, vars=None, processed=False, _test=False):
    """
    Run `sql_query` on a cursor from `self._db_cursor()` and return the result.

    Unless `processed` is true or `sql_query` is already an `SQLQuery`, the
    query is first passed through `reparam` together with `vars` (an empty
    dict when `vars` is None). With `_test=True` the prepared query is
    returned without being executed.

    When the cursor has a description, the return value is an `iterbetter`
    over `storage` rows keyed by column name, with `__len__` and `list`
    attributes attached; otherwise it is the cursor's `rowcount`. The
    context is committed afterwards unless `self.ctx.transactions` is truthy.

        >>> db = DB(None, {})
        >>> db.query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> db.query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> db.query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}

    needs_interpolation = not (processed or isinstance(sql_query, SQLQuery))
    if needs_interpolation:
        sql_query = reparam(sql_query, vars)

    if _test:
        return sql_query

    cursor = self._db_cursor()
    self._db_execute(cursor, sql_query)

    if cursor.description:
        columns = [column[0] for column in cursor.description]

        def as_record(values):
            # Pair each column name with the value at the same position.
            return storage(dict(zip(columns, values)))

        def records():
            # Fetch lazily, one row at a time, until a falsy row comes back.
            while True:
                values = cursor.fetchone()
                if not values:
                    break
                yield as_record(values)

        result = iterbetter(records())
        result.__len__ = lambda: int(cursor.rowcount)
        result.list = lambda: [as_record(values) for values in cursor.fetchall()]
    else:
        result = cursor.rowcount

    if not self.ctx.transactions:
        self.ctx.commit()
    return result
def input(*requireds, **defaults):
    """
    Return a `storage` object built from the request arguments.

    The optional `_method` keyword (default 'both') is removed from
    `defaults` and handed to `rawinput`; everything else goes to `storify`
    (see `storify` for how `requireds` and `defaults` work). `_unicode`
    is set to True unless the caller supplied it.

    Raises whatever `badrequest()` returns when `storify` raises `KeyError`.
    """
    method = defaults.pop('_method', 'both')
    raw = rawinput(method)
    try:
        # Unicode conversion is on by default; an explicit value wins.
        if '_unicode' not in defaults:
            defaults['_unicode'] = True
        return storify(raw, *requireds, **defaults)
    except KeyError:
        raise badrequest()
def cookies(*requireds, **defaults):
    """
    Return a `storage` object holding the request cookies.

    The `HTTP_COOKIE` value from `ctx.env` is loaded into a
    `Cookie.SimpleCookie`, converted with `storify` (see `storify` for how
    `requireds` and `defaults` work), and every truthy value is passed
    through `urllib.unquote`.

    If a `KeyError` is raised along the way, `badrequest()` is called and
    `StopIteration` is raised.
    """
    jar = Cookie.SimpleCookie()
    header = ctx.env.get('HTTP_COOKIE', '')
    jar.load(header)
    try:
        result = storify(jar, *requireds, **defaults)
        for name, raw in result.items():
            # Falsy values are stored back untouched; the rest are unquoted.
            if raw:
                result[name] = urllib.unquote(raw)
            else:
                result[name] = raw
        return result
    except KeyError:
        badrequest()
        raise StopIteration
def next(self):
    """
    Advance `self.tokens` by one token and return it as a `storage`.

    The token is expected to be a 5-item sequence whose fourth item is a
    (row, column) pair; the column is remembered in `self.index`. The
    returned `storage` has the keys `type`, `value`, `begin` and `end`.
    """
    kind, text, start, stop, _source_line = self.tokens.next()
    _row, column = stop
    self.index = column
    return storage(type=kind, value=text, begin=start, end=stop)
def __init__(self, app, store, initializer=None):
    """
    Set up the session object.

    Stores `store` and `initializer`, resets the last-cleanup time to 0,
    takes a `utils.storage` copy of `web.config.session_parameters` as the
    configuration and creates a `utils.threadeddict` for the session data.
    Item access on the session is forwarded to that data object. When
    `app` is truthy, `self._processor` is registered with it through
    `app.add_processor`.
    """
    self.store = store
    self._initializer = initializer
    self._last_cleanup_time = 0
    self._config = utils.storage(web.config.session_parameters)
    self._data = utils.threadeddict()

    # Expose the data object's item-access methods directly on the session.
    for dunder in ('__getitem__', '__setitem__', '__delitem__'):
        setattr(self, dunder, getattr(self._data, dunder))

    if app:
        app.add_processor(self._processor)
def __init__(self, root):
    """
    Remember `root` as the storage directory, creating it when missing.

    If nothing exists at `root`, the absolute form of the path is created
    with `os.makedirs` (intermediate directories included). `self.root`
    keeps the path exactly as it was passed in.
    """
    root_is_missing = not os.path.exists(root)
    if root_is_missing:
        absolute_root = os.path.abspath(root)
        os.makedirs(absolute_root)
    self.root = root
def _get_d(self):
    """Return a `utils.storage` mapping each input's name to its `get_value()`."""
    #@@ should really be form.attr, no?
    pairs = []
    for field in self.inputs:
        pairs.append((field.name, field.get_value()))
    return utils.storage(pairs)
def query(sql_query, vars=None, processed=False, _test=False):
    """
    Run `sql_query` on a cursor from `web.ctx.db_cursor()` and return the result.

    Unless `processed` is true or `sql_query` is already an `SQLQuery`, the
    query is first passed through `reparam` together with `vars` (an empty
    dict when `vars` is None). With `_test=True` the prepared query is
    returned without being executed.

    When the cursor has a description, the return value is an `iterbetter`
    over `storage` rows keyed by column name with a `list` attribute
    attached, plus a `__len__` attribute unless `web.ctx.db_name` is
    "sqlite"; otherwise it is the cursor's `rowcount`. The connection is
    committed afterwards unless `web.ctx.db_transaction` is truthy.

        >>> query("SELECT * FROM foo", _test=True)
        <sql: 'SELECT * FROM foo'>
        >>> query("SELECT * FROM foo WHERE x = $x", vars=dict(x='f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
        >>> query("SELECT * FROM foo WHERE x = " + sqlquote('f'), _test=True)
        <sql: "SELECT * FROM foo WHERE x = 'f'">
    """
    if vars is None:
        vars = {}

    needs_interpolation = not (processed or isinstance(sql_query, SQLQuery))
    if needs_interpolation:
        sql_query = reparam(sql_query, vars)

    if _test:
        return sql_query

    cursor = web.ctx.db_cursor()
    web.ctx.db_execute(cursor, sql_query)

    if cursor.description:
        columns = [column[0] for column in cursor.description]

        def as_record(values):
            # Pair each column name with the value at the same position.
            return storage(dict(zip(columns, values)))

        def records():
            # Fetch lazily, one row at a time, until a falsy row comes back.
            while True:
                values = cursor.fetchone()
                if not values:
                    break
                yield as_record(values)

        result = iterbetter(records())
        # The rowcount-based length is attached for every backend but sqlite.
        if web.ctx.db_name != "sqlite":
            result.__len__ = lambda: int(cursor.rowcount)
        result.list = lambda: [as_record(values) for values in cursor.fetchall()]
    else:
        result = cursor.rowcount

    if not web.ctx.db_transaction:
        web.ctx.db.commit()
    return result
def input(*requireds, **defaults):
    """
    Return a `storage` object with the GET and POST arguments.

    The optional `_method` keyword ('both' by default, or 'post' / 'get',
    case-insensitive) selects which arguments are read. POST data is only
    parsed when the request method is POST. Both sets are combined with
    `dictadd` and passed to `storify`; see `storify` for how `requireds`
    and `defaults` work.

    If `storify` raises `KeyError`, `badrequest()` is called and
    `StopIteration` is raised.
    """
    from cStringIO import StringIO

    def to_dict(field_storage):
        converted = {}
        for key in field_storage.keys():
            converted[key] = field_storage[key]
        return converted

    method = defaults.pop('_method', 'both')
    env = ctx.env.copy()
    post_fields = get_fields = {}
    wanted = method.lower()

    if wanted in ('both', 'post') and env['REQUEST_METHOD'] == 'POST':
        body = StringIO(data())
        parsed = cgi.FieldStorage(fp=body, environ=env, keep_blank_values=1)
        post_fields = to_dict(parsed)

    if wanted in ('both', 'get'):
        # Parse the copied environ as a GET request to pick up the query string.
        env['REQUEST_METHOD'] = 'GET'
        get_fields = to_dict(cgi.FieldStorage(environ=env, keep_blank_values=1))

    merged = dictadd(get_fields, post_fields)
    try:
        return storify(merged, *requireds, **defaults)
    except KeyError:
        badrequest()
        raise StopIteration
def cookies(*requireds, **defaults):
    """
    Return a `storage` object holding the request cookies.

    The `HTTP_COOKIE` value from `ctx.env` is loaded into a
    `Cookie.SimpleCookie` and converted with `storify`; see `storify` for
    how `requireds` and `defaults` work.

    If `storify` raises `KeyError`, `badrequest()` is called and
    `StopIteration` is raised.
    """
    jar = Cookie.SimpleCookie()
    header = ctx.env.get('HTTP_COOKIE', '')
    jar.load(header)
    try:
        result = storify(jar, *requireds, **defaults)
    except KeyError:
        badrequest()
        raise StopIteration
    return result
def load():
    """
    Set up a fresh context for the current thread.

    A new `storage` is installed in `_context` for the current thread, the
    status is set to '200 OK' and the header list is emptied. When
    `config` has a truthy `db_parameters`, the `db` module is imported and
    `db.connect` is called with those parameters. Finally every callable
    in the `loadhooks` dictionary is run; add a function there to have it
    called at load time.
    """
    current = threading.currentThread()
    _context[current] = storage()

    ctx.status = '200 OK'
    ctx.headers = []

    db_configured = config.get('db_parameters')
    if db_configured:
        import db
        db.connect(**config.db_parameters)

    for hook in loadhooks.values():
        hook()
def _get_d(self):
    """Return a `utils.storage` mapping each input's name to its `value`."""
    #@@ should really be form.attr, no?
    pairs = []
    for field in self.inputs:
        pairs.append((field.name, field.value))
    return utils.storage(pairs)
def __init__(self, root):
    """
    Remember `root` as the storage directory, creating it when missing.

    Args:
        root: path of the directory that holds the stored data.

    Raises:
        OSError: if the directory cannot be created.
    """
    # If the storage root doesn't exist, create it. os.makedirs also creates
    # missing parent directories, which os.mkdir would refuse to do; the path
    # is made absolute first because makedirs can be confused by '..' parts.
    if not os.path.exists(root):
        os.makedirs(os.path.abspath(root))
    self.root = root
def rawinput(method=None):
    """
    Return a `storage` object with the GET and/or POST arguments.

    `method` (case-insensitive; 'both' when falsy) selects what is read:
    'both', 'post' or 'put' parse the request body when the request method
    is POST or PUT, and 'both' or 'get' parse the copied environ as a GET
    request. The two sets are combined with `dictadd`. Fields whose
    `filename` is None are reduced to their `value`; other fields are
    returned as they are, and lists are processed element by element.
    """
    method = method or "both"
    from cStringIO import StringIO

    def fields_to_dict(field_storage):
        # Workaround kept from the original: makes web.input work with
        # enctype='text/plain', where `list` can be None.
        if field_storage.list is None:
            field_storage.list = []
        converted = {}
        for key in field_storage.keys():
            converted[key] = field_storage[key]
        return converted

    env = ctx.env.copy()
    body_fields = query_fields = {}
    wanted = method.lower()

    if wanted in ('both', 'post', 'put') and env['REQUEST_METHOD'] in ('POST', 'PUT'):
        content_type = env.get('CONTENT_TYPE', '').lower()
        if content_type.startswith('multipart/'):
            # wsgi.input is handed straight to cgi.FieldStorage and cannot be
            # read a second time, so the parsed object is kept in ctx to
            # allow calling web.input more than once.
            parsed = ctx.get('_fieldstorage')
            if not parsed:
                stream = env['wsgi.input']
                parsed = cgi.FieldStorage(fp=stream, environ=env, keep_blank_values=1)
                ctx._fieldstorage = parsed
        else:
            stream = StringIO(data())
            parsed = cgi.FieldStorage(fp=stream, environ=env, keep_blank_values=1)
        body_fields = fields_to_dict(parsed)

    if wanted in ('both', 'get'):
        env['REQUEST_METHOD'] = 'GET'
        query_fields = fields_to_dict(cgi.FieldStorage(environ=env, keep_blank_values=1))

    def unwrap(field):
        if isinstance(field, list):
            return [unwrap(item) for item in field]
        if field.filename is None:
            return field.value
        return field

    pairs = []
    for key, field in dictadd(query_fields, body_fields).items():
        pairs.append((key, unwrap(field)))
    return storage(pairs)
def test():
    """
    Run the built-in self-checks for `Template`.

    Each callable in `tests` is invoked and its result compared with the
    expected text that follows it in the list. Pass `-v` on the command
    line to get one dot written to stderr per passing check.

    Raises:
        AssertionError: if a result differs from the expected value.
    """
    import sys
    verbose = '-v' in sys.argv

    def assertEqual(a, b):
        # `a` is the value obtained, `b` the value expected (see the message).
        if a == b:
            if verbose:
                sys.stderr.write('.')
                sys.stderr.flush()
        else:
            assert a == b, "\nexpected: %s\ngot: %s" % (repr(b), repr(a))

    from utils import storage, group
    t = Template

    # Flat list of alternating entries: a callable that renders a template,
    # followed by the text it is expected to produce. group(tests, 2) below
    # is expected to hand them out two at a time.
    # NOTE(review): whitespace inside these template strings is significant;
    # verify against upstream before editing any of them.
    tests = [
        lambda: t('1')(), '1\n',
        lambda: t('$def with ()\n1')(), '1\n',
        lambda: t('$def with (a)\n$a')(1), '1\n',
        lambda: t('$def with (a=0)\n$a')(1), '1\n',
        lambda: t('$def with (a=0)\n$a')(a=1), '1\n',
        lambda: t('$if 1: 1')(), '1\n',
        lambda: t('$if 1:\n 1')(), '1\n',
        lambda: t('$if 0: 0\n$elif 1: 1')(), '1\n',
        lambda: t('$if 0: 0\n$elif None: 0\n$else: 1')(), '1\n',
        lambda: t('$if (0 < 1) and (1 < 2): 1')(), '1\n',
        lambda: t('$for x in [1, 2, 3]: $x')(), '1\n2\n3\n',
        lambda: t('$for x in []: 0\n$else: 1')(), '1\n',
        lambda: t('$def with (a)\n$while a and a.pop(): 1')([1, 2, 3]), '1\n1\n1\n',
        lambda: t('$while 0: 0\n$else: 1')(), '1\n',
        lambda: t('$ a = 1\n$a')(), '1\n',
        lambda: t('$# 0')(), '',
        lambda: t('$def with (d)\n$for k, v in d.iteritems(): $k')({1: 1}), '1\n',
        lambda: t('$def with (a)\n$(a)')(1), '1\n',
        lambda: t('$def with (a)\n$a')(1), '1\n',
        lambda: t('$def with (a)\n$a.b')(storage(b=1)), '1\n',
        lambda: t('$def with (a)\n$a[0]')([1]), '1\n',
        lambda: t('${0 or 1}')(), '1\n',
        lambda: t('$ a = [1]\n$a[0]')(), '1\n',
        lambda: t('$ a = {1: 1}\n$a.keys()[0]')(), '1\n',
        lambda: t('$ a = []\n$if not a: 1')(), '1\n',
        lambda: t('$ a = {}\n$if not a: 1')(), '1\n',
        lambda: t('$ a = -1\n$a')(), '-1\n',
        lambda: t('$ a = "1"\n$a')(), '1\n',
        lambda: t('$if 1 is 1: 1')(), '1\n',
        lambda: t('$if not 0: 1')(), '1\n',
        lambda: t('$if 1:\n $if 1: 1')(), '1\n',
        lambda: t('$ a = 1\n$a')(), '1\n',
        lambda: t('$ a = 1.\n$a')(), '1.0\n',
        lambda: t('$({1: 1}.keys()[0])')(), '1\n',
        lambda: t('$for x in [1, 2, 3]:\n\t$x') (), ' 1\n 2\n 3\n',
        lambda: t('$def with (a)\n$:a')(1), '1\n',
    ]

    for func, value in group(tests, 2):
        assertEqual(func(), value)

    # A `$var` line is checked separately: the rendered result stringifies
    # to '' and exposes the variable as an attribute.
    j = Template("$var foo: bar")()
    assertEqual(str(j), '')
    assertEqual(j.foo, 'bar\n')
    if verbose:
        # Finish the line of progress dots.
        sys.stderr.write('\n')
def input(*requireds, **defaults):
    """
    Return a `storage` object with the GET and POST arguments.

    The optional `_method` keyword ('both' by default, case-insensitive)
    selects what is read: 'both', 'post' or 'put' parse the request body
    when the request method is POST or PUT, and 'both' or 'get' parse the
    copied environ as a GET request. The two sets are combined with
    `dictadd` and passed to `storify`, with `_unicode` set to True unless
    the caller supplied it. See `storify` for how `requireds` and
    `defaults` work.

    Raises whatever `badrequest()` returns when `storify` raises `KeyError`.
    """
    from cStringIO import StringIO

    def fields_to_dict(field_storage):
        # Workaround kept from the original: makes web.input work with
        # enctype='text/plain', where `list` can be None.
        if field_storage.list is None:
            field_storage.list = []
        converted = {}
        for key in field_storage.keys():
            converted[key] = field_storage[key]
        return converted

    method = defaults.pop('_method', 'both')
    env = ctx.env.copy()
    body_fields = query_fields = {}
    wanted = method.lower()

    if wanted in ('both', 'post', 'put') and env['REQUEST_METHOD'] in ('POST', 'PUT'):
        content_type = env.get('CONTENT_TYPE', '').lower()
        if content_type.startswith('multipart/'):
            # wsgi.input is handed straight to cgi.FieldStorage and cannot be
            # read a second time, so the parsed object is kept in ctx to
            # allow calling web.input more than once.
            parsed = ctx.get('_fieldstorage')
            if not parsed:
                stream = env['wsgi.input']
                parsed = cgi.FieldStorage(fp=stream, environ=env, keep_blank_values=1)
                ctx._fieldstorage = parsed
        else:
            stream = StringIO(data())
            parsed = cgi.FieldStorage(fp=stream, environ=env, keep_blank_values=1)
        body_fields = fields_to_dict(parsed)

    if wanted in ('both', 'get'):
        env['REQUEST_METHOD'] = 'GET'
        query_fields = fields_to_dict(cgi.FieldStorage(environ=env, keep_blank_values=1))

    merged = dictadd(query_fields, body_fields)
    try:
        # Unicode conversion is on by default; an explicit value wins.
        if '_unicode' not in defaults:
            defaults['_unicode'] = True
        return storify(merged, *requireds, **defaults)
    except KeyError:
        raise badrequest()