我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.current_app._get_current_object()。
def send_email(subject, recipients, template, **kwargs):
    """Render an e-mail from a template pair and send it on a background thread.

    :param subject: Subject line of the message.
    :param recipients: A single address string or an iterable of addresses.
    :param template: Template path without extension; ``.txt`` and ``.html``
        are appended to render the plain-text and HTML bodies.
    :param kwargs: Context passed to both templates.  An optional
        ``attachments`` entry lists file names that are joined onto the
        ``APP_UPLOADS`` configuration path.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    if isinstance(recipients, str):
        # list("a@b.c") would explode the address into single characters.
        recipients = [recipients]
    elif not isinstance(recipients, list):
        recipients = list(recipients)

    msg = Message(subject,
                  reply_to=current_app.config['MAIL_DEFAULT_SENDER'],
                  recipients=recipients)
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    attachments = kwargs.get('attachments', [])
    mimes = MimeTypes()
    for filename in attachments:
        path_ = os.path.join(current_app.config['APP_UPLOADS'], filename)
        with current_app.open_resource(path_) as fp:
            guessed_type = mimes.guess_type(fp.name)[0]
            # guess_type() yields None for unknown extensions; fall back to
            # the generic binary type so the attachment always has one.
            content_type = guessed_type or 'application/octet-stream'
            msg.attach(path_, content_type, fp.read())

    # The worker thread has no application context, so hand it the concrete
    # application object instead of the context-bound proxy.
    app = current_app._get_current_object()
    t = Thread(target=send_async_email, args=[app, msg])
    t.start()
    return t
def send_mail(to, subject, template, **kwargs):
    """Build a message from a template pair and dispatch it on a worker thread.

    :param to: Recipient address; used as the only entry of ``recipients``.
    :param subject: Text appended to the ``MAIL_SUBJECT_PREFIX`` config value.
    :param template: Template name without extension (``.txt`` / ``.html``).
    :param kwargs: Context forwarded to ``render_template``.
    :return: The started ``Thread`` running ``send_async_email``.
    """
    app = current_app._get_current_object()
    settings = app.config
    full_subject = settings['MAIL_SUBJECT_PREFIX'] + '' + subject
    from_address = settings['MAIL_SENDER']

    message = Message(full_subject, sender=from_address, recipients=[to])
    message.body = render_template(template + '.txt', **kwargs)
    message.html = render_template(template + '.html', **kwargs)

    worker = Thread(target=send_async_email, args=[app, message])
    worker.start()
    return worker
def save(self):
    """Add this object to ``db_session``, commit, and emit ``model_saved``.

    On any failure the session is rolled back and the exception is
    re-raised; the session is closed in every case.

    :return: ``self``.
    """
    try:
        db_session.add(self)
        db_session.commit()
    except BaseException:
        # Same reach as a bare ``except``: undo pending work, then propagate.
        db_session.rollback()
        raise
    finally:
        db_session.close()

    sender = app._get_current_object()
    model_saved.send(sender)
    return self
def info():
    """Return geolocation data for an IP and user-agent details for the caller.

    The IP is read from the ``ip`` query argument.  When it is absent, the
    caller's own address is used (first ``X-Forwarded-For`` entry, else
    ``request.remote_addr``) and the caller's user-agent fields are included;
    otherwise the user-agent section carries an error status.

    :return: JSON response with ``ip``, ``ip_information`` and ``user_agent``.
    """
    app = current_app._get_current_object()
    key = app.config['IP_INFO_DB_KEY']
    ip = request.args.get('ip', None)

    if not ip:
        forwarded = request.headers.getlist("X-Forwarded-For")
        if forwarded:
            # One header line may hold "client, proxy1, proxy2"; the first
            # entry is the originating client address.
            ip = forwarded[0].split(',')[0].strip()
        else:
            ip = request.remote_addr
        user_agent = {'browser': request.user_agent.browser,
                      'language': request.user_agent.language,
                      'platform': request.user_agent.platform,
                      'string': request.user_agent.string,
                      'version': request.user_agent.version,
                      'status': 'success',
                      'message': 'localhost user agent information'}
    else:
        user_agent = {'status': 'error',
                      'message': 'not query localhost information'}

    # Pass the query through ``params`` so the user-supplied ``ip`` is
    # URL-encoded and cannot inject extra parameters; the timeout keeps a
    # stalled upstream from blocking this request indefinitely.
    response = requests.get('http://api.ipinfodb.com/v3/ip-city/',
                            params=[('key', key), ('ip', ip), ('format', 'json')],
                            timeout=10)
    ip_info = response.json()
    return jsonify(status='success',
                   data={'ip': ip, 'ip_information': ip_info, 'user_agent': user_agent})
def nslookup():
    """Resolve the ``domain`` query argument and look up geolocation for its IP.

    :return: JSON response with the resolved address (or an error text) under
        ``'DNS record'`` and the geolocation payload (``None`` when resolution
        failed) under ``'IP infomation'``; a 400 JSON error when ``domain`` is
        missing.
    """
    dom = request.args.get('domain', None)
    if not dom:
        return jsonify(status='error', data='needs domain parameter'), 400

    try:
        result = socket.gethostbyname(dom)
    except (socket.error, UnicodeError):
        # socket.error covers resolver failures; UnicodeError is raised for
        # names that cannot be IDNA-encoded.  Anything else should propagate.
        result = 'domain error, check your input'
        ip_info = None
    else:
        app = current_app._get_current_object()
        key = app.config['IP_INFO_DB_KEY']
        # ``params`` URL-encodes the values; the timeout bounds the wait on
        # the upstream service.
        ip_info = requests.get('http://api.ipinfodb.com/v3/ip-city/',
                               params=[('key', key), ('ip', result), ('format', 'json')],
                               timeout=10).json()

    # NOTE: the 'IP infomation' key spelling is part of the response contract.
    return jsonify(status='success',
                   data={'DNS record': result, 'IP infomation': ip_info})
def register_user(**kwargs):
    """Create a user, signal the registration and optionally send a welcome mail.

    ``kwargs['password']`` is replaced by the result of ``encrypt_password``
    before the remaining keyword arguments are passed to
    ``_datastore.create_user``.

    :param kwargs: User attributes; must contain ``password``.
    :return: The user object returned by the datastore.
    """
    kwargs['password'] = encrypt_password(kwargs['password'])
    new_user = _datastore.create_user(**kwargs)
    _datastore.commit()

    # Both stay ``None`` unless confirmation is enabled.
    link = token = None
    if _security.confirmable:
        link, token = generate_confirmation_link(new_user)
        flash_args = get_message('CONFIRM_REGISTRATION', email=new_user.email)
        do_flash(*flash_args)

    user_registered.send(app._get_current_object(),
                         user=new_user, confirm_token=token)

    if config_value('SEND_REGISTER_EMAIL'):
        send_mail(config_value('EMAIL_SUBJECT_REGISTER'), new_user.email,
                  'welcome', user=new_user, confirmation_link=link)

    return new_user
def _check_token():
    """Authenticate the current request from a token, if one is supplied.

    The token is looked up in the request headers, then the query arguments,
    then a non-list JSON body, with later sources taking precedence.

    :return: ``True`` when the token resolves to an authenticated user (the
        user is stored on the request context and ``identity_changed`` is
        sent); ``False`` otherwise.
    """
    header_name = _security.token_authentication_header
    arg_name = _security.token_authentication_key

    from_header = request.headers.get(header_name, None)
    token = request.args.get(arg_name, from_header)
    if request.get_json(silent=True) and not isinstance(request.json, list):
        token = request.json.get(arg_name, token)

    user = _security.login_manager.token_callback(token)
    if not (user and user.is_authenticated):
        return False

    app = current_app._get_current_object()
    _request_ctx_stack.top.user = user
    identity_changed.send(app, identity=Identity(user.id))
    return True
def nslookup():
    """Resolve the ``domain`` query argument and look up geolocation for its IP.

    :return: JSON response with the resolved address (or an error text) under
        ``'DNS record'`` and the geolocation payload (``None`` when resolution
        failed) under ``'IP infomation'``; a 400 JSON error when ``domain`` is
        missing.
    """
    dom = request.args.get('domain', None)
    if not dom:
        return jsonify(status='error', data='needs domain parameter'), 400

    ip_info = None
    try:
        result = socket.gethostbyname(dom)
        resolved = True
    except (socket.error, UnicodeError):
        # Resolver failures raise socket.error; names that cannot be
        # IDNA-encoded raise UnicodeError.  Other exceptions propagate.
        result = 'domain error, check your input'
        resolved = False

    if resolved:
        app = current_app._get_current_object()
        key = app.config['IP_INFO_DB_KEY']
        # NOTE(review): the reason for this fixed delay is not evident from
        # the code; it is preserved as-is.
        time.sleep(3)
        # ``params`` URL-encodes the values; the timeout bounds the wait on
        # the upstream service.
        ip_info = requests.get(
            'http://api.ipinfodb.com/v3/ip-city/',
            params=[('key', key), ('ip', result), ('format', 'json')],
            timeout=10).json()

    # NOTE: the 'IP infomation' key spelling is part of the response contract.
    return jsonify(status='success',
                   data={'DNS record': result, 'IP infomation': ip_info})
def send(self, message, envelope_from=None):
    """Validate ``message`` and send it over the connected host, if any.

    :param message: Message instance; must have recipients and a sender.
    :param envelope_from: Address used for the MAIL FROM command instead of
        ``message.sender`` when given.
    :raises BadHeaderError: If ``message.has_bad_headers()`` is true.
    """
    assert message.send_to, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    if self.host:
        mail_from = sanitize_address(envelope_from or message.sender)
        rcpt_to = list(sanitize_addresses(message.send_to))
        if PY3:
            payload = message.as_bytes()
        else:
            payload = message.as_string()
        self.host.sendmail(mail_from, rcpt_to, payload,
                           message.mail_options, message.rcpt_options)

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    if self.num_emails != self.mail.max_emails:
        return

    # Limit reached: reset the counter and reconnect when a host is in use.
    self.num_emails = 0
    if self.host:
        self.host.quit()
        self.host = self.configure_host()
def send_email(to, subject, template, **kwargs):
    """Render a templated e-mail and send it from a background thread.

    :param to: Single recipient address.
    :param subject: Text placed after ``CIRCULATE_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()
    conf = app.config
    title = conf['CIRCULATE_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    msg = Message(title, sender=conf['CIRCULATE_MAIL_SENDER'], recipients=[to])

    # Plain-text body first, then the HTML alternative.
    for attribute, suffix in (('body', '.txt'), ('html', '.html')):
        setattr(msg, attribute, render_template(template + suffix, **kwargs))

    background = Thread(target=send_async_email, args=[app, msg])
    background.start()
    return background
def send_email(to, subject, template, **kwargs):
    """Send a templated e-mail on a separate thread.

    :param to: Single recipient address.
    :param subject: Text placed after the ``MAIL_SUBJECT_PREFIX`` setting.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()

    prefixed_subject = app.config['MAIL_SUBJECT_PREFIX'] + ' ' + subject
    sender_address = app.config['MAIL_SENDER']
    msg = Message(prefixed_subject, sender=sender_address, recipients=[to])

    text_part = render_template(template + '.txt', **kwargs)
    msg.body = text_part
    html_part = render_template(template + '.html', **kwargs)
    msg.html = html_part

    sender_thread = Thread(target=send_async_email, args=[app, msg])
    sender_thread.start()
    return sender_thread
def __init__(self, thread_ID):
    """Initialise the thread and remember the application active right now.

    :param thread_ID: Identifier stored on the instance as ``thread_ID``.
    """
    self.thread_ID = thread_ID
    threading.Thread.__init__(self)

    # Keep the concrete application rather than the ``current_app`` proxy.
    concrete_app = current_app._get_current_object()
    self.app = concrete_app
def set_status(msg, status):
    """Emit ``status_changed`` and return a message with the callback's status.

    :param msg: Value stored under the ``'msg'`` key of the returned dict.
    :param status: Not read; it is replaced by ``get_status_callback()``.
    :return: ``(message, status)`` tuple.
    """
    message = dict(
        msg=msg,
        to_do='I do not know what to do, the signal will tell me',
    )

    sender = current_app._get_current_object()
    status_changed.send(sender, status="go_to_work")

    return message, get_status_callback()
def send_mails(recipients, cc, mail_title, mail_body):
    """Compose a message and send it synchronously through ``mail``.

    :param recipients: Value assigned to the message's ``recipients``.
    :param cc: Value assigned to the message's ``cc``.
    :param mail_title: Subject passed to ``Message``.
    :param mail_body: Plain-text body.
    """
    outgoing = Message(mail_title)
    outgoing.body = mail_body

    app = current_app._get_current_object()
    outgoing.sender = app.config['MAIL_USERNAME']

    outgoing.recipients = recipients
    outgoing.cc = cc
    mail.send(outgoing)
def send_email(to, subject, template, **kwargs):
    """Compose a templated e-mail and hand it to a sender thread.

    :param to: Single recipient address.
    :param subject: Text placed after ``FLASKY_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()
    subject_line = app.config['FLASKY_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    message_options = {
        'sender': app.config['FLASKY_MAIL_SENDER'],
        'recipients': [to],
    }
    msg = Message(subject_line, **message_options)
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    mailer = Thread(target=send_async_email, args=[app, msg])
    mailer.start()
    return mailer
def send(self, message):
    """Check ``message`` and send it through the connected host, if any.

    :param message: Message instance; must have recipients and a sender.
    :raises BadHeaderError: If ``message.has_bad_headers()`` is true.
    """
    assert message.recipients, "No recipients have been added"
    assert message.sender, (
        "The message does not specify a sender and a default sender "
        "has not been configured")

    if message.has_bad_headers():
        raise BadHeaderError

    if message.date is None:
        message.date = time.time()

    connection = self.host
    if connection:
        connection.sendmail(message.sender,
                            message.send_to,
                            message.as_string())

    email_dispatched.send(message, app=current_app._get_current_object())

    self.num_emails += 1
    limit_reached = self.num_emails == self.mail.max_emails
    if limit_reached:
        # Start a fresh count and reconnect when a host is in use.
        self.num_emails = 0
        if self.host:
            self.host.quit()
            self.host = self.configure_host()
def config(self):
    """Return the ``config`` of the object behind the ``app`` proxy."""
    application = app._get_current_object()
    return application.config
def super_user_role(self):
    """Return the ``SUPER_USER_ROLE`` setting of the current application."""
    settings = app._get_current_object().config
    return settings['SUPER_USER_ROLE']
def _dispatching_log(message, level=None):
    """Forward to ``_original_log`` only when there is no app or it is in debug.

    :param message: Passed through to ``_original_log``.
    :param level: Passed through to ``_original_log``.
    :return: The result of ``_original_log`` when called, otherwise ``None``.
    """
    app = current_app._get_current_object()
    if app is not None and not app.debug:
        # Application present and not in debug mode: drop the record.
        return None
    return _original_log(message, level)
def send_email(to, subject, template, **kwargs):
    """Send an e-mail through Celery or a thread, depending on configuration.

    The ``CELERY_INSTEAD_THREADING`` setting selects ``send_email_celery``
    (called with ``countdown=None``) over ``send_email_thread``.

    :param to: Recipient address.
    :param subject: Subject text.
    :param template: Template name without extension.
    :param kwargs: Template context.
    """
    app = current_app._get_current_object()
    use_celery = app.config['CELERY_INSTEAD_THREADING']

    if not use_celery:
        send_email_thread(to, subject, template, **kwargs)
        return

    send_email_celery(to, subject, template, countdown=None, **kwargs)
def send_email_thread(to, subject, template, **kwargs):
    """Render a templated e-mail and send it from a new thread.

    :param to: Single recipient address.
    :param subject: Text placed after ``APP_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()
    cfg = app.config

    full_subject = cfg['APP_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    msg = Message(full_subject, sender=cfg['APP_MAIL_SENDER'], recipients=[to])

    plain = render_template(template + '.txt', **kwargs)
    rich = render_template(template + '.html', **kwargs)
    msg.body, msg.html = plain, rich

    runner = Thread(target=send_async_email, args=[app, msg])
    runner.start()
    return runner
def send_email_celery(to, subject, template, countdown=None, **kwargs):
    """Render a templated e-mail and queue it via ``send_celery_async_email``.

    :param to: Single recipient address.
    :param subject: Text placed after ``APP_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param countdown: Forwarded unchanged to ``apply_async``.
    :param kwargs: Template context.
    """
    app = current_app._get_current_object()

    headline = app.config['APP_MAIL_SUBJECT_PREFIX'] + ' ' + subject
    from_address = app.config['APP_MAIL_SENDER']
    msg = Message(headline, sender=from_address, recipients=[to])

    for attribute, suffix in (('body', '.txt'), ('html', '.html')):
        setattr(msg, attribute, render_template(template + suffix, **kwargs))

    send_celery_async_email.apply_async(args=[msg], countdown=countdown)
def send_mail(to, subject, template, **kwargs):
    """Send a templated e-mail from a background thread.

    The sender is read from the ``MAIL_USERNAME`` environment variable.

    :param to: Single recipient address.
    :param subject: Subject line, used as given.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread`` running ``send_mail_async``.
    """
    app = current_app._get_current_object()

    sender_address = os.environ.get('MAIL_USERNAME')
    msg = Message(subject, sender=sender_address, recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    delivery = Thread(target=send_mail_async, args=[app, msg])
    delivery.start()
    return delivery
def get_engine():
    """Return the engine cached on the current application, creating it once.

    The engine is built from the ``DATABASE_URL`` setting and stored on the
    application object as ``_sa_engine``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_sa_engine'):
        return app._sa_engine

    database_url = get_config('DATABASE_URL')
    app._sa_engine = create_engine(database_url)
    return app._sa_engine
def get_metadata():
    """Return the ``MetaData`` cached on the current application.

    On first use a new ``MetaData`` is stored as ``_sa_meta`` and then bound
    to the engine from ``get_engine()``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_sa_meta'):
        return app._sa_meta

    # Store first, bind second, so the cache is populated before the engine
    # lookup runs.
    meta = MetaData()
    app._sa_meta = meta
    meta.bind = get_engine()
    return app._sa_meta
def get_model():
    """Return the ``Model`` cached on the current application, loading it once.

    The model is built from the file named by the ``MODEL_YAML`` setting
    (default ``model.yaml``) and stored on the application as ``_lg_model``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_lg_model'):
        return app._lg_model

    # Imported only on a cache miss, as in the original implementation.
    from memorious.model import Model

    yaml_path = get_config('MODEL_YAML', 'model.yaml')
    loaded = load_config_file(yaml_path)
    app._lg_model = Model(loaded)
    return app._lg_model
def get_es():
    """Return the Elasticsearch client cached on the current application.

    On first use a client for the ``ELASTICSEARCH_URL`` setting is created
    and stored on the application as ``_es_instance``.
    """
    app = current_app._get_current_object()
    if hasattr(app, '_es_instance'):
        return app._es_instance

    es_url = get_config('ELASTICSEARCH_URL')
    client_options = {'timeout': 240, 'sniff_on_start': True}
    app._es_instance = Elasticsearch(es_url, **client_options)
    return app._es_instance
def send_email(to, subject, template, **kwargs):
    """Build a templated e-mail and send it from a background thread.

    :param to: Single recipient address.
    :param subject: Text placed after ``YIAVE_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()
    options = app.config

    msg = Message(
        options['YIAVE_MAIL_SUBJECT_PREFIX'] + ' ' + subject,
        sender=options['YIAVE_MAIL_SENDER'],
        recipients=[to],
    )
    text_template = template + '.txt'
    msg.body = render_template(text_template, **kwargs)
    html_template = template + '.html'
    msg.html = render_template(html_template, **kwargs)

    task = Thread(target=send_async_email, args=[app, msg])
    task.start()
    return task
def before_first_request():
    """Launch a background thread that keeps pushing users found offline.

    Nothing is started when the ``TESTING`` setting is true.
    """
    def find_offline_users(app):
        # Runs forever inside an application context: fetch, push, release
        # the session, then wait five seconds before the next pass.
        with app.app_context():
            while True:
                for user in User.find_offline_users():
                    push_model(user)
                db.session.remove()
                time.sleep(5)

    if current_app.config['TESTING']:
        return

    target_app = current_app._get_current_object()
    thread = threading.Thread(target=find_offline_users, args=(target_app,))
    thread.start()
def send_email(to, subject, template, **kwargs):
    """Render a templated e-mail and dispatch it on a new thread.

    :param to: Single recipient address.
    :param subject: Text placed after ``SYSTEM_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()

    subject_prefix = app.config['SYSTEM_MAIL_SUBJECT_PREFIX']
    complete_subject = subject_prefix + ' ' + subject
    system_sender = app.config['SYSTEM_MAIL_SENDER']

    msg = Message(complete_subject, sender=system_sender, recipients=[to])
    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    dispatcher = Thread(target=send_async_email, args=[app, msg])
    dispatcher.start()
    return dispatcher
def setUp(self):
    """Prepare storage, the blogging engine, login routes and 20 sample posts.

    Posts 0-9 belong to ``testuser`` with tag ``hello``; posts 10-19 belong
    to ``newuser`` with tag ``world``.
    """
    FlaskBloggingTestCase.setUp(self)
    self._create_storage()
    self.app.config["BLOGGING_URL_PREFIX"] = "/blog"
    self.engine = self._create_blogging_engine()
    self.login_manager = LoginManager(self.app)

    @self.login_manager.user_loader
    @self.engine.user_loader
    def load_user(user_id):
        return TestUser(user_id)

    @self.app.route("/login/<username>/", methods=["POST"],
                    defaults={"blogger": 0})
    @self.app.route("/login/<username>/<int:blogger>/", methods=["POST"])
    def login(username, blogger):
        this_user = TestUser(username)
        login_user(this_user)
        if blogger:
            # Only bloggers get an identity announced.
            sender = current_app._get_current_object()
            identity_changed.send(sender, identity=Identity(username))
        return redirect("/")

    @self.app.route("/logout/")
    def logout():
        logout_user()
        sender = current_app._get_current_object()
        identity_changed.send(sender, identity=AnonymousIdentity())
        return redirect("/")

    for i in range(20):
        if i < 10:
            tags = ["hello"]
            user = "testuser"
        else:
            tags = ["world"]
            user = "newuser"
        self.storage.save_post(title="Sample Title%d" % i,
                               text="Sample Text%d" % i,
                               user_id=user,
                               tags=tags)
def login():
    """Log in the fixed ``testuser`` account and redirect to ``/blog``."""
    account = User("testuser")
    login_user(account)

    # Announce the new identity so role-dependent state is refreshed.
    sender = current_app._get_current_object()
    new_identity = Identity("testuser")
    identity_changed.send(sender, identity=new_identity)

    return redirect("/blog")
def logout():
    """Log the current user out and redirect to ``/``."""
    logout_user()

    # Announce the switch to an anonymous identity.
    sender = current_app._get_current_object()
    identity_changed.send(sender, identity=AnonymousIdentity())

    return redirect("/")
def logout():
    """Log the current user out and redirect to the ``index.render_feed`` view."""
    logout_user()

    # Tell Flask-Principal that the identity is now anonymous.
    anonymous = AnonymousIdentity()
    identity_changed.send(current_app._get_current_object(), identity=anonymous)

    destination = url_for('index.render_feed')
    return redirect(destination)
def _perform_login(self, user):
    """Log ``user`` in and announce the identity change.

    :param user: Passed through ``prepare_user`` before being logged in.
    """
    prepared = prepare_user(user)
    login_user(prepared)

    # Tell Flask-Principal which identity is now active.
    sender = current_app._get_current_object()
    identity_changed.send(sender, identity=Identity(prepared.id))
def send_email(to, subject, template, **kwargs):
    """Send a templated e-mail without blocking the caller.

    :param to: Single recipient address.
    :param subject: Text placed after ``FLASKY_MAIL_SUBJECT_PREFIX``.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()

    prefix = app.config['FLASKY_MAIL_SUBJECT_PREFIX']
    msg = Message(prefix + ' ' + subject,
                  sender=app.config['FLASKY_MAIL_SENDER'],
                  recipients=[to])

    # Plain-text body first, then the HTML alternative.
    for attribute, suffix in (('body', '.txt'), ('html', '.html')):
        setattr(msg, attribute, render_template(template + suffix, **kwargs))

    async_sender = Thread(target=send_async_email, args=[app, msg])
    async_sender.start()
    return async_sender
def send_email(to, subject, template, **kwargs):
    """Compose a templated e-mail and send it from a separate thread.

    :param to: Single recipient address.
    :param subject: Text placed after the ``MAIL_SUBJECT_PREFIX`` setting.
    :param template: Template name without the ``.txt`` / ``.html`` suffix.
    :param kwargs: Template context.
    :return: The started ``Thread``.
    """
    app = current_app._get_current_object()
    mail_settings = app.config

    headline = mail_settings['MAIL_SUBJECT_PREFIX'] + ' ' + subject
    envelope = dict(sender=mail_settings['MAIL_SENDER'], recipients=[to])
    msg = Message(headline, **envelope)

    msg.body = render_template(template + '.txt', **kwargs)
    msg.html = render_template(template + '.html', **kwargs)

    job = Thread(target=send_async_email, args=[app, msg])
    job.start()
    return job
def login():
    """Handle an active (user/pass) or passive login request.

    Credentials are read from the JSON body when present, otherwise from the
    request values.

    :return: The user as JSON on success; a 401 response when the user is
        unknown or the password is wrong; the result of ``passive_login()``
        when only ``passive`` is supplied; ``NO_CONTENT`` otherwise.
    """
    data = request.values
    if hasattr(request, "json") and request.json:
        data = request.json

    user_manager = octoprint.server.userManager

    if user_manager.enabled and "user" in data and "pass" in data:
        username = data["user"]
        password = data["pass"]
        remember = "remember" in data and data["remember"] in valid_boolean_trues

        # Drop any session that is still attached to this client.
        if "usersession.id" in session:
            _logout(current_user)

        user = user_manager.findUser(username)
        if user is not None and user_manager.checkPassword(username, password):
            if user_manager.enabled:
                user = user_manager.login_user(user)
                session["usersession.id"] = user.get_session()
                g.user = user
            login_user(user, remember=remember)
            identity_changed.send(current_app._get_current_object(),
                                  identity=Identity(user.get_id()))
            return jsonify(user.asDict())

        return make_response(("User unknown or password incorrect", 401, []))

    if "passive" in data:
        return passive_login()

    return NO_CONTENT