我们从 Python 开源项目中提取了以下 18 个代码示例，用于说明如何使用 flask.request.query_string 属性。
def url():
    """Fetch the resource named in the query string and run inference on it.

    The raw query string, minus its first two characters, is used as the
    download URL.  The download is stored in the configured upload folder
    under a random hex name, re-saved as an RGB JPEG next to it, and the
    JPEG path is handed to ``main`` through ``FLAGS.input_files``.

    Returns a redirect to "/" when no URL remains after stripping the
    prefix, otherwise the JSON-encoded result of ``main(None)``.
    """
    # NOTE(review): the two skipped characters are presumably a "u="-style
    # prefix -- confirm against the client that builds this request.
    target = request.query_string[2:]
    if target == "":
        return redirect("/")

    local_path = os.path.join(app.config['UPLOAD_FOLDER'], uuid.uuid4().hex)

    # NOTE(review): the URL is client-supplied and fetched without any
    # scheme/host validation.
    urllib.urlretrieve(target, local_path)

    # Normalise whatever was downloaded to an RGB JPEG.
    picture = Image.open(local_path)
    if picture.mode != "RGB":
        picture = picture.convert("RGB")
    jpeg_path = local_path + ".jpg"
    picture.save(jpeg_path, "JPEG")

    FLAGS.input_files = jpeg_path
    return jsonify(main(None))
def get_info_hash(request, multiple=False):
    """Return the hex-encoded ``info_hash`` value(s) from a query string.

    The raw ``request.query_string`` is parsed with ``cgi.parse_qs``.

    :param request: object exposing a ``query_string`` attribute.
    :param multiple: when false (default) only the first ``info_hash``
        value is returned; when true, a set of all values is returned.
    :returns: one hex string, or a ``set`` of hex strings.
    :raises KeyError: if the query string carries no ``info_hash``.
    """
    raw_hashes = cgi.parse_qs(request.query_string)['info_hash']
    if multiple:
        return {b2a_hex(raw) for raw in raw_hashes}
    return b2a_hex(raw_hashes[0])
def get_hosts():
    """Return all hosts as JSON.

    A request that carries a query string is not answered here: it is
    redirected to the ``run_cmd`` endpoint with the same query string
    appended.  Otherwise every host known to ``RSPET_API`` is returned
    under the ``hosts`` key.
    """
    query = request.query_string
    if query:
        # Werkzeug exposes the query string as raw bytes; decode it before
        # joining it to the (text) URL so the concatenation cannot raise
        # TypeError on Python 3.
        if isinstance(query, bytes):
            query = query.decode('utf-8', 'replace')
        return redirect(url_for('run_cmd') + '?' + query)

    hosts = RSPET_API.get_hosts()
    return jsonify({'hosts': [make_public_host(hosts[h_id], h_id)
                              for h_id in hosts]})
def get_host(host_id):
    """Return one specific host as JSON.

    A request that carries a query string is redirected to the
    ``run_cmd_host`` endpoint for the same ``host_id`` with the query string
    appended.  Otherwise the host is looked up in ``RSPET_API.get_hosts()``.

    :param host_id: key of the host in the mapping returned by
        ``RSPET_API.get_hosts()``.
    :returns: a redirect, or the JSON form of the host; aborts with 404 when
        ``host_id`` is unknown.
    """
    query = request.query_string
    if query:
        # Werkzeug exposes the query string as raw bytes; decode it before
        # joining it to the (text) URL so the concatenation cannot raise
        # TypeError on Python 3.
        if isinstance(query, bytes):
            query = query.decode('utf-8', 'replace')
        return redirect(url_for('run_cmd_host', host_id=host_id) + '?' + query)

    hosts = RSPET_API.get_hosts()
    try:
        host = hosts[host_id]
    except KeyError:
        abort(404)
    return jsonify(make_public_host(host, host_id))
def prepare_auth_request(request):
    """Build the request-description dict from a web request object.

    :param request: object exposing ``url``, ``host``, ``path``, ``args``,
        ``form`` and ``query_string``.
    :returns: dict with the keys ``https``, ``http_host``, ``server_port``,
        ``script_name``, ``get_data``, ``post_data`` and ``query_string``.
        ``https`` is always ``'on'``; ``get_data``/``post_data`` are copies
        of the request's args/form.
    """
    port = urlparse(request.url).port
    get_data = request.args.copy()
    post_data = request.form.copy()

    auth_request = {
        "https": 'on',
        'http_host': request.host,
        'server_port': port,
        'script_name': request.path,
        'get_data': get_data,
        'post_data': post_data,
        # Uncomment if using ADFS as IdP, https://github.com/onelogin/python-saml/pull/144
        # 'lowercase_urlencoding': True,
        'query_string': request.query_string,
    }
    return auth_request
def get_current_url():
    """Return the current request's path, plus its query string if any.

    :returns: ``request.path`` alone when there is no query string,
        otherwise ``"<path>?<query>"``.
    """
    query = request.query_string
    if not query:
        return request.path
    # Werkzeug exposes the query string as raw bytes; formatting bytes with
    # %s on Python 3 would embed the repr (b'...') in the URL.
    if isinstance(query, bytes):
        query = query.decode('utf-8', 'replace')
    return '%s?%s' % (request.path, query)
def img():
    """Decode the query string as base64 into ``a.png`` and echo it back.

    The raw query string is base64-decoded and written to the file
    ``a.png`` in the current working directory.  The response is a short
    HTML fragment containing an ``<img>`` tag whose ``src`` is the original
    (still encoded) query string.
    """
    import base64
    try:
        from html import escape      # Python 3
    except ImportError:              # Python 2
        from cgi import escape

    raw = request.query_string

    # The context manager guarantees the handle is flushed and closed even
    # if decoding or writing fails.
    with open('a.png', 'wb') as out:
        out.write(base64.b64decode(raw))

    text = raw.decode('utf-8', 'replace') if isinstance(raw, bytes) else raw
    # The query string is client-controlled and lands inside a single-quoted
    # HTML attribute, so escape it.  cgi.escape leaves "'" alone, hence the
    # explicit replace (a no-op after html.escape).
    safe = escape(text, True).replace("'", "&#x27;")
    return "success <img src='" + safe + "'>"
def announce():
    """Handle a tracker announce request.

    Expected URL shape:
    /announce.php?info_hash=&peer_id=&ip=&port=&uploaded=&downloaded=&left=&numwant=&key=&compact=1

    The peer address ``(ip, port)`` and the first ``info_hash`` value are
    forwarded to ``rpc.announce``; the hash is passed hex-encoded.  Requests
    are rejected with 404 when ``ip``/``port``/``info_hash`` are missing,
    when ``port`` is not an integer, or when the GeoIP country code of
    ``ip`` is not one of CN, TW, JP, HK, KR.

    :returns: the bencoded reply with an empty peer list and an interval of
        86400.
    """
    ip = request.args.get('ip')
    port = request.args.get('port')
    if not ip or not port:
        return abort(404)

    # A non-numeric port is bad client input, not a server error.
    try:
        address = (ip, int(port))
    except ValueError:
        return abort(404)

    # info_hash is read from the raw query string rather than request.args
    # (presumably because it is binary data -- TODO confirm).
    hashes = urlparse.parse_qs(request.query_string).get('info_hash')
    if not hashes:
        return abort(404)
    binhash = hashes[0]

    country = geoip.country_code_by_addr(ip)
    if country not in ('CN', 'TW', 'JP', 'HK', 'KR'):
        return abort(404)

    rpc.announce(binhash.encode('hex'), address)
    return bencode({'peers': '', 'interval': 86400})
def try_get_banner(user_id, sizename, privacy=0):
    """Return the banner image response for a user, or an error image.

    :param user_id: numeric user id; its decimal form must be exactly nine
        characters long, otherwise the request aborts with 404.
    :param sizename: key into ``sizemap``; a trailing ``.png`` is ignored.
        Unknown names abort with 404.
    :param privacy: privacy level handed to ``privatize`` and used in the
        cache key / download file name.
    :returns: the response from ``get_sized_banner``; when the query string
        is exactly ``dl`` it is marked as an attachment.  On failure a
        static error image is sent instead: ``error_404_*`` for API error
        code 1457, ``error_503_*`` for code 101, and the generic
        ``error_*`` image (after logging) for anything else.
    """
    if sizename.endswith(".png"):
        sizename = sizename[:-4]
    if sizename not in sizemap:
        abort(404)
    if len(str(user_id)) != 9:
        abort(404)
    size = sizemap[sizename]

    try:
        data, mtime = get_data(user_id)
        key = "%d_p%d" % (user_id, privacy)
        privatize(data, privacy)
        res = get_sized_banner(key, data, mtime, size)
        # The query string is bytes under Werkzeug on Python 3, so accept
        # both spellings; on Python 2 the two literals are the same value.
        if request.query_string in ("dl", b"dl"):
            res.headers['Content-Disposition'] = (
                'attachment; filename=%d_p%d_%s.png' % (user_id, privacy, sizename))
        return res
    except APIError as e:
        if e.code == 1457:
            error_image = "static/error_404_%d.png" % size
        elif e.code == 101:
            error_image = "static/error_503_%d.png" % size
        else:
            app.logger.exception("API error for %r/%r/%r" % (user_id, sizename, privacy))
            error_image = "static/error_%d.png" % size
    except Exception:
        # Last-resort boundary: log the traceback and still serve an image.
        app.logger.exception("Exception thrown for %r/%r/%r" % (user_id, sizename, privacy))
        error_image = "static/error_%d.png" % size

    return send_file(error_image, mimetype="image/png", cache_timeout=60)
def try_get_snap(snap, sizename):
    """Return the banner image response for a stored snapshot.

    :param snap: snapshot identifier passed to ``load_snap``; also used in
        the cache key (``"s_" + snap``) and the download file name.
    :param sizename: key into ``sizemap``; a trailing ``.png`` is ignored.
        Unknown names abort with 404.
    :returns: the response from ``get_sized_banner``; when the query string
        is exactly ``dl`` it is marked as an attachment.
    """
    if sizename.endswith(".png"):
        sizename = sizename[:-4]
    if sizename not in sizemap:
        abort(404)
    size = sizemap[sizename]

    data = load_snap(snap)
    key = "s_" + snap
    res = get_sized_banner(key, data, None, size, max_age=None)
    # The query string is bytes under Werkzeug on Python 3, so accept both
    # spellings; on Python 2 the two literals are the same value.
    if request.query_string in ("dl", b"dl"):
        res.headers['Content-Disposition'] = (
            'attachment; filename=snap_%s_%s.png' % (snap, sizename))
    return res
def _get_devices():
    """Return devices as JSON, optionally filtered by the request arguments.

    Without request arguments every ``Device`` row is returned.  With
    arguments, they are passed as a dict to ``find_devices`` and only the
    matching records are returned; no match yields a 404 "Not Found"
    payload, and an ``AttributeError`` raised while filtering or
    serialising yields a 500 "invalid arguments" payload.
    """
    db = aeon_ztp.db.session
    to_json = device_schema

    # -------------------------------------------
    # no arguments: return all items in the database
    # -------------------------------------------
    if not request.args:
        everything = [to_json.dump(rec).data for rec in db.query(Device).all()]
        return jsonify(count=len(everything), items=everything)

    # ---------------------------------------------------------------
    # arguments present: use them to form an "and" filter and return
    # only the subset of items matching
    # ---------------------------------------------------------------
    try:
        matches = find_devices(db, request.args.to_dict())
        if len(matches) == 0:
            not_found = 'Not Found: %s' % request.query_string
            return jsonify(ok=False, message=not_found), 404
        serialized = [to_json.dump(rec).data for rec in matches]
        return jsonify(count=len(serialized), items=serialized)
    except AttributeError:
        return jsonify(ok=False, message='invalid arguments'), 500


# -----------------------------------------------------------------------------
# POST /api/devices
# -----------------------------------------------------------------------------
def _delete_devices():
    """Delete all devices, or only those matching the request arguments.

    * ``?all=<truthy>`` deletes every ``Device`` row.
    * any other arguments are passed as a dict to ``find_devices`` and the
      matching records are deleted.
    * no arguments at all is rejected with 400.

    :returns: a JSON payload (``ok``, ``message`` and, for filtered deletes,
        ``count``) together with the HTTP status: 404 when the filter
        matches nothing, 400/500 on failure.
    """
    if request.args.get('all'):
        try:
            db = aeon_ztp.db.session
            db.query(Device).delete()
            db.commit()
        except Exception as exc:
            # Format the exception itself: the ``.message`` attribute does
            # not exist on Python 3 exceptions, so reading it here would
            # raise inside the error handler.
            # NOTE(review): the session is not rolled back on failure --
            # confirm that request teardown takes care of it.
            return jsonify(
                ok=False,
                message='unable to delete all records: {}'.format(exc)), 400

        return jsonify(ok=True, message='all records deleted')

    elif request.args:
        db = aeon_ztp.db.session
        try:
            recs = find_devices(db, request.args.to_dict())
            n_recs = len(recs)
            if n_recs == 0:
                return jsonify(ok=False,
                               message='Not Found: %s' % request.query_string), 404

            for dev in recs:
                db.delete(dev)
            db.commit()
            return jsonify(
                ok=True, count=n_recs,
                message='{} records deleted'.format(n_recs))

        except AttributeError:
            return jsonify(ok=False, message='invalid arguments'), 500

        except Exception as exc:
            # See above: format the exception, not ``exc.message``.
            msg = 'unable to delete specific records: {}'.format(exc)
            return jsonify(ok=False, message=msg), 500

    else:
        return jsonify(ok=False, message='all or filter required'), 400
def home_project():
    """Fetch the current user's home project, creating it first if needed.

    Eve projections are supported, but at least the following fields must
    be present: 'permissions', 'category', 'user'.

    :returns: ``(body, status)``.  The body is the JSON form of the project
        on success, the JSON form of the raw response when the lookup does
        not return 200, or the text 'No home project' with status 404 when
        the user's roles do not qualify or no project is found.
    """
    from pillar.auth import current_user

    user_id = current_user.user_id
    roles = current_user.roles

    log.debug('Possibly creating home project for user %s with roles %s', user_id, roles)
    if HOME_PROJECT_USERS and not HOME_PROJECT_USERS.intersection(roles):
        log.debug('User %s is not a subscriber, not creating home project.', user_id)
        return 'No home project', 404

    # The project is created before the Eve query runs.  That costs one
    # more database round-trip but keeps projections simple to get right.
    if not has_home_project(user_id):
        create_home_project(user_id, write_access_with_roles(roles))

    resp, _, _, status, _ = get('projects', category='home', user=user_id)
    if status != 200:
        return utils.jsonify(resp), status

    found = resp['_items']
    if not found:
        log.warning('Home project for user %s not found, while we just created it! Could be '
                    'due to projections and other arguments on the query string: %s',
                    user_id, request.query_string)
        return 'No home project', 404

    return utils.jsonify(found[0]), status
def setSpeed():
    """Debug endpoint: dump the incoming query parameters to stdout.

    Prints a marker line, the raw query string, the parsed arguments, every
    value of ``name[]``, and the ``right`` and ``left`` arguments (in that
    order).  Indexing ``request.args`` means a request missing ``right`` or
    ``left`` fails on that lookup rather than printing a default.

    :returns: an empty response body.
    """
    # Single-argument, parenthesised print produces the same output under
    # the Python 2 print statement and the Python 3 print function.
    print('gotParams')
    print(request.query_string)
    print(request.args)
    print(request.args.getlist('name[]'))
    print(request.args['right'])
    print(request.args['left'])
    return ''
def auth_decorator(func):
    """Wrap a view so it only runs for requests carrying a valid HMAC.

    The request must supply the headers ``user``, ``api_key``, ``hash`` and
    ``timestamp``.  The key returned by ``get_key(api_key, user)`` is
    combined with the timestamp via ``generate_hmac``; that intermediate
    value is then combined with the signed payload:

    * GET  -- the request path, plus ``?<query string>`` when present;
    * POST -- the request body decoded as UTF-8.

    The result is compared to the ``hash`` header in constant time.  Any
    failure (missing header, unknown key, unsupported method, mismatch)
    answers with HTTP 412 and a JSON message instead of calling ``func``.

    :param func: the view function to protect.
    :returns: the wrapped view.
    """
    @wraps(func)
    def decorator_func(*args, **kwargs):
        user = request.headers.get('user')
        api_key = request.headers.get('api_key')
        user_hash = request.headers.get('hash')
        user_timestamp = request.headers.get('timestamp')

        if not user or not api_key:
            return jsonify("Error: Invalid Request"), 412
        # The original also tested ``not hash``, i.e. the builtin function,
        # which is always truthy; the header value is ``user_hash``.
        if not user_timestamp or not user_hash:
            return jsonify("Error: Invalid Request"), 412

        server_key = get_key(api_key, user)
        if not server_key:
            return jsonify("key not found"), 412

        timestamp_hash = generate_hmac(str(server_key), str(user_timestamp))

        if request.method == 'GET':
            query = request.query_string
            # Werkzeug exposes the query string as raw bytes; decode before
            # joining it to the text path.
            if isinstance(query, bytes):
                query = query.decode('utf-8', 'replace')
            url = request.path + '?' + query if query else request.path
            payload = str(url)
        elif request.method == 'POST':
            payload = request.data.decode('utf-8')
        else:
            # Previously other methods fell through and the view returned
            # None; reject them explicitly instead.
            return jsonify("Error: Invalid Request"), 412

        server_hash = generate_hmac(str(timestamp_hash), payload)
        if hmac.compare_digest(server_hash, user_hash):
            return func(*args, **kwargs)
        return jsonify("Error : HMAC is not matched"), 412

    return decorator_func