我们从Python开源项目中,提取了以下38个代码示例,用于说明如何使用flask_script.prompt()。
def database(): print_topic('Database') print_info("Database is used to store presistant data.") print_info("By having it disabled the data will be store in run-time RAM for the\ session") enabled = None while enabled is None: enabled = prompt("Enable Database(Y/N)").upper() if 'Y' in enabled: enabled = True elif 'N' in enabled: enabled = False else: enabled = None NodeDefender.config.database.set_cfg(enabled = enabled) if enabled: config_database_engine() return True
def config_database_engine(): supported_databases = ['mysql', 'sqlite'] engine = None while engine is None: engine = prompt("Enter DB Engine(SQLITE, MySQL)").lower() if engine not in supported_databases: engine = None NodeDefender.config.database.set_cfg(engine = engine) if engine == 'mysql': config_database_host() config_database_user() if engine == 'sqlite': config_database_file() return True
def config_database_user(): username = None while not username: username = prompt('Enter Username') password = None while not password: password = prompt('Enter Password') db = None while not db: db = prompt("Enter DB Name/Number") NodeDefender.config.database.set_cfg(username = username,\ password = password,\ db = db) return True
def info(name): 'Show information about a Group' if name is None: name = prompt('Group Name') group = NodeDefender.db.group.get_sql(name) if group is None: print("Cant find group ", name) return print("ID: {}, Name: {}".format(group.id, group.name)) print("User Members:") for user in group.users: print("ID: {}, Mail: {}".format(user.id, user.email)) print("Nodes") for node in group.nodes: print("ID: {}, Name: {}".format(node.id, node.name))
def create(email, password): 'Create User Account' if email is None: email = prompt('Email') if password is None: password = prompt_pass('Password') try: NodeDefender.db.user.create(email) NodeDefender.db.user.set_password(email, password) NodeDefender.db.user.enable(email) except ValueError: print("User already present") return print("User {} Successfully added!".format(email))
def run(self): username = prompt("??????") with app.app_context(): g_db = db.session while True: old_user = models.User.query.filter_by(username=username).first() if old_user: print "?????" username = prompt("????????") else: break nickname = prompt("?????") while True: password = prompt_pass("?????") confirm_password = prompt_pass("??????") if password != confirm_password: print "????, ???????" continue break super_user = models.User(username, nickname, password, "", is_superuser=True) g_db.add(super_user) g_db.commit() print super_user.create_date.strftime("%Y%m%d%H%M%S") print "???????%s??" % username
def config_database_host(): server = None while not server: server = prompt('Enter Server Address') port = None while not port: port = prompt('Enter Server Port') NodeDefender.config.database.set_cfg(server = server,\ port = port) return True
def config_database_file(): filepath = None while not filepath: print("FilePath for SQLite Database, Enter leading slash(/) for\ absolute- path. Otherwise relative to your current folder.") filepath = prompt("Enter File Path") if filepath[0] == '/': filepath = filepath else: filepath = NodeDefender.config.basepath + '/' + filepath NodeDefender.config.database.set_cfg(filepath = filepath) return True
def config_logging_type(): loggtype = None while loggtype is None: loggtype = prompt("Enter Logging Type(Syslog/Local)").lower() if loggtype not in supported_loggtypes: loggtype = None NodeDefender.config.logging.set_cfg(TYPE = loggtype) if loggtype == 'local': config_logging_filepath() elif loggtype == 'syslog': config_logging_host() config_logging_level() return True
def config_logging_filepath(): filepath = None while not filepath: print_info("Enter filepath for loggingfile. Leading slah(/) for absolute-\ path. Otherwise relative to current directory") filepath = prompt("Please Filename") if filepath[0] == '/': filepath = filepath else: filepath = NodeDefender.config.basepath + '/' + filepath NodeDefender.config.logging.set_cfg(filepath = filepath)
def config_logging_host(): while not server: server = prompt('Enter Syslog IP') while not port: port = prompt('Enter Syslog Port') NodeDefender.config.set_cfg(server = server, port = port) return True
def config_logging_level(): level = None print_info("Logging Level can be: debug, info, warning, error, critical") while level is None: level = prompt("Debug level").lower() if level not in supported_levels: level = None NodeDefender.config.logging.set_cfg(level = level) return True
def config_mail_host(): host = None while host is None: host = prompt("Enter Server Address") port = None while port is None: port = prompt("Enter Server Port") NodeDefender.config.mail.set_cfg(server = host, port = port) return True
def config_mail_user(): tls = None while tls is None: tls = prompt("TLS Enabled(Y/N)?") if tls[0].upper() == 'Y': tls = True elif tls[0].upper() == 'N': tls = False else: tls = None ssl = None while ssl is None: ssl = prompt("SSL Enabled(Y/N)?") if ssl[0].upper() == 'Y': ssl = True elif ssl[0].upper() == 'N': ssl = False else: ssl = None username = None while username is None: username = prompt('Username') password = None while password is None: password = prompt('Password') NodeDefender.config.mail.set_cfg(tls = tls, ssl = ssl, username = username, password = password) return True
def redis(): print_topic("Redis") print_info("Redis is used to store temporary data(Current heat of sensor\ etc). With redis enabled it will store the data in Redis.\ Disabled will store in as a local class- object") enabled = None while enabled is None: enabled = prompt("Enable Redis(Y/N)").upper() if 'Y' in enabled: enabled = True elif 'N' in enabled: enabled = False else: enabled = None NodeDefender.config.redis.set_cfg(enabled = enabled) if not enabled: return True host = None while host is None: host = prompt("Enter Host Address") NodeDefender.config.redis.set_cfg(host = host) port = None while port is None: port = prompt("Enter Server Port") NodeDefender.config.redis.set_cfg(port = port) database = '' while not database: database = prompt("Enter Database") NodeDefender.config.redis.set_cfg(database = database) return True
def create(name): 'Create a Group' if name is None: name = prompt('Group Name') try: NodeDefender.db.group.create(name) except ValueError as e: print("Error: ", e) print("Group {} successfully added".format(name))
def create(host, port, username, password): 'Create Node and Assign to Group' if host is None: host = prompt('Host Address') if port is None: port = prompt('Port Number') ''' if username is None: username = prompt('Username(blank for None)') if not len(username): username = None if password is None: password = prompt('Password(blank for None)') if not len(password): password = None ''' try: NodeDefender.db.mqtt.create(host, port) except ValueError as e: print("Error: ", e) return print("MQTT {}:{} Successfully created".format(host, port))
def delete(host): 'Delete Node' if host is None: host = prompt('Host Address') try: NodeDefender.db.mqtt.delete(host) except LookupError as e: print("Error: ", e) return print("MQTT {} Successfully deleted".format(host))
def create(name, group, street, city): 'Create Node and Assign to Group' if name is None: name = prompt('Node Name') if group is None: group = prompt('Group Name') if street is None: street = prompt('Street') if city is None: city = prompt('City') try: location = NodeDefender.db.node.location(street, city) except LookupError as e: print("Error: ", e) return try: NodeDefender.db.node.create(name, group, location) except (LookupError, ValueError) as e: print("Error: ", e) return print("Node {} Successfully created".format(name))
def delete(name): 'Delete Node' if name is None: name = prompt('Node Name') try: node.Delete(name) except LookupError as e: print("Error: ", e) return print("Node {} Successfully deleted".format(name))
def delete(mac, index): 'Delete Sensor' if mac is None: mac = prompt('Mac') if index is None: index = prompt('Index') try: sensor.Delete(index, mac) except LookupError as e: print("Error: ", str(e)) return print("Successfully Deleted: ", sensor)
def info(mac, index): 'Info about a specific Sensor' if mac is None: mac = prompt('mac') if index is None: index = prompt('Index') s = sensor.Get(mac, index) if icpe is None: print("Unable to find iCPE {}".format(mac)) print('ID: {}, Name: {}'.format(s.id, s.name)) print('iCPE: {}, Mac: {}'.format(s.icpe.name, s.icpe.mac_address))
def technician(email): "List users that are member of a group" if email is None: email = prompt('Email of User') try: NodeDefender.db.user.set_role(email, 'technician') except AttributeError: return print("User {} not found".format(email)) return print("User {} Added as Technician".format(email))
def admin(email): "List users that are member of a group" if email is None: email = prompt('Email of User') try: NodeDefender.db.user.set_role(email, 'administrator') except AttributeError: return print("User {} not found".format(email)) return print("User {} Added as Administrator".format(email))
def superuser(email): "List users that are member of a group" if email is None: email = prompt('Email of User') try: NodeDefender.db.user.set_role(email, 'superuser') except AttributeError: return print("User {} not found".format(email)) return print("User {} Added as superuser".format(email))
def delete(email): "Deltes User" if email is None: email = prompt('Email') try: u = NodeDefender.db.user.delete(email) except LookupError as e: print("Error: ", e) print("User {} Successfully Deleted!".format(u.email))
def enable(email): "Enable User" if email is None: email = prompt('Email') try: user = NodeDefender.db.user.enable(email) except LookupError as e: print("Error: ", e) print("User {} Successfully Enabled!".format(user.email))
def disable(email): "Disable User" if email is None: email = prompt('Email') try: user = NodeDefender.db.user.disable(email) except LookupError as e: print("Error: ", e) print("User {} Successfully Locked!".format(user.email))
def groups(email): "List User Groups" if email is None: email = prompt('Email') for group in NodeDefender.db.user.groups(email): print("ID: {}, Name: {}".format(group.id, group.name))
def leave(email, group): 'Remove Role from User' if email is None: email = prompt('Email') if group is None: group = prompt('Group') try: NodeDefender.db.group.remove_user(group, email) except LookupError as e: print("Error: ", e) return print("User {}, Successfully removed from {}".format(email, group))
def delete(mac): 'Delete iCPE' if mac is None: mac = prompt('Mac') try: NodeDefender.db.icpe.delete(mac) except LookupError as e: print("Error: ", str(e)) return print("Successfully Deleted: ", mac)
def info(mac): 'Info about a specific iCPE' if mac is None: mac = prompt('Mac') icpe = NodeDefender.db.icpe.get_sql(mac) if icpe is None: print("Unable to find iCPE {}".format(mac)) print('ID: {}, MAC: {}'.format(icpe.id, icpe.mac_address)) print('Alias {}, Node: {}'.format(icpe.alias, icpe.node.name)) print('ZWave Sensors: ') for sensor in icpe.sensors: print('Alias: {}, Type: {}'.format(sensor.alias, sensor.type))
def run(self): email = prompt('Email Address') password = prompt_pass('Password') if password == prompt_pass('Confirm Password'): user = app.user_datastore.create_user(email=email, password=encrypt_password(password)) app.user_datastore.activate_user(user) db.session.commit() self.stdout.write('New User Created, <{id} : {email}>'.format(id=user.id, email=user.email)) else: self.stderr.write('Passwords did not match!')
def run(self): name = prompt('Role Name') desc = prompt('Role Description') role = Role(name=name, description=desc).save() self.stdout.write( 'New Role Created, <{id} : {name} : {desc}>'.format( id=role.id, name=role.name, desc=role.description ) )
def createapp(): path = prompt(Fore.BLUE + "Write the name of the blueprint") try: int(path) print (Fore.RED + "Name no valid") return except ValueError: pass folder = current_app.config['APP_FOLDER'] + path register_blueprint_str = "Blueprint('{0}', 'app.{0}', template_folder='templates')".format(path.lower()) if not os.path.exists(folder): # Scaffold new blueprint os.makedirs(folder) python_files = ["forms", "routes", "utils", "views", "__init__.py"] for i, file in enumerate(python_files): with open(os.path.join(folder, file + ".py"), 'w') as temp_file: if i != 4: if file is "routes": temp_file.write("from flask_via.routers.default import Pluggable\nfrom views import *\n") if file is "forms": temp_file.write("from wtforms import *\n") if file is "views": temp_file.write("from flask import jsonify, request, " "url_for, redirect, current_app, render_template, flash, make_response\n" "from flask.views import MethodView") else: os.makedirs(folder + "/template/" + path) # Register blueprint in app/route.py route_path = os.path.join(current_app.config['APP_FOLDER'], "routes.py") with open(route_path, "r") as old_routes: data = old_routes.readlines() data[-2] = data[-2] + " " + register_blueprint_str + ',\n' os.remove(os.path.join(current_app.config['APP_FOLDER'], "routes.py")) with open(route_path, 'w') as new_routes: new_routes.writelines(data) else: print (Fore.RED + "This path exist")
def createadmin(): username = prompt(Fore.BLUE + "Username") query_username = db.session.query(FinalUser).filter_by(username=username).first() email = prompt(Fore.BLUE + "Write Email") if re.match('^[_a-z0-9-]+(\.[_a-z0-9-]+)*@[a-z0-9-]+(\.[a-z0-9-]+)*(\.[a-z]{2,4})$', email) == None: print(Fore.RED + "Invalid email format") return query_email = db.session.query(FinalUser).filter_by(email=email).first() if query_username is None and query_email is None: password = prompt(Fore.BLUE + "Write password") repeat_password = prompt(Fore.BLUE + "Repeat password") if password == repeat_password: encrypted_password = utils.encrypt_password(password) user_datastore.create_user(username=username, password=encrypted_password, email=email) db.session.commit() user_datastore.add_role_to_user(email, 'admin') db.session.commit() print(Fore.GREEN + "Admin created") else: print(Fore.RED + "The password does not match") return else: print(Fore.RED + "The username or email are in use") return
def general(): print_topic("General configuration") print_info("Server Name. If you are using a local running server please enter\ as format NAME:PORT. Otherwise it will be\ generating non- accessable URLs") print("/ Example: 127.0.0.1:5000") servername = None while servername is None: servername = prompt("Enter Server Name") NodeDefender.config.general.set_cfg(servername = servername) port = None while port is None: port = prompt("Which port should the server be running on") NodeDefender.config.general.set_cfg(port = port) print_info("Security Key is used to Encrypt Password etc.") key = None while key is None: key = prompt("Enter Secret Key") NodeDefender.config.general.set_cfg(secret_key = key) print_info("Salt is used to genereate URLS and more.") salt = None while salt is None: salt = prompt("Please enter Salt") NodeDefender.config.general.set_cfg(salt = salt) print_info("You can either have users register by themselfs on the\ authentication- page or via invite mail. Invite mail requires\ that you also enable mail- support so that NodeDefender can send\ invitation- mail and such. Superuser can still administrate\ users in the same way.") self_registration = None while self_registration is None: self_registration = prompt("Enable self-registration(Y/N)").upper() if 'Y' in self_registration: self_registration = True elif 'N' in self_registration: self_registration = False else: self_registration = None NodeDefender.config.general.set_cfg(self_registration = self_registration) return True
def celery(): print_topic("Celery") print_info("Celery is used for concurrent operation. It will spawn multiple\ workes on multiple CPU cores and possibly even on multiple\ hosts, running as a cluster. Disabling Celery will make\ NodeDefender as a single process. Celery requires AMQP or Redis\ to communicate between workes") enabled = None while enabled is None: enabled = prompt("Enable Celery(Y/N)").upper() if 'Y' in enabled: enabled = True elif 'N' in enabled: enabled = False else: enabled = None NodeDefender.config.celery.set_cfg(enabled = enabled) if not enabled: return True broker = None while broker is None: broker = prompt("Enter Broker type(AMQP or Redis)").lower() if broker not in supported_brokers: broker = None NodeDefender.config.celery.set_cfg(broker = broker) server = None while server is None: server = prompt("Enter Server Address") NodeDefender.config.celery.set_cfg(server = server) port = None while port is None: port = prompt("Enter Server Port") NodeDefender.config.celery.set_cfg(port = port) database = '' while not database: database = prompt("Enter Database") NodeDefender.config.celery.set_cfg(database = database) return True