Python flask 模块,request() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request()

项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def launch_lti() -> t.Any:
    """Do a LTI launch.

    .. :quickref: LTI; Do a LTI Launch.
    """
    lti = {
        'params': CanvasLTI.create_from_request(flask.request).launch_params,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=1)
    }
    return flask.redirect(
        '{}/lti_launch/?inLTI=true&jwt={}'.format(
            app.config['EXTERNAL_URL'],
            urllib.parse.quote(
                jwt.encode(
                    lti, app.config['LTI_SECRET_KEY'], algorithm='HS512'
                ).decode('utf8')
            )
        )
    )
项目:PimuxBot    作者:Finn10111    | 项目源码 | 文件源码
def index():
    if request.args.get('code'):
        unlock_code = request.args.get('code')
        # unlock, new password
        re = s.query(RecoveryEmail).filter(RecoveryEmail.password_code==unlock_code).one_or_none()
        if re:
            jid = re.jid
            re.password_code = None
            s.merge(re)
            s.commit()
            # set new password and send email
            email_address = re.email
            password = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(10))
            p = subprocess.Popen(['/usr/bin/prosodyctl', 'passwd', jid], stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT)
            args = bytes("%s\n%s\n" % (password, password), encoding='utf8')
            p.communicate(args)
            sendMail(email_address, 'new password', password)
            content = render_template('success.html', message='password was sent')
        else:
            content = render_template('error.html', message='link invalid')
    else:
        content = render_template('index.html')
    return content
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:love    作者:Yelp    | 项目源码 | 文件源码
def api_send_loves():
    sender = request.form.get('sender')
    recipients = sanitize_recipients(request.form.get('recipient'))
    message = request.form.get('message')

    try:
        recipients = send_loves(recipients, message, sender_username=sender)
        recipients_display_str = ', '.join(recipients)
        link_url = create_love_link(recipients_display_str, message).url
        return make_response(
            u'Love sent to {}! Share: {}'.format(recipients_display_str, link_url),
            LOVE_CREATED_STATUS_CODE,
            {}
        )
    except TaintedLove as exc:
        return make_response(
            exc.user_message,
            LOVE_FAILED_STATUS_CODE if exc.is_error else LOVE_CREATED_STATUS_CODE,
            {}
        )
项目:love    作者:Yelp    | 项目源码 | 文件源码
def sent():
    link_id = request.args.get('link_id', None)
    recipients_str = request.args.get('recipients', None)
    message = request.args.get('message', None)

    if not link_id or not recipients_str or not message:
        return redirect(url_for('home'))

    recipients = sanitize_recipients(recipients_str)
    loved = [
        Employee.get_key_for_username(recipient).get()
        for recipient in recipients
    ]

    return render_template(
        'sent.html',
        current_time=datetime.utcnow(),
        current_user=Employee.get_current_employee(),
        message=message,
        loved=loved,
        url='{0}l/{1}'.format(config.APP_BASE_URL, link_id),
    )
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:zanph    作者:zanph    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def login():
    username = request.headers.get('username')
    password = request.headers.get('password')

    if username is None or password is None:
        raise InvalidRequest()

    user = UsersCollection().find_one({'username': username})
    if user is None:
        raise AuthFailed()

    is_valid = check_password_hash(user['password_hash'], password)
    if not is_valid:
        raise AuthFailed()

    return jsonify({'token': UserJWT.new(username, user['scope'])})
项目:PEBA    作者:dtag-dev-sec    | 项目源码 | 文件源码
def fixUrl(destinationPort, transport, url, peerType):
    """
        fixes the URL (original request string)
    """
    transportProtocol = ""
    if transport.lower() in "udp" or transport.lower() in "tcp":
        transportProtocol="/"+transport

    if ("honeytrap" in peerType):
        return "Attack on port " + str(destinationPort) + transportProtocol

    # prepared dionaea to output additional information in ticker
    if ("Dionaea" in peerType):
        return "Attack on port " + str(destinationPort)+ transportProtocol

    return url
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:myproject    作者:dengliangshi    | 项目源码 | 文件源码
def password_reset_request():
    """Request for reseting password when user forget his/her password.
    """
    if not current_user.is_anonymous:
        return redirect(url_for('.index'))
    form = PasswordResetRequestForm()
    if form.validate_on_submit():
        user = User.query.filter_by(email=form.email.data).first()
        if user:
            token = user.generate_reset_token()
            send_email(user.email, 'Reset Your Password',
                       'auth/email/reset_password',
                       user=user, token=token,
                       next=request.args.get('next'))
        flash('An email with instructions to reset your password has been '
              'sent to you.')
        return redirect(url_for('auth.login'))
    return render_template('auth/reset_password.html', form=form)
项目:wank.party    作者:Streetwalrus    | 项目源码 | 文件源码
def inject():
    return {
        'root': _cfg("protocol") + "://" + _cfg("domain"),
        'domain': _cfg("domain"),
        'protocol': _cfg("protocol"),
        'len': len,
        'any': any,
        'request': request,
        'locale': locale,
        'url_for': url_for,
        'file_link': file_link,
        'disown_link': disown_link,
        'user': current_user,
        'moe': random.choice(moe),
        'random': random,
        'owner': _cfg("owner"),
        'owner_email': _cfg("owner_email"),
        '_cfg': _cfg
    }
项目:CoinCord    作者:heyitswither    | 项目源码 | 文件源码
def make_embed(request, message="New Transaction"):
  """
  create an embed for logging transactions
  """
  fields = []
  fields.append({"name": "ID", "value": request['_id']})
  if request['type'] == "deposit":
    fields.append({"name": "Amount", "value": f"+{request['amount']}"})
  elif request['type'] == "withdrawl":
    fields.append({"name": "Amount", "value": f"-{request['amount']}"})
  fields.append({"name": "Reason", "value": request['reason']})
  fields.append({"name": "Balance", "value": request['user']['balance']})
  fields.append(
      {"name": "User", "value": f"{request['user']['name']}#{request['user']['discrim']} ({request['user']['_id']})"})
  fields.append(
      {"name": "Bot", "value": f"{request['bot']['name']}#{request['bot']['discrim']} ({request['bot']['_id']})"})
  embed = {"title": message, "fields": fields, "timestamp":datetime.now().isoformat()}
  test = requests.post('https://canary.discordapp.com/api/webhooks/338371642277494786/vG8DJjpXC-NEXB4ZISo1r7QQ0Ras_RaqZbuhzjYOklKu70l73PmumdUCgBruypPv3fQp', json={"embeds": [embed]})
项目:CoinCord    作者:heyitswither    | 项目源码 | 文件源码
def get_transactions(request):
  transactions = []
  if "type" in request:
    transactions.append(list(r.table('transactions').filter(
        r.row['type'] == request['type']).run(conn)))
  if "amount" in request:
    transactions.append(list(r.table('transactions').filter(
        r.row['amount'] == request['amount']).run(conn)))
  if "bot" in request:
    transactions.append(list(r.table('transactions').filter(
        r.row['bot'] == request['bot']).run(conn)))
  if "user" in request:
    transactions.append(list(r.table('transactions').filter(
        r.row['user'] == request['user']).run(conn)))
  if "reason" in request:
    transactions.append(list(r.table('transactions').filter(
        r.row['reason'] == request['reason']).run(conn)))
  if not "reason" or "user" or "bot" or "amount" or "type" in request:
    transactions.append(list(r.table('transactions').run(conn)))
  temp = []
  for i in transactions:
    if i not in temp:
      temp.append(i)
  transactions = temp
  return transactions[:request['limit']]
项目:CoinCord    作者:heyitswither    | 项目源码 | 文件源码
def create_token():
  """
  register a new bot, and return the token

  "bot": {
    "name": "Alpha Bot",
    "discrim": "4112",
    "_id": "331841835733614603"
  }
  """
  raw_request = flask.request
  request = flask.request.get_json()

  token = tokengenerator.URandomTokenGenerator().generate()
  bot = {"name": request['bot']['name'], "discrim": request['bot']['discrim'],
         "_id": request['bot']['id'], "owner": request['owner'], "token": token}
  r.table('bots').insert({"bot": bot})
  return flask.jsonify(bot)
项目:CoinCord    作者:heyitswither    | 项目源码 | 文件源码
def show_transactions():
  """
  Returns all transactions that match the given parameters
  (keys in parentheses are optional)
  {
    "limit": 20,
    ("type": "deposit",)
    ("amount": 200,)
    ("user": {
      "name": "Mr Boo Grande",
      "discrim": "6644",
      "_id": "209137943661641728"
    },)
    ("bot": {
      "name": "Alpha Bot",
      "discrim": "4112",
      "_id": "331841835733614603"
    },)
    ("reason": "casino")
  }
  """
  raw_request = flask.request
  request = flask.request.get_json()
  transactions = get_transactions(request)
  return flask.jsonify(transactions)
项目:CoinCord    作者:heyitswither    | 项目源码 | 文件源码
def fake_transaction():
  """
  create a fake transaction log (for testing purposes)

  {
    "_id": 90832,
    "type": "deposit",
    "amount": 200,
    "user": {
      "name": "Mr Boo Grande",
      "discrim": "6644",
      "_id": "209137943661641728"
    },
    "bot": {
      "name": "Alpha Bot",
      "discrim": "4112",
      "_id": "331841835733614603"
    },
    "reason": "casino"
  }
  """
  raw_request = flask.request
  request = flask.request.get_json()
  make_embed(request)
  return flask.jsonify(request)
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:tesismometro    作者:joapaspe    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def flask(body, headers):
    import flask

    path = '/hello/<account_id>/test'
    flask_app = flask.Flask('hello')

    @flask_app.route(path)
    def hello(account_id):
        request = flask.request
        user_agent = request.headers['User-Agent']  # NOQA
        limit = request.args.get('limit', '10')  # NOQA

        return flask.Response(body, headers=headers,
                              mimetype='text/plain')

    return flask_app
项目:deb-python-falcon    作者:openstack    | 项目源码 | 文件源码
def werkzeug(body, headers):
    import werkzeug.wrappers as werkzeug
    from werkzeug.routing import Map, Rule

    path = '/hello/<account_id>/test'
    url_map = Map([Rule(path, endpoint='hello')])

    @werkzeug.Request.application
    def hello(request):
        user_agent = request.headers['User-Agent']  # NOQA
        limit = request.args.get('limit', '10')  # NOQA
        adapter = url_map.bind_to_environ(request.environ)  # NOQA
        endpoint, values = adapter.match()  # NOQA
        aid = values['account_id']  # NOQA

        return werkzeug.Response(body, headers=headers,
                                 mimetype='text/plain')

    return hello
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:isni-reconcile    作者:cmh2166    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:raiden    作者:raiden-network    | 项目源码 | 文件源码
def _serve_webui(self, file_name='index.html'):  # pylint: disable=redefined-builtin
        try:
            assert file_name
            web3 = self.flask_app.config.get('WEB3_ENDPOINT')
            if web3 and 'config.' in file_name and file_name.endswith('.json'):
                host = request.headers.get('Host')
                if any(h in web3 for h in ('localhost', '127.0.0.1')) and host:
                    _, _port = split_endpoint(web3)
                    _host, _ = split_endpoint(host)
                    web3 = 'http://{}:{}'.format(_host, _port)
                response = jsonify({'raiden': self._api_prefix, 'web3': web3})
            else:
                response = send_from_directory(self.flask_app.config['WEBUI_PATH'], file_name)
        except (NotFound, AssertionError):
            response = send_from_directory(self.flask_app.config['WEBUI_PATH'], 'index.html')
        return response
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:fontman-server    作者:fontman    | 项目源码 | 文件源码
def find_fontface_by_font_id():
    response_data = []

    try:
        query_string = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(query_string)

        for fontface in fontfaces:
            response_data.append(
                {
                    "fontface_id": fontface.fontface_id,
                    "fontface": fontface.fontface,
                    "font_id": fontface.font_id,
                    "resource_path": fontface.resource_path
                }
            )

        return jsonify(response_data)

    except:
        return jsonify({"error": "Invalid request"})
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)
    if (TOKEN is not None):
        PUSHBULLETTOKEN = TOKEN
    if not PUSHBULLETTOKEN:
        return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)

    a = parse(request)

    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }

    headers = {'Content-type': 'application/json', 'Access-Token': PUSHBULLETTOKEN}

    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
项目:fontman-desktop-core    作者:fontman    | 项目源码 | 文件源码
def find_fontface_by_font_id():
    response_data = []

    try:
        query_string = request.args.get("font_id")
        fontfaces = FontFaceService().find_by_font_id(query_string)

        for fontface in fontfaces:
            response_data.append(
                {
                    "fontface_id": fontface.fontface_id,
                    "fontface": fontface.fontface,
                    "font_id": fontface.font_id,
                    "resource_path": fontface.resource_path
                }
            )

        return jsonify(response_data)

    except:
        return jsonify({"error": "Invalid request"})
项目:flask-kerberos-login    作者:ContinuumIO    | 项目源码 | 文件源码
def extract_token(self):
        '''
        Extracts a token from the current HTTP request if it is available.

        Invokes the `save_user` callback if authentication is successful.
        '''
        header = request.headers.get(b'authorization')
        if header and header.startswith(b'Negotiate '):
            token = header[10:]
            user, token = _gssapi_authenticate(token, self._service_name)
            if token is not None:
                stack.top.kerberos_token = token

            if user is not None:
                self._save_user(user)
            else:
                # Invalid Kerberos ticket, we could not complete authentication
                abort(403)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_manual_context_binding(self):
        app = flask.Flask(__name__)
        @app.route('/')
        def index():
            return 'Hello %s!' % flask.request.args['name']

        ctx = app.test_request_context('/?name=World')
        ctx.push()
        self.assert_equal(index(), 'Hello World!')
        ctx.pop()
        try:
            index()
        except RuntimeError:
            pass
        else:
            self.assert_true(0, 'expected runtime error')
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_greenlet_context_copying(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            def g():
                self.assert_false(flask.request)
                self.assert_false(flask.current_app)
                with reqctx:
                    self.assert_true(flask.request)
                    self.assert_equal(flask.current_app, app)
                    self.assert_equal(flask.request.path, '/')
                    self.assert_equal(flask.request.args['foo'], 'bar')
                self.assert_false(flask.request)
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def test_greenlet_context_copying_api(self):
        app = flask.Flask(__name__)
        greenlets = []

        @app.route('/')
        def index():
            reqctx = flask._request_ctx_stack.top.copy()
            @flask.copy_current_request_context
            def g():
                self.assert_true(flask.request)
                self.assert_equal(flask.current_app, app)
                self.assert_equal(flask.request.path, '/')
                self.assert_equal(flask.request.args['foo'], 'bar')
                return 42
            greenlets.append(greenlet(g))
            return 'Hello World!'

        rv = app.test_client().get('/?foo=bar')
        self.assert_equal(rv.data, b'Hello World!')

        result = greenlets[0].run()
        self.assert_equal(result, 42)

    # Disable test if we don't have greenlets available
项目:who_the_hill    作者:newsdev    | 项目源码 | 文件源码
def recongize():
    '''
    Receives POST request from Twilio. Sends data from request to Amazon Rekognize and receives the API's analysis.
    Returns the resulting analysis as JSON.
    '''
    # Reads image data from incoming request
    r = request
    data = r.files['file'].read()

    # Sends image data to Amazon Rekognize for analysis. Returns JSON response of results.
    try:
        results = client.recognize_celebrities(Image={'Bytes': data})
    except botocore.exceptions.ClientError as e:
        results = {"Rekognition Client Error": str(e)}
    logging.info(results)
    return jsonify(results)
项目:mixmatch    作者:openstack    | 项目源码 | 文件源码
def chunked_reader():
    try:
        # If we're running under uWSGI, use the uwsgi.chunked_read method
        # to read chunked input.
        import uwsgi  # noqa

        while True:
            chunk = uwsgi.chunked_read()
            if len(chunk) > 0:
                yield chunk
            else:
                return
    except ImportError:
        # Otherwise try to read the wsgi input. This works in embedded Apache.
        stream = flask.request.environ["wsgi.input"]
        try:
            while True:
                yield stream.next()
        except Exception:
            return
项目:coinbin.org    作者:kennethreitz    | 项目源码 | 文件源码
def get_history(coin):
    c = Coin(coin.lower())

    q = "SELECT * from api_coin WHERE name=:coin ORDER BY date desc"

    if request.args.get('key') in API_KEYS:
        print(crayons.red('Pro request!'))
        rows = pro_db.query(q, coin=c.name)
    else:
        rows = db.query(q, coin=c.name)

    return jsonify(history=[
        {
            'value': r.value,
            'value.currency': 'USD',
            'timestamp': maya.MayaDT.from_datetime(r.date).subtract(hours=4).iso8601(),
            'when': maya.MayaDT.from_datetime(r.date).subtract(hours=4).slang_time()
        } for r in rows]
    )
项目:CodeGra.de    作者:CodeGra-de    | 项目源码 | 文件源码
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
                          ]:
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        algorithm='HS512'
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
项目:DnsE16    作者:pooleja    | 项目源码 | 文件源码
def validate_sig(body, sig_str, pkh):
    """
    Validate the signature on the body of the request - throws exception if not valid.
    """
    # Check permission to update
    if (pkh is None):
        raise PermissionError("pkh not found.")

    # Validate the pkh format
    base58.b58decode_check(pkh)
    if (len(pkh) < 20) or (len(pkh) > 40):
        raise PermissionError("Invalid pkh")

    try:
        if not sig_str:
            raise PermissionError("X-Bitcoin-Sig header not found.")
        if not wallet.verify_bitcoin_message(body, sig_str, pkh):
            raise PermissionError("X-Bitcoin-Sig header not valid.")
    except Exception as err:
        logger.error("Failure: {0}".format(err))
        raise PermissionError("X-Bitcoin-Sig header validation failed.")

    return True
项目:conditional    作者:ComputerScienceHouse    | 项目源码 | 文件源码
def clear_cache():
    user_name = request.headers.get('x-webauth-user')
    account = ldap_get_member(user_name)

    if not ldap_is_eval_director(account) and not ldap_is_rtp(account):
        return redirect("/dashboard")

    log = logger.new(request=request)
    log.info('Purge All Caches')

    _ldap_is_member_of_directorship.cache_clear()
    ldap_get_member.cache_clear()
    ldap_get_active_members.cache_clear()
    ldap_get_intro_members.cache_clear()
    ldap_get_onfloor_members.cache_clear()
    ldap_get_current_students.cache_clear()

    get_voting_members.cache_clear()
    get_members_info.cache_clear()
    get_onfloor_members.cache_clear()
    return "cache cleared", 200
项目:conditional    作者:ComputerScienceHouse    | 项目源码 | 文件源码
def database_processor(logger, log_method, event_dict): # pylint: disable=unused-argument, redefined-outer-name
    if 'request' in event_dict:
        if event_dict['method'] != 'GET':
            log = UserLog(
                ipaddr=event_dict['ip'],
                user=event_dict['user'],
                method=event_dict['method'],
                blueprint=event_dict['blueprint'],
                path=event_dict['path'],
                description=event_dict['event']
                )
            db.session.add(log)
            db.session.flush()
            db.session.commit()
        del event_dict['request']
    return event_dict
项目:pyupdater-wx-demo    作者:wettenhj    | 项目源码 | 文件源码
def RunFileServer(fileServerDir, fileServerPort):
    """
    Run a Flask file server on the given port.

    Explicitly specify instance_path, because Flask's
    auto_find_instance_path can fail when run in a frozen app.
    """
    app = Flask(__name__, instance_path=fileServerDir)

    @app.route('/fileserver-is-ready', methods=['GET'])
    def FileserverIsReady():  # pylint: disable=unused-variable
        """
        Used to test if file server has started.
        """
        return 'Fileserver is ready!'

    @app.route('/<path:filename>', methods=['GET'])
    def ServeFile(filename):  # pylint: disable=unused-variable
        """
        Serves up a file from PYUPDATER_FILESERVER_DIR.
        """
        return send_from_directory(fileServerDir, filename.strip('/'))

    def ShutDownServer():
        """
        Shut down the file server.
        """
        func = request.environ.get('werkzeug.server.shutdown')
        if func is None:
            raise RuntimeError('Not running with the Werkzeug Server')
        func()

    @app.route('/shutdown', methods=['POST'])
    def ShutDown():  # pylint: disable=unused-variable
        """
        Respond to a POSTed request to shut down the file server.
        """
        ShutDownServer()
        return 'Server shutting down...'

    app.run(host=LOCALHOST, port=fileServerPort)
项目:PimuxBot    作者:Finn10111    | 项目源码 | 文件源码
def request_password():
    jid = request.form.get('jid')
    re = s.query(RecoveryEmail).filter(RecoveryEmail.jid==jid, RecoveryEmail.confirmed==True).one_or_none()
    if re:
        password_code = ''.join(random.SystemRandom().choice(string.ascii_lowercase + string.ascii_uppercase + string.digits) for _ in range(64))
        re.password_code = password_code
        s.merge(re)
        s.commit()
        password_code_link = 'https://www.pimux.de/password/?code=%s' % password_code
        sendMail(re.email, 'password reset request', 'click here: %s' % password_code_link)
        content = render_template('success.html', message='link was sent')
    else:
        content = render_template('error.html', message='user not found')
    return content
项目:active_stream    作者:flinder    | 项目源码 | 文件源码
def connected():
    logging.debug('Received connect request')
    emit('log', {'data': 'Connected'})