我们从Python开源项目中，提取了以下25个代码示例，用于说明如何使用model.Session()。
def create_user(self, username, password):
    """Create a user in the local user database.

    Args:
        username: name of the user
        password: password of the user; it is passed through
            ``self.__hash_password`` before being stored

    Returns:
        True, if the user was created successfully
        False, if not
    """
    try:
        orm_session = model.Session()
        try:
            user = model.LocalUser(
                user_name=username,
                password_hash=self.__hash_password(password)
            )
            orm_session.add(user)
            orm_session.commit()
        finally:
            # Close the session on success and on failure alike so that a
            # failed insert does not leak it.
            orm_session.close()
    except Exception:
        # Keep the boolean contract for callers, but let SystemExit and
        # KeyboardInterrupt propagate (a bare ``except:`` would eat them).
        return False
    return True
def get_user(self, username):
    """Return the user object for the given username.

    Args:
        username: name of the user

    Returns:
        the user object or None, if the user was not found or the lookup
        failed
    """
    try:
        orm_session = model.Session()
        try:
            return orm_session.query(model.LocalUser).\
                filter(model.LocalUser.user_name == username).\
                first()
        finally:
            # Runs on the normal return as well as on errors, so the
            # session is never leaked.
            orm_session.close()
    except Exception:
        # Lookup errors are reported as "not found"; SystemExit and
        # KeyboardInterrupt are no longer swallowed.
        return None
def change_password(self, username, password):
    """Change the password of the given user.

    Args:
        username: name of the user
        password: new password of the user; it is passed through
            ``self.__hash_password`` before being stored

    Returns:
        True, if password change was successful
        False, if not (including when the user does not exist)
    """
    try:
        orm_session = model.Session()
        try:
            user = orm_session.query(model.LocalUser).\
                filter(model.LocalUser.user_name == username).\
                first()
            if user is None:
                # Explicit check instead of relying on an AttributeError
                # from assigning to None.
                return False
            user.password_hash = self.__hash_password(password)
            orm_session.commit()
        finally:
            # Always release the session, also on early return or error.
            orm_session.close()
    except Exception:
        # Preserve the boolean contract without swallowing SystemExit or
        # KeyboardInterrupt.
        return False
    return True
def delete_user(self, username):
    """Delete the given user.

    Args:
        username: name of the user

    Returns:
        True, if the user was deleted successfully
        False, if no user with that name exists

    Exceptions raised by the query, delete or commit propagate to the
    caller.
    """
    orm_session = model.Session()
    try:
        user = orm_session.query(model.LocalUser).\
            filter(model.LocalUser.user_name == username).\
            first()
        if user is None:
            return False
        orm_session.delete(user)
        orm_session.commit()
        return True
    finally:
        # Previously the session stayed open when the user was missing or
        # when delete/commit raised.
        orm_session.close()
def delete_source(name):
    """Delete the source called *name* and report the outcome.

    The outcome is returned via json_error/json_result when json_check()
    is truthy; otherwise a message is flashed and the client is redirected
    to "/sources". An unknown source is reported with status 404.
    """
    db = model.Session()
    record = db.query(model.Source).filter(model.Source.source_name == name).first()

    # Guard clause: nothing to delete.
    if record is None:
        db.close()
        not_found = "Source " + name + " not found!"
        if json_check():
            return json_error(not_found, 404)
        flash(not_found, "alert-danger")
        return redirect("/sources")

    db.delete(record)
    db.commit()
    db.close()

    deleted = "Source " + name + " successfully deleted"
    if json_check():
        return json_result(deleted, 200)
    flash(deleted, "alert-success")
    return redirect("/sources")
def delete_target(name):
    """Delete the target called *name* and report the outcome.

    The outcome is returned via json_error/json_result when json_check()
    is truthy; otherwise a message is flashed and the client is redirected
    to "/targets". An unknown target is reported with status 404.
    """
    db = model.Session()
    record = db.query(model.Target).filter(model.Target.target_name == name).first()

    # Guard clause: nothing to delete.
    if record is None:
        db.close()
        not_found = "Target " + name + " not found!"
        if json_check():
            return json_error(not_found, 404)
        flash(not_found, "alert-danger")
        return redirect("/targets")

    db.delete(record)
    db.commit()
    db.close()

    deleted = "Target " + name + " successfully deleted"
    if json_check():
        return json_result(deleted, 200)
    flash(deleted, "alert-success")
    return redirect("/targets")
def add_rule():
    """Create a forwarding rule from the current request.

    The rule fields are read from the JSON body when one can be parsed
    (keys rule_match, rule_delay, rule_maxforwardings, rule_target) and
    from the form otherwise (keys rule, delay, maxforwardings, target).
    Responds with json_result when json_check() is truthy, else flashes a
    message and redirects to "/rules".
    """
    # Decide between JSON payload and form data.
    if request.get_json(silent=True) is None:
        form = request.form
        match = form['rule']
        delay = form['delay']
        max_forwardings = form['maxforwardings']
        target = form['target']
    else:
        body = request.json
        match = body["rule_match"]
        delay = body["rule_delay"]
        max_forwardings = body["rule_maxforwardings"]
        target = body["rule_target"]

    new_rule = model.ForwardingRule(
        rule_match=match,
        rule_delay=delay,
        rule_maxforwardings=max_forwardings,
        rule_target=target,
    )

    db = model.Session()
    db.add(new_rule)
    db.commit()
    db.close()

    if json_check():
        return json_result("Rule successfully added", 200)
    flash("Rule successfully added", "alert-success")
    return redirect("/rules")
def login(account_id):
    """Create a Session row for *account_id* and return it.

    The new session is committed first, then its account's ``dormant``
    flag is cleared in a second commit, and finally ``tasks.fetch_acc`` is
    queued for the account with routing key 'high'.
    """
    new_session = Session(account_id=account_id)
    db.session.add(new_session)
    db.session.commit()

    # NOTE(review): the account is reached through the session only after
    # the first commit; presumably the relationship is loaded then.
    new_session.account.dormant = False
    db.session.commit()

    fetch_signature = tasks.fetch_acc.s(account_id)
    fetch_signature.apply_async(routing_key='high')

    return new_session
def authenticate(self, username, password):
    """Check the given credentials against the local user database.

    Args:
        username: name of the user
        password: password to verify; it is hashed with
            ``self.__hash_password`` and compared against the stored hash

    Returns:
        True, if a user with this name and password hash exists
        False, if not
    """
    password_hash = self.__hash_password(password)
    orm_session = model.Session()
    try:
        user = orm_session.query(model.LocalUser).\
            filter(model.LocalUser.user_name == username).\
            filter(model.LocalUser.password_hash == password_hash).\
            first()
    finally:
        # The session was previously never closed, leaking one per login
        # attempt.
        orm_session.close()

    if user is not None:
        self._logger.info("LocalUserAuthenticationProvider: Login of user %s successful", username)
        return True
    self._logger.warn("LocalUserAuthenticationProvider: Login of user %s failed", username)
    return False
def index():
    """Render "index.html.tpl" with all sources and all forwarding rules."""
    db = model.Session()
    context = {
        "sources": db.query(model.Source).all(),
        "rules": db.query(model.ForwardingRule).all(),
    }
    db.close()
    return render_template("index.html.tpl", **context)
def get_source_list():
    """List all sources.

    Returns a JSON document with an "items" list of each source's
    dict_repr() when json_check() is truthy, otherwise the rendered
    "source_list.html.tpl" template.
    """
    db = model.Session()
    all_sources = db.query(model.Source).all()
    db.close()

    if not json_check():
        return render_template("source_list.html.tpl", sources=all_sources)
    return jsonify(items=[entry.dict_repr() for entry in all_sources])
def get_source(name):
    """Show the source called *name*.

    A found source is returned as JSON (its dict_repr()) when json_check()
    is truthy, otherwise rendered with "source_view.html.tpl". An unknown
    source yields a 404 json_error or a flashed message plus a redirect to
    "/sources".
    """
    db = model.Session()
    record = db.query(model.Source).filter(model.Source.source_name == name).first()
    db.close()

    if record is not None:
        if json_check():
            return jsonify(record.dict_repr())
        return render_template("source_view.html.tpl", source=record)

    missing = "Source " + name + " not found!"
    if json_check():
        return json_error(missing, 404)
    flash(missing, "alert-danger")
    return redirect("/sources")
def test_source(name):
    """Run a connection test against the source called *name*.

    The test is performed by receiver.OpennmsReceiver(...).test_connection().
    A result of 200 counts as success, -1 is reported as a connection
    error, and any other value is reported as a failed HTTP status. The
    message is always flashed; the response is a JSON object with
    "result_state" and "result_msg" when json_check() is truthy, otherwise
    a redirect to "/sources".
    """
    db = model.Session()
    record = db.query(model.Source).filter(model.Source.source_name == name).first()
    db.close()

    if record is None:
        missing = "Source " + name + " not found!"
        if json_check():
            return json_error(missing, 404)
        flash(missing, "alert-danger")
        return redirect("/sources")

    status = receiver.OpennmsReceiver(record).test_connection()

    if status == 200:
        state = "success"
        text = "Test of source " + name + " successful: HTTP/" + str(status)
        category = "alert-success"
    elif status == -1:
        state = "failed"
        text = "Test of source " + name + " failed: Error connecting to server"
        category = "alert-danger"
    else:
        state = "failed"
        text = "Test of source " + name + " failed: HTTP/" + str(status)
        category = "alert-danger"
    flash(text, category)

    outcome = {"result_state": state, "result_msg": text}
    if json_check():
        return jsonify(outcome)
    return redirect("/sources")
def edit_source(name):
    """Update url, user, password and filter of the source called *name*.

    Values come from the JSON body when one can be parsed (keys
    source_url, source_user, source_password, source_filter) and from the
    form otherwise (keys url, user, password, filter). A JSON update
    answers with json_result; a form update flashes a message and
    redirects to "/sources". An unknown source yields a 404 json_error or
    a flashed message plus redirect.
    """
    db = model.Session()
    record = db.query(model.Source).filter(model.Source.source_name == name).first()

    if record is None:
        db.close()
        missing = "Source " + name + " not found!"
        if json_check():
            return json_error(missing, 404)
        flash(missing, "alert-danger")
        return redirect("/sources")

    # Decide between JSON payload and form data.
    from_json = request.get_json(silent=True) is not None
    if from_json:
        body = request.json
        record.source_url = body["source_url"]
        record.source_user = body["source_user"]
        record.source_password = body["source_password"]
        record.source_filter = body["source_filter"]
    else:
        form = request.form
        record.source_url = form["url"]
        record.source_user = form["user"]
        record.source_password = form["password"]
        record.source_filter = form["filter"]

    db.commit()
    db.close()

    if from_json:
        return json_result("Source " + name + " successfully changed", 200)
    flash("Source " + name + " successfully changed", "alert-success")
    return redirect("/sources")
def get_target_list():
    """List all targets, with their target_parms loaded via joinedload.

    Returns a JSON document with an "items" list of each target's
    dict_repr() when json_check() is truthy, otherwise the rendered
    "target_list.html.tpl" template, which also receives the forwarder
    class names.
    """
    db = model.Session()
    query = db.query(model.Target).options(joinedload("target_parms"))
    all_targets = query.all()
    class_names = forwarder.Forwarder.get_forwarder_classnames()
    db.close()

    if not json_check():
        return render_template("target_list.html.tpl", targets=all_targets,
                               forwarder_classes=class_names)
    return jsonify(items=[entry.dict_repr() for entry in all_targets])
def get_target(name):
    """Show the target called *name*, with its target_parms loaded.

    A found target is returned as JSON (its dict_repr()) when json_check()
    is truthy, otherwise rendered with "target_view.html.tpl". An unknown
    target yields a 404 json_error or a flashed message plus a redirect to
    "/targets".
    """
    db = model.Session()
    query = db.query(model.Target).options(joinedload("target_parms"))
    record = query.filter(model.Target.target_name == name).first()
    db.close()

    if record is not None:
        if json_check():
            return jsonify(record.dict_repr())
        return render_template("target_view.html.tpl", target=record)

    missing = "Target " + name + " not found!"
    if json_check():
        return json_error(missing, 404)
    flash(missing, "alert-danger")
    return redirect("/targets")
def test_target(name):
    """Send a test message through the target called *name*.

    An optional "message" is taken from the JSON body and, if present,
    overridden by the form field of the same name; it stays None when
    neither is available. The forwarder is built with
    forwarder.Forwarder.create_forwarder and exercised with
    test_forwarder(message). An unknown target yields a 404 json_error or
    a flashed message plus a redirect to "/targets".
    """
    # The message parameter is optional, so a failed lookup is deliberately
    # ignored. Catch Exception rather than everything so that SystemExit
    # and KeyboardInterrupt are not swallowed.
    message = None
    try:
        message = request.json["message"]
    except Exception:
        pass
    try:
        message = request.form["message"]
    except Exception:
        pass

    orm_session = model.Session()
    try:
        target = orm_session.query(model.Target).\
            options(joinedload("target_parms")).\
            filter(model.Target.target_name == name).\
            first()
    finally:
        # Release the session even if the query raises.
        orm_session.close()

    if target is None:
        error_msg = "Target " + name + " not found!"
        if json_check():
            return json_error(error_msg, 404)
        flash(error_msg, "alert-danger")
        return redirect("/targets")

    forwarder_obj = forwarder.Forwarder.create_forwarder(target.target_name,
                                                         target.target_class,
                                                         target.target_parms)
    forwarder_obj.test_forwarder(message)

    result_msg = "Target " + name + " tested"
    if json_check():
        return json_result(result_msg, 200)
    flash(result_msg, "alert-success")
    return redirect("/targets")
def edit_target(name):
    """Update parameter values of the target called *name*.

    With a parseable JSON body, every entry of its "target_parms" mapping
    whose key equals an existing parameter_name replaces that parameter's
    value. Otherwise every form field except "action", "class" and "name"
    is applied the same way. Parameters that do not already exist on the
    target are ignored. An unknown target yields a 404 json_error or a
    flashed message plus a redirect to "/targets".
    """
    db = model.Session()
    record = db.query(model.Target).filter(model.Target.target_name == name).first()

    if record is None:
        db.close()
        missing = "Target " + name + " not found!"
        if json_check():
            return json_error(missing, 404)
        flash(missing, "alert-danger")
        return redirect("/targets")

    # Decide between JSON payload and form data.
    if request.get_json(silent=True) is not None:
        for parm_name in request.json["target_parms"]:
            for parm in record.target_parms:
                if parm.parameter_name == parm_name:
                    parm.parameter_value = request.json["target_parms"][parm_name]
    else:
        # These form fields are not target parameters.
        reserved = ("action", "class", "name")
        for field in request.form:
            if field in reserved:
                continue
            for parm in record.target_parms:
                if parm.parameter_name == field:
                    parm.parameter_value = request.form[field]

    db.commit()
    db.close()

    changed = "Target " + name + " successfully changed"
    if json_check():
        return json_result(changed, 200)
    flash(changed, "alert-success")
    return redirect("/targets")
def get_rule_list():
    """List all forwarding rules.

    Returns a JSON document with an "items" list of each rule's
    dict_repr() when json_check() is truthy, otherwise the rendered
    "rule_list.html.tpl" template, which also receives all targets.
    """
    db = model.Session()
    all_rules = db.query(model.ForwardingRule).all()
    all_targets = db.query(model.Target).all()
    db.close()

    if not json_check():
        return render_template("rule_list.html.tpl", rules=all_rules, targets=all_targets)
    return jsonify(items=[entry.dict_repr() for entry in all_rules])
def get_rule(rule_id):
    """Show the forwarding rule with the given id.

    A found rule is returned as JSON (its dict_repr()) when json_check()
    is truthy, otherwise rendered with "rule_view.html.tpl" together with
    all targets. An unknown rule yields a 404 json_error or a flashed
    message plus a redirect to "/rules".
    """
    orm_session = model.Session()
    try:
        targets = orm_session.query(model.Target).all()
        rule = orm_session.query(model.ForwardingRule).\
            filter(model.ForwardingRule.rule_id == rule_id).\
            first()

        if rule is None:
            # %-formatting works for both str and int ids; plain "+"
            # concatenation raises TypeError for an int.
            error_msg = "Rule %s not found!" % (rule_id,)
            if json_check():
                return json_error(error_msg, 404)
            flash(error_msg, "alert-danger")
            return redirect("/rules")

        if json_check():
            return jsonify(rule.dict_repr())
        return render_template("rule_view.html.tpl", rule=rule, targets=targets)
    finally:
        # The success path previously never closed the session. Closing
        # here happens after the response has been built.
        orm_session.close()
def edit_rule(rule_id):
    """Update target, match, delay and maxforwardings of a forwarding rule.

    Values come from the JSON body when one can be parsed (keys
    rule_target, rule_match, rule_delay, rule_maxforwardings) and from the
    form otherwise (keys target, match, delay, maxforwardings). An unknown
    rule yields a 404 json_error or a flashed message plus a redirect to
    "/rules".
    """
    orm_session = model.Session()
    try:
        rule = orm_session.query(model.ForwardingRule).\
            filter(model.ForwardingRule.rule_id == rule_id).\
            first()

        if rule is None:
            # %-formatting works for both str and int ids; plain "+"
            # concatenation raises TypeError for an int.
            error_msg = "Rule %s not found!" % (rule_id,)
            if json_check():
                return json_error(error_msg, 404)
            flash(error_msg, "alert-danger")
            return redirect("/rules")

        # check, if data are form data or json
        if request.get_json(silent=True) is not None:
            rule.rule_target = request.json["rule_target"]
            rule.rule_match = request.json["rule_match"]
            rule.rule_delay = request.json["rule_delay"]
            rule.rule_maxforwardings = request.json["rule_maxforwardings"]
        else:
            rule.rule_target = request.form["target"]
            rule.rule_match = request.form["match"]
            rule.rule_delay = request.form["delay"]
            rule.rule_maxforwardings = request.form["maxforwardings"]
        orm_session.commit()
    finally:
        # Also releases the session when a request key is missing or the
        # commit fails.
        orm_session.close()

    result_msg = "Rule successfully changed"
    if json_check():
        return json_result(result_msg, 200)
    flash(result_msg, "alert-success")
    return redirect("/rules")
def parse_xml(self, content):
    """Parse one XML document and append the resulting section to the course.

    Fills ``self.course.title_full`` / ``title_short`` from the
    ``CourseTitle`` / ``CourseCode`` elements when they are still empty,
    then builds one ``Session`` per ``Session`` element and appends a
    ``Section`` (titled by ``ItemTitle``) to ``self.course.sections``.

    A document that has ``Reference`` elements but no ``Session`` elements
    is skipped unless ``self.includerefs`` is set.

    Progress is reported on stdout with Python 2 print statements.
    """
    # Strip empty <b /> and <TableHead /> tags before parsing.
    content = content.replace('<b />', '').replace('<TableHead />', '')
    element = ElementTree.fromstring(content)
    # Course titles are only set once; later documents do not overwrite them.
    if not self.course.title_full:
        self.course.title_full = ElementTree.tostring(element.find('CourseTitle'), encoding='utf8', method='text')
    if not self.course.title_short:
        self.course.title_short = ElementTree.tostring(element.find('CourseCode'), encoding='utf8', method='text')
    section_title = ElementTree.tostring(element.find('ItemTitle'), encoding='utf8', method='xml')
    #section_title = re.sub('^([0-9]+\.?)+\s*', '', section_title)
    sessions = []
    session_count = len(element.findall('.//Session'))
    references_count = len(element.findall('.//Reference'))
    # References-only documents are excluded unless explicitly requested.
    if references_count > 0 and session_count == 0 and (not self.includerefs):
        print ' > References section. Excluding it...'
        return
    for i, session in enumerate(element.iter('Session'), start=1):
        progress = str(i * 100 / session_count) + '%'
        # '\r' plus the trailing comma rewrites the same console line on each pass.
        print '\r > Parsing Sessions (' + str(i) + '/' + str(session_count) + ' - ' + progress + ').',
        session_title = ElementTree.tostring(session.find('Title'), encoding='utf8', method='xml')
        #session_title = re.sub('^([0-9]+\.?)+\s*', '', session_title)
        # Drop the Title child so the serialized session body excludes it.
        session.remove(session.find('Title'))
        content = ElementTree.tostring(session, encoding='utf8', method='xml')
        sessions.append(Session(session_title, content))
    self.course.sections.append(Section(section_title, sessions))
    print 'Done.'
def download_images(self, content): element = ElementTree.fromstring(content) images_dir = os.path.join(self.output_path, 'temp', 'images') if not os.path.exists(images_dir): os.makedirs(images_dir) for i, session in enumerate(element.iter('item'), start=1): try: print ' * Session ' + str(i) + ":", description = ElementTree.tostring(session.find('description'), 'utf8', 'xml') # print description.text images_list = re.findall('http[s]?://[^\s]*\.(?:jpg|JPG|png|PNG|jpeg|JPEG)', description) if len(images_list) == 0: print 'No images to download.' continue else: print '\n > Getting images from RSS file content... ' for idx, image_url in enumerate(images_list): progress = str(idx * 100 / len(images_list)) + '%' print '\r > Downloading images (' + progress + ')', sys.stdout.flush() err, image = URLUtils.get(image_url) if not err: filename = image_url.split("/")[-1].replace(".small", "") f = open(os.path.join(images_dir, filename), "wb+") f.write(image) f.close() print '\r > Downloading images (100%). Done.' except AttributeError: return
def insert_db_bikes(values):
    """Insert scraped data from the Bikes API into the database.

    For every entry in *values* the station is looked up by
    ``data['number']``. A new ``UsageData`` row is attached to the station
    only when the entry's ``last_update`` is newer than the station's
    ``last_updated``, so that no duplicates are added. All new rows are
    committed together.

    Exceptions from the lookup, the comparison or the commit propagate to
    the caller; the session is closed in every case.
    """
    fields = ['status', 'bike_stands', 'available_bike_stands', 'available_bikes']
    session = Session()
    try:
        for data in values:
            station = session.query(Station).get(data['number'])
            # NOTE(review): last_update is divided by 1000 before
            # fromtimestamp, so it is presumably in milliseconds - confirm.
            if datetime.fromtimestamp(data['last_update']/1000) > station.last_updated:
                new_data = UsageData(**{field: data[field] for field in fields})
                new_data.dt_last_update = data['last_update']
                station.station_usage.append(new_data)
        session.commit()
    finally:
        # Previously the session stayed open whenever anything above raised.
        session.close()
def insert_db_weather(weather_values):
    """Insert scraped data from the Weather API into the database.

    Builds one ``Weather`` row from the nested *weather_values* mapping
    (only the first entry of its "weather" list is used), converting the
    "dt", "sunrise" and "sunset" values with ``datetime.fromtimestamp``,
    and commits it.

    A missing key raises KeyError; that and any commit error propagate to
    the caller. The session is closed in every case.
    """
    session = Session()
    try:
        new_data = Weather(coord_lon=weather_values["coord"]["lon"],
                           coord_lat=weather_values["coord"]["lat"],
                           weather_id=weather_values["weather"][0]["id"],
                           weather_main=weather_values["weather"][0]["main"],
                           weather_description=weather_values["weather"][0]["description"],
                           weather_icon=weather_values["weather"][0]["icon"],
                           base=weather_values["base"],
                           main_temp=weather_values["main"]["temp"],
                           main_pressure=weather_values["main"]["pressure"],
                           main_humidity=weather_values["main"]["humidity"],
                           main_temp_min=weather_values["main"]["temp_min"],
                           main_temp_max=weather_values["main"]["temp_max"],
                           visibility=weather_values["visibility"],
                           wind_speed=weather_values["wind"]["speed"],
                           wind_deg=weather_values["wind"]["deg"],
                           clouds_all=weather_values["clouds"]["all"],
                           dt=datetime.fromtimestamp(weather_values["dt"]),
                           sys_type=weather_values["sys"]["type"],
                           sys_id=weather_values["sys"]["id"],
                           sys_message=weather_values["sys"]["message"],
                           sys_country=weather_values["sys"]["country"],
                           sys_sunrise=datetime.fromtimestamp(weather_values["sys"]["sunrise"]),
                           sys_sunset=datetime.fromtimestamp(weather_values["sys"]["sunset"]),
                           city_id=weather_values["id"],
                           city_name=weather_values["name"],
                           cod=weather_values["cod"])
        session.add(new_data)
        session.commit()
    finally:
        # Previously a missing payload key or a failed commit left the
        # session open.
        session.close()


# Running the scraper