Python flask.request 模块,query_string() 实例源码

我们从Python开源项目中,提取了以下18个代码示例,用于说明如何使用flask.request.query_string()

项目:im2txt_api    作者:mainyaa    | 项目源码 | 文件源码
def url():
  # check if the post request has the file part
  url = request.query_string[2:]
  if url == "":
    #print("not url")
    return redirect("/")
  title = uuid.uuid4().hex
  path = os.path.join(app.config['UPLOAD_FOLDER'], title)

  # get url file
  (filename, headers) = urllib.urlretrieve(url, path)
  # convert to jpg
  im = Image.open(path)
  if im.mode != "RGB":
    im = im.convert("RGB")
  im.save(path+".jpg", "JPEG")

  FLAGS.input_files = path+".jpg"
  # inference
  result = main(None)
  return jsonify(result)
项目:zappa-bittorrent-tracker    作者:Miserlou    | 项目源码 | 文件源码
def get_info_hash(request, multiple=False):
    """
    Get infohashes from a QS.
    """
    if not multiple:
        return b2a_hex(cgi.parse_qs(request.query_string)['info_hash'][0])
    else:
        hashes = set()
        for hash in cgi.parse_qs(request.query_string)['info_hash']:
            hashes.add(b2a_hex(hash))
    return hashes
项目:RSPET    作者:panagiks    | 项目源码 | 文件源码
def get_hosts():
    """Return all hosts."""
    #Check for query string, redirect to endpoint with trailling '/'.
    if request.query_string:
        return redirect(url_for('run_cmd') + '?' + request.query_string)
    hosts = RSPET_API.get_hosts()
    return jsonify({'hosts': [make_public_host(hosts[h_id], h_id) for h_id in hosts]})
项目:RSPET    作者:panagiks    | 项目源码 | 文件源码
def get_host(host_id):
    """Return specific host."""
    #Check for query string, redirect to endpoint with trailling '/'.
    if request.query_string:
        return redirect(url_for('run_cmd_host', host_id=host_id) + '?' + request.query_string)
    hosts = RSPET_API.get_hosts()
    try:
        host = hosts[host_id]
    except KeyError:
        abort(404)
    return jsonify(make_public_host(host, host_id))
项目:fame    作者:certsocietegenerale    | 项目源码 | 文件源码
def prepare_auth_request(request):
    url_data = urlparse(request.url)
    return {
        "https": 'on',
        'http_host': request.host,
        'server_port': url_data.port,
        'script_name': request.path,
        'get_data': request.args.copy(),
        'post_data': request.form.copy(),
        # Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
        # 'lowercase_urlencoding': True,
        'query_string': request.query_string
    }
项目:mmpdb    作者:rdkit    | 项目源码 | 文件源码
def get_current_url():
    if not request.query_string:
        return request.path
    return '%s?%s' % (request.path, request.query_string)
项目:Quiver-alfred    作者:danielecook    | 项目源码 | 文件源码
def get_current_url():
    if not request.query_string:
        return request.path
    return '%s?%s' % (request.path, request.query_string)
项目:amoc-project    作者:ajayns    | 项目源码 | 文件源码
def img():
  i = request.query_string
  f = open('a.png','wb')
  f.write(i.decode('base64'))
  return "success <img src='" + i + "'>"
项目:ssbc    作者:banwagong-news    | 项目源码 | 文件源码
def announce():
    '''/announce.php?info_hash=&peer_id=&ip=&port=&uploaded=&downloaded=&left=&numwant=&key=&compact=1'''
    ip = request.args.get('ip')
    port = request.args.get('port')
    if not ip or not port:
        return abort(404)
    address = (ip, int(port))
    binhash = urlparse.parse_qs(request.query_string)['info_hash'][0]
    country = geoip.country_code_by_addr(ip)
    if country not in ('CN','TW','JP','HK', 'KR'):
        return abort(404)
    rpc.announce(binhash.encode('hex'), address)
    return bencode({'peers': '', 'interval': 86400})
项目:deresuteme    作者:marcan    | 项目源码 | 文件源码
def try_get_banner(user_id, sizename, privacy=0):
    if sizename.endswith(".png"):
        sizename = sizename[:-4]
    if sizename not in sizemap:
        abort(404)
    if len(str(user_id)) != 9:
        abort(404)
    size = sizemap[sizename]
    try:
        data, mtime = get_data(user_id)
        key = "%d_p%d" % (user_id, privacy)
        privatize(data, privacy)
        res = get_sized_banner(key, data, mtime, size)
        if request.query_string == "dl":
            res.headers['Content-Disposition'] = 'attachment; filename=%d_p%d_%s.png' % (user_id, privacy, sizename)
        return res
    except APIError as e:
        if e.code == 1457:
            return send_file("static/error_404_%d.png" % size, mimetype="image/png", cache_timeout=60)
        elif e.code == 101:
            return send_file("static/error_503_%d.png" % size, mimetype="image/png", cache_timeout=60)
        else:
            app.logger.exception("API error for %r/%r/%r" % (user_id, sizename, privacy))
            return send_file("static/error_%d.png" % size, mimetype="image/png", cache_timeout=60)
    except Exception as e:
        app.logger.exception("Exception thrown for %r/%r/%r" % (user_id, sizename, privacy))
        return send_file("static/error_%d.png" % size, mimetype="image/png", cache_timeout=60)
项目:deresuteme    作者:marcan    | 项目源码 | 文件源码
def try_get_snap(snap, sizename):
    if sizename.endswith(".png"):
        sizename = sizename[:-4]
    if sizename not in sizemap:
        abort(404)
    size = sizemap[sizename]
    data = load_snap(snap)
    key = "s_" + snap
    res = get_sized_banner(key, data, None, size, max_age=None)
    if request.query_string == "dl":
        res.headers['Content-Disposition'] = 'attachment; filename=snap_%s_%s.png' % (snap, sizename)
    return res
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def get_current_url():
    if not request.query_string:
        return request.path
    return '%s?%s' % (request.path, request.query_string)
项目:metrics    作者:Jeremy-Friedman    | 项目源码 | 文件源码
def get_current_url():
    if not request.query_string:
        return request.path
    return '%s?%s' % (request.path, request.query_string)
项目:aeon-ztps    作者:Apstra    | 项目源码 | 文件源码
def _get_devices():
    db = aeon_ztp.db.session
    to_json = device_schema

    # ---------------------------------------------------------------
    # if the request has arguments, use these to form an "and" filter
    # and return only the subset of items matching
    # ---------------------------------------------------------------

    if request.args:
        try:
            recs = find_devices(db, request.args.to_dict())
            if len(recs) == 0:
                return jsonify(ok=False,
                               message='Not Found: %s' % request.query_string), 404

            items = [to_json.dump(rec).data for rec in recs]
            return jsonify(count=len(items), items=items)

        except AttributeError:
            return jsonify(ok=False, message='invalid arguments'), 500

    # -------------------------------------------
    # otherwise, return all items in the database
    # -------------------------------------------

    items = [to_json.dump(rec).data for rec in db.query(Device).all()]
    return jsonify(count=len(items), items=items)


# -----------------------------------------------------------------------------
#                                 POST /api/devices
# -----------------------------------------------------------------------------
项目:aeon-ztps    作者:Apstra    | 项目源码 | 文件源码
def _delete_devices():
    if request.args.get('all'):
        try:
            db = aeon_ztp.db.session
            db.query(Device).delete()
            db.commit()

        except Exception as exc:
            return jsonify(
                ok=False,
                message='unable to delete all records: {}'.format(exc.message)), 400

        return jsonify(ok=True, message='all records deleted')

    elif request.args:
        db = aeon_ztp.db.session
        try:
            recs = find_devices(db, request.args.to_dict())
            n_recs = len(recs)
            if n_recs == 0:
                return jsonify(ok=False,
                               message='Not Found: %s' % request.query_string), 404

            for dev in recs:
                db.delete(dev)
            db.commit()
            return jsonify(
                ok=True, count=n_recs,
                message='{} records deleted'.format(n_recs))

        except AttributeError:
            return jsonify(ok=False, message='invalid arguments'), 500

        except Exception as exc:
            msg = 'unable to delete specific records: {}'.format(exc.message)
            return jsonify(ok=False, message=msg), 500
    else:
        return jsonify(ok=False, message='all or filter required'), 400
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def home_project():
    """Fetches the home project, creating it if necessary.

    Eve projections are supported, but at least the following fields must be present:
        'permissions', 'category', 'user'
    """
    from pillar.auth import current_user

    user_id = current_user.user_id
    roles = current_user.roles

    log.debug('Possibly creating home project for user %s with roles %s', user_id, roles)
    if HOME_PROJECT_USERS and not HOME_PROJECT_USERS.intersection(roles):
        log.debug('User %s is not a subscriber, not creating home project.', user_id)
        return 'No home project', 404

    # Create the home project before we do the Eve query. This costs an extra round-trip
    # to the database, but makes it easier to do projections correctly.
    if not has_home_project(user_id):
        write_access = write_access_with_roles(roles)
        create_home_project(user_id, write_access)

    resp, _, _, status, _ = get('projects', category='home', user=user_id)
    if status != 200:
        return utils.jsonify(resp), status

    if resp['_items']:
        project = resp['_items'][0]
    else:
        log.warning('Home project for user %s not found, while we just created it! Could be '
                    'due to projections and other arguments on the query string: %s',
                    user_id, request.query_string)
        return 'No home project', 404

    return utils.jsonify(project), status
项目:tensorbot    作者:jtoy    | 项目源码 | 文件源码
def setSpeed():
    print 'gotParams'
    print request.query_string
    print request.args
    print request.args.getlist('name[]')
    #   print request.form.get('left',1,type=int)

    #print request.form.get('right',1,type=int)
    print request.args['right']
    print request.args['left']

    return ''
项目:FlaskBackend    作者:iamrajhans    | 项目源码 | 文件源码
def auth_decorator(func):

    @wraps(func)
    def decorator_func(*args,**kwargs):
        user        = request.headers.get('user')
        api_key     = request.headers.get('api_key')
        # api_secret  = request.headers.get('api_secret')
        user_hash   = request.headers.get('hash')
        user_timestamp   = request.headers.get('timestamp')

        if not user or not api_key :
            return jsonify("Error: Invalid Request"),412

        if not hash or not user_timestamp or not user_hash:
            return jsonify("Error: Invalid Request"), 412

        server_key = get_key(api_key,user)

        if not server_key:
            return jsonify("key not found"),412

        timestamp_hash = generate_hmac(str(server_key), str(user_timestamp))
        #for get request

        if request.method == 'GET':
            url = request.path + '?' + request.query_string if request.query_string else request.path
            server_hash = generate_hmac(str(timestamp_hash), str(url))
            if hmac.compare_digest(server_hash, user_hash):
                return func(*args, **kwargs)
            else:
                return jsonify("Error : HMAC is not matched"), 412
            #change with the hmac
            # server_hash = base64.base64encode(str(server_key),str(url))
            # if user_hash == server_hash:
            #     return func(*args,**kwargs)

            # else :
                # return jsonify("Error: HMAC is not matched"),412


        if request.method == 'POST':
            #check for file upload
            data = request.data.decode('utf-8')
            server_hash = generate_hmac(str(timestamp_hash),data)
            if hmac.compare_digest(server_hash,user_hash):
                return func(*args, **kwargs)
            else:
                return jsonify("Error : HMAC is not matched"), 412
    return decorator_func