我们从 Python 开源项目中提取了以下 31 个代码示例,用于说明如何使用 main.db。
def testPost(self):
    """Check POST /users before and after the user exists in ``tempUser``."""
    user_doc = copy.deepcopy(EXAMPLE_USER)
    school_doc = copy.deepcopy(EXAMPLE_SCHOOL)
    assert self.db.user.find_one(user_doc) is None

    payload = {"schoolID": school_doc["_id"], "userID": user_doc["_id"]}

    # No GitHub login has happened yet, so the request is expected to fail.
    rejected = self.app.post("/users", data=json.dumps(payload),
                             content_type="application/json")
    assert rejected.status_code == 400

    # Fake the GitHub login by seeding the temporary user and the school.
    self.db.tempUser.insert_one(user_doc)
    self.db.school.insert_one(school_doc)
    created = self.app.post("/users", data=json.dumps(payload),
                            content_type="application/json")
    assert created.status_code == 201

    body = json.loads(created.data.decode("utf-8"))
    assert body["schoolID"] == school_doc["_id"]
    # Drop the school reference before comparing against the example user.
    body.pop("schoolID")
    assert areDicsEqual(user_doc, body)
    assert self.db.user.find_one(user_doc) is not None
def add_sample_search(self):
    """Add a sample ``Search`` to the session, flush it, and return its id."""
    latitude, longitude = 41.7886, -87.5987
    search_radius = 1000
    crimes = DataSource(
        "Crimes 2001 - Present",
        "https://data.cityofchicago.org/resource/6zsd-86xi.json",
        [],
    )
    theft_filter = Filter("primary_type", "THEFT")
    # Two searches over the same source: one unfiltered, one with the filter.
    unfiltered = DataSearch(crimes, [])
    filtered = DataSearch(crimes, [theft_filter])
    sample = Search([unfiltered, filtered], latitude, longitude,
                    search_radius, "sample")

    session = main.db.session
    session.add(sample)
    # Flush so the id is populated without committing.
    session.flush()
    return sample.id

# Make sure that correctly constructed searches can retrieve the data
# from their data sources
def register():
    """Register a new user together with a default profile.

    Already logged-in visitors are sent to the index page.  On a valid
    form submission the user and a placeholder profile are stored and the
    success page is rendered; otherwise the registration form is shown.

    The user row and the profile row are written in a single transaction,
    and the profile is linked through the id of the object that was just
    added rather than through a second lookup by username.  A lookup with
    ``.first()`` could return a different, older account if usernames are
    not unique, and two separate commits could leave a user without a
    profile when the second one fails.
    """
    username = loggedIn(session, LoggedIn)
    if username != False:
        return render_template('index.html', username=username)

    form = RegisterForm()
    if not form.validate_on_submit():
        return render_template('register.html', form=form)

    # encrypt user's password
    hashedPwd = hashpw(str(request.form['password']).encode('utf-8'), gensalt())
    new_username = request.form['username']

    user = User(username=new_username, password=hashedPwd)
    db.session.add(user)
    # Flush to obtain the primary key without ending the transaction.
    db.session.flush()

    user_profile = Profile(user_id=user.id,
                           name="no-name",
                           surname="no-surname",
                           avatar="saitama-batman.jpg",
                           description="no-description",
                           skills="no-skills,")
    db.session.add(user_profile)
    # One commit saves both the user and the profile.
    db.session.commit()

    return render_template('registration_success.html', username=new_username)
def setUp(self):
    """Swap the application's database for the Mongo ``test`` database."""
    test_database = MongoClient().test
    main.db = test_database
    self.db = test_database
    self.app = main.app.test_client()
def testGetAll(self):
    """Check /users while the collection is empty and after one insert."""
    initial = self.app.get("/users")
    assert b'[]' in initial.data

    stored = copy.deepcopy(EXAMPLE_USER)
    self.db.user.insert_one(stored)

    listing = json.loads(self.app.get("/users").data.decode("utf-8"))
    first_listed = listing[0]
    assert areDicsEqual(stored, first_listed)
def testGet(self):
    """Check /users/<id> for an unknown id and for a stored user."""
    missing = self.app.get("/users/1")
    assert missing.status_code == 404

    stored = copy.deepcopy(EXAMPLE_USER)
    self.db.user.insert_one(stored)

    url = "/users/" + str(stored['_id'])
    fetched = json.loads(self.app.get(url).data.decode("utf-8"))
    assert areDicsEqual(stored, fetched)
def generateExampleEvent(db):
    """Insert a copy of the example user and return an event dict for it."""
    owner = copy.deepcopy(EXAMPLE_USER)
    db.user.insert_one(owner)
    event = {
        "userID": str(owner["_id"]),
        "title": "New submission",
        "description": "Pushed a submission with a score of 14",
    }
    return event
def testGet(self):
    """Check /events/<id> for an unknown id and for a stored event."""
    missing = self.app.get("/events/1")
    assert missing.status_code == 404

    stored = generateExampleEvent(self.db)
    self.db.event.insert_one(stored)

    url = "/events/" + str(stored['_id'])
    fetched = json.loads(self.app.get(url).data.decode("utf-8"))
    assert areDicsEqual(stored, fetched)
def testPost(self):
    """Check that POST /events stores the event and echoes it with an id."""
    event = generateExampleEvent(self.db)
    assert self.db.event.find_one(event) is None

    response = self.app.post("/events", data=json.dumps(event),
                             content_type="application/json")
    assert response.status_code == 201

    echoed = json.loads(response.data.decode("utf-8"))
    assert "_id" in echoed
    # Remove the id before comparing with the submitted event.
    echoed.pop("_id")
    assert areDicsEqual(event, echoed)
    assert self.db.event.find_one(event) is not None
def testGetAll(self):
    """Check /problems while the collection is empty and after one insert."""
    initial = self.app.get("/problems")
    assert b'[]' in initial.data

    stored = copy.deepcopy(EXAMPLE_PROBLEM)
    self.db.problem.insert_one(stored)

    listing = json.loads(self.app.get("/problems").data.decode("utf-8"))
    first_listed = listing[0]
    assert areDicsEqual(stored, first_listed)
def testGet(self):
    """Check /problems/<id> for an unknown id and for a stored problem."""
    missing = self.app.get("/problems/1")
    assert missing.status_code == 404

    stored = copy.deepcopy(EXAMPLE_PROBLEM)
    self.db.problem.insert_one(stored)

    url = "/problems/" + str(stored['_id'])
    fetched = json.loads(self.app.get(url).data.decode("utf-8"))
    assert areDicsEqual(stored, fetched)
def generateExampleEntry(db, exampleProblem=EXAMPLE_PROBLEM, exampleUser=EXAMPLE_USER):
    """Return an entry dict that references a stored problem and user.

    Copies of ``exampleProblem`` and ``exampleUser`` are inserted into
    ``db.problem`` / ``db.user`` unless a matching document is already
    there.  When a match exists, the stored document is used so that its
    ``_id`` is available even if the template itself carries no ``_id``.
    Previously that case raised ``KeyError`` on the ``["_id"]`` lookup.

    Args:
        db: database handle exposing ``user`` and ``problem`` collections.
        exampleProblem: template document for the problem (not mutated).
        exampleUser: template document for the user (not mutated).

    Returns:
        dict with ``problemID``, ``userID`` (both stringified ids) and
        ``score``.
    """
    exampleUser = copy.deepcopy(exampleUser)
    exampleProblem = copy.deepcopy(exampleProblem)

    storedUser = db.user.find_one(exampleUser)
    if storedUser is None:
        db.user.insert_one(exampleUser)
    else:
        exampleUser = storedUser

    storedProblem = db.problem.find_one(exampleProblem)
    if storedProblem is None:
        db.problem.insert_one(exampleProblem)
    else:
        exampleProblem = storedProblem

    return {"problemID": str(exampleProblem["_id"]),
            "userID": str(exampleUser["_id"]),
            "score": "12"}
def testGetProblem(self):
    """Check that /entries can be narrowed down with a ``problemID`` query."""
    assert b'[]' in self.app.get("/entries").data

    first_entry = generateExampleEntry(self.db)

    # Second entry belongs to a differently named problem.
    other_problem = copy.deepcopy(EXAMPLE_PROBLEM)
    other_problem["name"] = "Other Problem"
    second_entry = generateExampleEntry(self.db, exampleProblem=other_problem)

    for entry in (first_entry, second_entry):
        self.db.entry.insert_one(entry)

    response = self.app.get(
        "/entries", query_string={"problemID": first_entry["problemID"]})
    matches = json.loads(response.data.decode("utf-8"))
    assert areDicsEqual(first_entry, matches[0])
    assert len(matches) == 1
def testGet(self):
    """Check /entries/<id> for an unknown id and for a stored entry."""
    missing = self.app.get("/entries/1")
    assert missing.status_code == 404

    stored = generateExampleEntry(self.db)
    self.db.entry.insert_one(stored)

    url = "/entries/" + str(stored['_id'])
    fetched = json.loads(self.app.get(url).data.decode("utf-8"))
    assert areDicsEqual(stored, fetched)
def testGetAll(self):
    """Check /blogs while the collection is empty and after one insert."""
    initial = self.app.get("/blogs")
    assert b'[]' in initial.data

    stored = copy.deepcopy(EXAMPLE_BLOG)
    self.db.blog.insert_one(stored)

    listing = json.loads(self.app.get("/blogs").data.decode("utf-8"))
    first_listed = listing[0]
    assert areDicsEqual(stored, first_listed)
def testGet(self):
    """Check /blogs/<id> for an unknown id and for a stored blog."""
    missing = self.app.get("/blogs/1")
    assert missing.status_code == 404

    stored = copy.deepcopy(EXAMPLE_BLOG)
    self.db.blog.insert_one(stored)

    url = "/blogs/" + str(stored['_id'])
    fetched = json.loads(self.app.get(url).data.decode("utf-8"))
    assert areDicsEqual(stored, fetched)
def testGet(self):
    """Check /search for a query with no hits and for a stored user's name."""
    no_hits = self.app.get("/search",
                           query_string={"query": "thisshouldbeinnothing"})
    assert b'{"results": {}}' == no_hits.data

    stored = copy.deepcopy(EXAMPLE_USER)
    self.db.user.insert_one(stored)

    response = self.app.get("/search", query_string={"query": stored['name']})
    actual = json.loads(response.data.decode("utf-8"))

    user_hit = {"title": stored["name"],
                "url": "/users/?" + str(stored["_id"])}
    expected = {"results": {"user": {"name": "User", "results": [user_hit]}}}
    assert expected == actual
def make_shell_context():
    """Return the name-to-object mapping for the app, db and model classes."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Post': Post,
        'Tag': Tag,
        'Comment': Comment,
    }
def make_shell_context():
    """Return the name-to-object mapping for the app, db and model classes."""
    return {
        'app': app,
        'db': db,
        'User': User,
        'Post': Post,
        'Tag': Tag,
    }
def setUp(self):
    """Configure the app for testing and create the schema."""
    main.app.config.update({
        'TESTING': True,
        'SQLALCHEMY_DATABASE_URI': 'sqlite:////tmp/test_testing.db',
        'WTF_CSRF_ENABLED': False,
    })
    main.db.init_app(main.app)

    # NOTE(review): assumes create_all() and load_meta() both ran inside the
    # request context in the original layout; confirm against the real file.
    with main.app.test_request_context():
        main.db.create_all()
        main.load_meta()

    self.db = main.db
    self.app = main.app.test_client()
def tearDown(self):
    """Leave testing mode, release the session and drop every table."""
    main.app.config['TESTING'] = False
    database = self.db
    with main.app.test_request_context():
        database.session.remove()
        database.drop_all()
def del_course(id):
    """Delete one of the current user's courses and show their course list.

    Visitors who are not logged in get the login page.  The course is
    deleted only when it exists and belongs to the logged-in user; in
    every case the user's "my courses" page is rendered afterwards.

    Args:
        id: primary key of the course to delete.  The name shadows the
            builtin but is kept because it is part of the interface.

    Compared with the previous version this no longer crashes on an
    unknown course id, no longer lets a user delete somebody else's
    course, and tolerates a thumbnail file that is already gone.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        form = LoginForm()
        return render_template('login.html', form=form)

    user = User.query.filter_by(username=username).first()
    course = Course.query.filter_by(id=id).first()

    if course is not None and course.user_id == user.id:
        # delete thumbnail (skip silently if the file is already missing)
        target = path.join(APP_ROOT, 'static/courses/')
        file_path = path.join(target, course.thumbnail)
        if path.isfile(file_path):
            remove(file_path)

        # delete course
        db.session.delete(course)
        db.session.commit()

    # render my courses page
    courses = Course.query.filter_by(user_id=user.id).all()
    return render_template('my_courses.html', username=username, courses=courses)
def add_descr():
    """Append text to the logged-in user's profile description.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the add-description form, and the edit-profile
    page otherwise.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        return render_template('login.html', form=LoginForm())

    # All forms shown on the edit-profile page, keyed by template variable.
    page_forms = {
        'form': EditProfileForm(),
        'a_skills': AddSkillsForm(),
        'e_skills': EditSkillsForm(),
        'e_avatar': EditAvatarForm(),
        'e_descr': EditDescription(),
        'e_flname': EditFullNameForm(),
        'a_descr': AddDescription(),
    }

    if not page_forms['a_descr'].validate_on_submit():
        # render edit profile page
        return render_template('edit_profile.html', **page_forms)

    account = User.query.filter_by(username=username).first()
    user_profile = Profile.query.filter_by(user_id=account.id).first()

    # The placeholder description is replaced; anything else is extended.
    if user_profile.description == 'no-description':
        user_profile.description = request.form['extra_descr']
    else:
        user_profile.description = (
            user_profile.description + request.form['extra_descr'])
    db.session.commit()

    # render profile page
    skill_list = user_profile.skills.split(',')
    return render_template('profile.html', username=username,
                           user_profile=user_profile, user_skills=skill_list)
def edit_descr():
    """Replace the logged-in user's profile description.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the edit-description form, and the edit-profile
    page otherwise.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        return render_template('login.html', form=LoginForm())

    # All forms shown on the edit-profile page, keyed by template variable.
    page_forms = {
        'form': EditProfileForm(),
        'a_skills': AddSkillsForm(),
        'e_skills': EditSkillsForm(),
        'e_avatar': EditAvatarForm(),
        'a_descr': AddDescription(),
        'e_flname': EditFullNameForm(),
        'e_descr': EditDescription(),
    }

    if not page_forms['e_descr'].validate_on_submit():
        # render edit profile page
        return render_template('edit_profile.html', **page_forms)

    account = User.query.filter_by(username=username).first()
    user_profile = Profile.query.filter_by(user_id=account.id).first()

    # overwrite and save description
    user_profile.description = request.form['new_descr']
    db.session.commit()

    # render profile page
    skill_list = user_profile.skills.split(',')
    return render_template('profile.html', username=username,
                           user_profile=user_profile, user_skills=skill_list)
def edit_flname():
    """Change the name and surname on the logged-in user's profile.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the full-name form, and the edit-profile page
    otherwise.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        return render_template('login.html', form=LoginForm())

    # All forms shown on the edit-profile page, keyed by template variable.
    page_forms = {
        'form': EditProfileForm(),
        'a_skills': AddSkillsForm(),
        'e_skills': EditSkillsForm(),
        'e_avatar': EditAvatarForm(),
        'e_descr': EditDescription(),
        'a_descr': AddDescription(),
        'e_flname': EditFullNameForm(),
    }

    if not page_forms['e_flname'].validate_on_submit():
        # render edit profile page
        return render_template('edit_profile.html', **page_forms)

    account = User.query.filter_by(username=username).first()
    user_profile = Profile.query.filter_by(user_id=account.id).first()

    # change first name and surname
    user_profile.name = request.form['new_name']
    user_profile.surname = request.form['new_surname']
    db.session.commit()

    # render profile page
    skill_list = user_profile.skills.split(',')
    return render_template('profile.html', username=username,
                           user_profile=user_profile, user_skills=skill_list)
def add_skills():
    """Add skills to the logged-in user's profile.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the add-skills form, and the edit-profile page
    otherwise.

    The skills column holds a comma-separated string.  The registration
    view stores the placeholder as ``"no-skills,"`` (with a trailing
    comma), so the placeholder test accepts both that form and the bare
    ``"no-skills"``.  The previous check only matched the bare form and
    therefore never replaced the placeholder of a freshly registered user.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        form = LoginForm()
        return render_template('login.html', form=form)

    form = EditProfileForm()
    e_skills = EditSkillsForm()
    e_avatar = EditAvatarForm()
    e_flname = EditFullNameForm()
    e_descr = EditDescription()
    a_descr = AddDescription()
    a_skills = AddSkillsForm()

    if a_skills.validate_on_submit():
        user = User.query.filter_by(username=username).first()
        user_profile = Profile.query.filter_by(user_id=user.id).first()

        # change profile and save: the placeholder is replaced, real skills
        # are extended
        if user_profile.skills in ('no-skills', 'no-skills,'):
            user_profile.skills = request.form['new_skills']
        else:
            user_profile.skills += request.form['new_skills']
        db.session.commit()

        # render profile page
        user_skills = user_profile.skills.split(',')
        return render_template('profile.html', username=username,
                               user_profile=user_profile, user_skills=user_skills)

    # render edit profile page
    return render_template('edit_profile.html', form=form, a_skills=a_skills,
                           e_skills=e_skills, e_avatar=e_avatar,
                           e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def course():
    """Create a course with an uploaded thumbnail for the logged-in user.

    Renders the login page for anonymous visitors, the index page after a
    valid submission, and the new-course form otherwise.

    The thumbnail is stored under ``static/courses/`` with a random name
    plus ``.jpg``.  The uniqueness check compares the full stored file
    name (suffix included); the previous check compared the bare random
    id, which never matches a stored ``<id>.jpg`` value, so collisions
    went undetected.  The unused ``secure_filename`` result was dropped
    because the uploaded name is never used.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        form = LoginForm()
        return render_template('login.html', form=form)

    form = CourseForm()
    if not form.validate_on_submit():
        return render_template('new_course.html', form=form)

    user = User.query.filter_by(username=username).first()

    # save thumbnail under a file name no existing course uses
    thumbnail = form.thumbnail.data
    filename = str(randId()) + '.jpg'
    while Course.query.filter_by(thumbnail=filename).first():
        filename = str(randId()) + '.jpg'

    target = path.join(APP_ROOT, 'static/courses/')
    if not path.isdir(target):
        mkdir(target)
    thumbnail.save(path.join(target, filename))

    # save course
    new_course = Course(user_id=user.id,
                        title=request.form['title'],
                        description=request.form['description'],
                        thumbnail=filename,
                        required_skills=request.form['required_skills'])
    db.session.add(new_course)
    db.session.commit()

    return render_template('index.html', username=username)
def login():
    """Log a user in and remember the session under a random id.

    Already logged-in visitors get the index page.  A valid form with
    matching credentials creates a ``LoggedIn`` row, stores its random id
    in the session and renders the index page.  Bad credentials re-render
    the login page with an error; anything else shows the plain form.
    """
    username = loggedIn(session, LoggedIn)
    if username != False:
        return render_template('index.html', username=username)

    form = LoginForm()
    if not form.validate_on_submit():
        return render_template('login.html', form=form)

    pwd = request.form['password']
    existing_user = User.query.filter_by(username=request.form['username']).first()

    # Unknown user: same message as for a wrong password.
    if not existing_user:
        error = 'Username or password are incorrect.'
        return render_template('login.html', form=form, loggedInError=error)

    # Re-hash the submitted password with the stored hash as salt and
    # compare the result with the stored value.
    candidate_hash = hashpw(str(pwd).encode('utf-8'),
                            str(existing_user.password).encode('utf-8'))
    stored_hash = existing_user.password
    if candidate_hash != stored_hash:
        error = 'Username or password are incorrect.'
        return render_template('login.html', form=form, loggedInError=error)

    # Draw random ids until one is found that no LoggedIn row uses.
    rand_ID = str(randId())
    while LoggedIn.query.filter_by(rand_id=rand_ID).first():
        rand_ID = str(randId())

    userLoggedIn = LoggedIn(username=request.form['username'], rand_id=rand_ID)
    db.session.add(userLoggedIn)
    db.session.commit()
    session['user'] = rand_ID
    return render_template('index.html', username=userLoggedIn.username)
def edit_avatar():
    """Replace the logged-in user's avatar image.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the avatar form, and the edit-profile page
    otherwise.

    Changes from the previous version:
    * the uniqueness check compares the full stored file name (with
      ``.jpg``), because the bare random id never matches a stored value;
    * the new file is saved before the old one is removed, so a failed
      upload no longer leaves the profile without an image;
    * a missing old file no longer raises;
    * the unused ``secure_filename`` result was dropped.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        form = LoginForm()
        return render_template('login.html', form=form)

    form = EditProfileForm()
    a_skills = AddSkillsForm()
    e_skills = EditSkillsForm()
    e_flname = EditFullNameForm()
    e_descr = EditDescription()
    a_descr = AddDescription()
    e_avatar = EditAvatarForm()

    if e_avatar.validate_on_submit():
        user = User.query.filter_by(username=username).first()
        user_profile = Profile.query.filter_by(user_id=user.id).first()

        # update avatar: pick a file name no profile uses, ex: ekjenrfueorf.jpg
        new_avatar = e_avatar.new_avatar.data
        filename = str(randId()) + '.jpg'
        while Profile.query.filter_by(avatar=filename).first():
            filename = str(randId()) + '.jpg'

        target = path.join(APP_ROOT, 'static/avatars/')
        if not path.isdir(target):
            mkdir(target)
        old_path = path.join(target, user_profile.avatar)
        new_path = path.join(target, filename)

        new_avatar.save(new_path)
        # The shared default image is never deleted.
        if user_profile.avatar != "saitama-batman.jpg" and path.isfile(old_path):
            remove(old_path)

        user_profile.avatar = filename
        db.session.commit()

        # render profile page
        user_skills = user_profile.skills.split(',')
        return render_template('profile.html', username=username,
                               user_profile=user_profile, user_skills=user_skills)

    # render edit profile page
    return render_template('edit_profile.html', form=form, a_skills=a_skills,
                           e_skills=e_skills, e_avatar=e_avatar,
                           e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)
def update_profile():
    """Update every field of the logged-in user's profile, avatar included.

    Renders the login page for anonymous visitors, the profile page after
    a valid submission of the edit-profile form, and the edit-profile
    page otherwise.

    Changes from the previous version:
    * the uniqueness check compares the full stored file name (with
      ``.jpg``), because the bare random id never matches a stored value;
    * the new avatar is saved before the old one is removed, so a failed
      upload no longer leaves the profile without an image;
    * a missing old file no longer raises;
    * the unused ``secure_filename`` result was dropped.
    """
    username = loggedIn(session, LoggedIn)
    if username == False:
        form = LoginForm()
        return render_template('login.html', form=form)

    a_skills = AddSkillsForm()
    e_skills = EditSkillsForm()
    e_avatar = EditAvatarForm()
    e_flname = EditFullNameForm()
    e_descr = EditDescription()
    a_descr = AddDescription()
    form = EditProfileForm()

    if form.validate_on_submit():
        # find user by username and his/her profile by his/her id
        user = User.query.filter_by(username=username).first()
        user_profile = Profile.query.filter_by(user_id=user.id).first()

        # update current user's profile data
        user_profile.name = request.form['name']
        user_profile.surname = request.form['surname']
        user_profile.description = request.form['description']
        user_profile.skills = request.form['skills']

        # update avatar: pick a file name no profile uses
        avatar = form.avatar.data
        filename = str(randId()) + '.jpg'
        while Profile.query.filter_by(avatar=filename).first():
            filename = str(randId()) + '.jpg'

        # target = project's path + /static/avatars, created on first use
        target = path.join(APP_ROOT, 'static/avatars/')
        if not path.isdir(target):
            mkdir(target)
        old_path = path.join(target, user_profile.avatar)
        new_path = path.join(target, filename)

        # save the file in the target, then drop the previous image
        avatar.save(new_path)
        # The shared default image is never deleted.
        if user_profile.avatar != "saitama-batman.jpg" and path.isfile(old_path):
            remove(old_path)

        user_profile.avatar = filename
        # save changes in profile table
        db.session.commit()

        user_skills = user_profile.skills.split(',')
        return render_template('profile.html', username=username,
                               user_profile=user_profile, user_skills=user_skills)

    return render_template('edit_profile.html', form=form, a_skills=a_skills,
                           e_skills=e_skills, e_avatar=e_avatar,
                           e_flname=e_flname, e_descr=e_descr, a_descr=a_descr)