我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用models.User.findAll()。
def authenticate(*,email,passwd): if not email: raise APIValueError('email','Invalid email') if not passwd: raise APIValueError('passwd','Invalid passwd') users = await User.findAll('email=?',[email]) if len(users) == 0: raise APIValueError('email','Email not exist.') user = users[0] # check passwd sha1 = hashlib.sha1() sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd','passwd error') # authenticate ok set cookie r = web.Response() r.set_cookie(COOKIE_NAME,user2cookie(user,86400),max_age=86400,httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user,ensure_ascii=False).encode('utf-8') return r # ??
def index(*, page='1'): page_index = get_page_index(page) num = yield from Blog.findNumber('count(id)') page = Page(num, page_index) if num == 0: blogs = [] else: blogs = yield from Blog.findAll(orderBy='created_at desc', limit=(page.offset, page.limit)) # ????????????????????? # app.py?response_factory???handler.py?????????? return { '__template__': 'blogs.html', 'page': page, 'blogs': blogs } # day11?? # ????????
def get_blog(id, request): blog = yield from Blog.find(id) # ??id??????????? # ????????blog?????????????????????? comments = yield from Comment.findAll('blog_id=?', [id], orderBy='created_at desc') # ?????????html?? for c in comments: c.html_content = text2html(c.content) # blog??markdown????????html?? blog.html_content = markdown2.markdown(blog.content) return { '__template__': 'blog.html', 'blog': blog, '__user__':request.__user__, 'comments': comments } # day10??? # ???????
def authenticate(*, email, passwd): if not email: raise APIValueError('email', 'Invalid email.') if not passwd: raise APIValueError('passwd', 'Invalid password.') users = yield from User.findAll('email=?', [email]) if len(users) == 0: raise APIValueError('email', 'Email not exist.') user = users[0] # check passwd: sha1 = hashlib.sha1() sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd', 'Invalid password.') # authenticate ok, set cookie: r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r
def index(request,*,page='1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') page = Page(num,page_index) if page == 0: blogs = [] else: blogs = await Blog.findAll(orderBy = 'created_at desc',limit=(page.offset,page.limit)) return{ '__template__' : 'blogs.html', 'page' : page, 'blogs' : blogs, '__user__' : request.__user__ }
def get_blog(id,request): blog = await Blog.find(id) comments = await Comment.findAll('blog_id=?',[id],orderBy='created_at desc') for c in comments: c.html_content = text2html(c.content) blog.html_content = markdown2.markdown(blog.content) return { '__template__' : 'blog.html', 'blog' : blog, 'comments' : comments, '__user__' : request.__user__ } # ????
def api_comments(*,page='1'): page_index = get_page_index(page) num = await Comment.findNumber('count(id)') p = Page(num,page_index) if num == 0: return dict(page=p,comments=()) comments = await Comment.findAll(orderBy='created_at desc',limit=(p.offset,p.limit)) return dict(page=p,comments=comments)
def api_get_users(*,page='1'): page_index = get_page_index(page) num = await User.findNumber('count(id)') p = Page(num,page_index) if num ==0 : return dict(page=p,users=()) users = await User.findAll(orderBy='created_at desc',limit=(p.offset,p.limit)) for u in users: u.passwd = '******' return dict(page=p,users=users)
def api_blogs(*,page='1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') p = Page(num,page_index) if num == 0: return dict(page=0,blogs={}) blogs = await Blog.findAll(orderBy='created_at desc',limit=(p.offset,p.limit)) return dict(page=p,blogs=blogs)
def authenticate(*, email, passwd): # ??????????? # ???????????? if not email: raise APIValueError('email', 'Invalid email.') if not passwd: raise APIValueError('passwd', 'Invalid password.') # ???????email???list????? users = yield from User.findAll('email=?', [email]) # ??list???0?????????????????????? if len(users) == 0: raise APIValueError('email', 'Email not exist.') user = users[0] # ??????.???,?????????,???????list # ????: # ????????????????,???????? # ????????????????????,???????????????,????????? # ???????????:sha1 = hashlib.sha1((user.id+":"+passwd).encode("utf-8")) # ???????????????(?api_register_user),?????? sha1 = hashlib.sha1() sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd', 'Invalid password.') # ???????????cookie: # ????????????? r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r # ??????
def api_comments(*, page='1'): page_index = get_page_index(page) num = yield from Comment.findNumber('count(id)') # num????? p = Page(num, page_index) # ??Page????????? if num == 0: return dict(page=p, comments=()) # ???????????????app.py?response?????? # ??????0,?????????? # limit??select??????????,?????????,????????????? comments = yield from Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, comments=comments) # day14?? # API?????
def find(): await orm.create_pool(loop, user='root', password='password', db='awesome') all = await User.findAll() print(all) pk = await User.find('00149276202953187d8d3176f894f1fa82d9caa7d36775a000') print(pk) num = await User.findNumber('email') print(num) await orm.destory_pool()
def index(*, page='1'): page_index = get_page_index(page) num = yield from Blog.findNumber('count(id)') page = Page(num) if num == 0: blogs = [] else: blogs = yield from Blog.findAll(orderBy='created_at desc', limit=(page.offset, page.limit)) return { '__template__': 'blogs.html', 'page': page, 'blogs': blogs }
def get_blog(id): blog = yield from Blog.find(id) comments = yield from Comment.findAll('blog_id=?', [id], orderBy='created_at desc') for c in comments: c.html_content = text2html(c.content) blog.html_content = markdown2.markdown(blog.content) return { '__template__': 'blog.html', 'blog': blog, 'comments': comments }
def api_comments(*, page='1'): page_index = get_page_index(page) num = yield from Comment.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, comments=()) comments = yield from Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, comments=comments)
def api_get_users(*, page='1'): page_index = get_page_index(page) num = yield from User.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, users=()) users = yield from User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) for u in users: u.passwd = '******' return dict(page=p, users=users)
def api_blogs(*, page='1'): page_index = get_page_index(page) num = yield from Blog.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, blogs=()) blogs = yield from Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, blogs=blogs)
def cookie2user(cookie_str): ''' Parse cookie and load user if cookie is valid ''' if not cookie_str: return None try: L = cookie_str.split('-') if len(L) != 3: return None uid, expires, sha1 = L if int(expires) < time.time(): return None user = await User.find(uid) if user is None: return None s = '%s-%s-%s-%s' % (uid, user.passwd, expires, _COOKIE_KEY) if sha1 != hashlib.sha1(s.encode('utf-8')).hexdigest(): logging.info('invalid sha1') return None user.passwd = '******' return user except Exception as e: logging.exception(e) return None # @get('/') # async def index(request): # users = await User.findAll() # return { # '__template__': 'test.html', # 'users': users # }
def index(request): summary = 'Lorem ipsum dolor amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut lobore et dolore magna aliqua.' # blogs = [ # Blog(id='1', name='Test Blog', summary=summary, created_at=time.time()-120), # Blog(id='2', name='Something New', summary=summary, created_at=time.time()-3600), # Blog(id='3', name='Learn Swift', summary=summary, created_at=time.time()-7200) # ] blogs = await Blog.findAll() return { '__template__': 'blogs.html', 'blogs': blogs }
def get_blog(id): blog = await Blog.find(id) comments = await Comment.findAll('blog_id=?', [id], orderBy='created_at desc') for c in comments: c.html_content = text2html(c.content) blog.html_content = markdown2.markdown(blog.content) return { '__template__': 'blog.html', 'blog': blog, 'comments': comments }
def api_get_users(*, page='1'): page_index = get_page_index(page) num = await User.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, users=()) users = await User.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) for u in users: u.password = '******' return dict(page=p, users=users)
def api_register_user(*, email, name, passwd): if not name or not name.strip(): raise APIValueError('name', 'Invalid name.') if not email or not _RE_EMAIL.match(email): raise APIValueError('email', 'Invalid email.') if not passwd or not _RE_SHA1.match(passwd): raise APIValueError('passwd', 'Invalid password.') users = await User.findAll('email=?', [email]) if len(users) > 0: raise APIError('register: failed', 'email', 'Email is already in use.') uid = next_id() sha1_passwd = '%s:%s' % (uid, passwd) user = User( id=uid, name=name.strip(), email=email, passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(), image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest() ) await user.save() # make session cookie: r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r
def api_blogs(*, page='1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, blogs=()) blogs = await Blog.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, blogs=blogs)
def api_comments(*, page='1'): page_index = get_page_index(page) num = await Comment.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page=p, comments=()) comments = await Comment.findAll(orderBy='created_at desc', limit=(p.offset, p.limit)) return dict(page=p, comments=comments)
def index(*, page = '1'): page_index = get_page_index(page) num = await Blog.findNumber('count(id)') page = Page(num, page_index) if num == 0: blogs = [] else: blogs = await Blog.findAll(orderBy = 'created_at desc', limit = (page.offset, page.limit)) return { '__template__' : 'blogs.html', 'page' : page, 'blogs' : blogs }
def get_blog(id): ''' ??blog???blog??. ''' blog = await Blog.find(id) comments = await Comment.findAll('blog_id=?', [id], orderby='created_at desc') for c in comments: c.html_content = text2html(c.content) blog.html_content = markdown2.markdown(blog.content) return { '__template__' : 'blog.html', 'blog' : blog, 'comments' : comments }
def authenticate(*, email, passwd): ''' ???????????cookie, ?signin?????. ''' if not email: raise APIValueError('email', 'Invalid email') if not passwd: raise APIValueError('passwd', 'Invalid passwd.') users = await User.findAll('email=?', [email]) if len(users) == 0: raise APIValueError('email', 'Email not exist.') user = users[0] # check passwd: # sha1: create a SHA1 hash object sha1 = hashlib.sha1() # update: Update the hash object with the object arg,Repeated calls are equivalent to a single call with the concatenation of all the arguments sha1.update(user.id.encode('utf-8')) sha1.update(b':') sha1.update(passwd.encode('utf-8')) if user.passwd != sha1.hexdigest(): raise APIValueError('passwd', 'Invalid password.') # authenticate ok, set cookie: r = web.Response() r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age = 86400, httponly = True) user.passwd = '******' r.content_type = 'application/json' r.body = json.dumps(user, ensure_ascii = False).encode('utf-8') return r
def api_comments(*, page = '1'): ''' ??????, ????manage_comments.html. ''' page_index = get_page_index(page) num = await Comment.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page = p, comments = ()) comments = await Comment.findAll(orderBy = 'created_at desc', limit = (p.offset, p.limit)) return dict(page = p, comments = comments)
def api_get_users(*, page = '1'): ''' ?????????. ''' page_index = get_page_index(page) num = await User.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page = p, users = ()) users = await User.findAll(orderBy = 'created_at desc', limit = (p.offset, p.limit)) for u in users: u.passwd = '******' return dict(page = p, users = users)
def api_blogs(*, page = '1'): ''' ????????, ????manage_blogs.html. ''' page_index = get_page_index(page) num = await Blog.findNumber('count(id)') p = Page(num, page_index) if num == 0: return dict(page = p, blogs = ()) blogs = await Blog.findAll(orderby = 'created_at desc', limit = (p.offset, p.limit)) return dict(page = p, blogs = blogs)
def api_register_user(*, email, name, passwd): # ?????????????? # ???????? # ?????name if not name or not name.strip(): # s.strip(rm)??????s?????????????rm??????? # ??rm???????????? raise APIValueError('name') # ??email????????????? if not email or not _RE_EMAIL.match(email): raise APIValueError('email') # ??passwd???SHA1?????????? if not passwd or not _RE_SHA1.match(passwd): raise APIValueError('passwd') # ?????????????email users = yield from User.findAll('email=?', [email]) # users??????????????????email??????? if len(users) > 0: raise APIError('register:failed', 'email', 'Email is already in use.') # ???????email??????????? uid = next_id() # next_id?models????????????????????id?????????????? sha1_passwd = '%s:%s' % (uid, passwd) # ???id????? # ???????????????????? # unicode???????????????????utf8?? # hashlib.sha1()??????????sha1? # hash.hexdigest()???hash?????16???????? # ?????sha1?????????md5?? # Gravatar??????????????????????????? user = User(id=uid, name=name.strip(), email=email, passwd=hashlib.sha1(sha1_passwd.encode('utf-8')).hexdigest(), image='http://www.gravatar.com/avatar/%s?d=mm&s=120' % hashlib.md5(email.encode('utf-8')).hexdigest()) yield from user.save() # ???????????? # make session cookie: r = web.Response() # ?????????cookie(???????????????????????) # http???????????,?????????????????. # ??????????????Cookies?????,???????????????? # user2cookie????cookie????????? # max_age?cookie???????,????.??????,???????cookie.???????? # ?????????24?? r.set_cookie(COOKIE_NAME, user2cookie(user, 86400), max_age=86400, httponly=True) user.passwd = '******' # ??????????* # ??content_type???data_factory???????? r.content_type = 'application/json' # json.dump?????????json?? r.body = json.dumps(user, ensure_ascii=False).encode('utf-8') return r # API?????