我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.request.values()。
def deployment(): """ """ hasFiles = False if request.files.getlist('file')[0]: hasFiles = True DATA = {'files': list(zip(request.files.getlist('file'), request.form.getlist('File Type'), request.form.getlist('file_code')))} for file, fileType, fileCod in DATA['files']: if fileType in ['static', 'data'] and not fileCod: return json.dumps('Error - You should specify a file code for statics and outputs'), 500 env = request.values['env'] isNew = True if request.values['isNew'] == 'true' else False if isNew: createEnv(env) if hasFiles: return deployFiles(env, DATA) return json.dumps("Environment created"), 200
def crawl(): try: depth_limit = int(request.values['depth']) except ValueError as e: return "Depth parameter must be a number", 400 except: depth_limit = 1 if 'url' in request.values: url = request.values['url'] parsed_url = urlparse.urlsplit(url) if parsed_url.scheme not in ['http', 'https']: return "Only http and https protocols are supported", 400 if parsed_url.netloc == '': return "Missing domain", 400 allowed_domains = [ parsed_url.netloc ] crawler = Crawler(allowed_domains, depth_limit) crawler.crawl(url) return jsonify(**crawler.crawled) else: return "Missing url parameter", 400
def verify_slack_username(http_method, api_method, method_acl_userlist): user_name = '' if http_method == 'GET': user_name = request.args.get('user_name', '') elif http_method == 'POST': user_name = request.values['user_name'] else: raise Exception("Error: unsupported http_method(%s)" % (http_method)) if "*" in method_acl_userlist: super_name_list = method_acl_userlist["*"] if user_name in super_name_list: return None if api_method in method_acl_userlist: allowed_user_list = method_acl_userlist[api_method] if user_name not in allowed_user_list: content = "Error: user(%s) not allowed to call api_method(%s)" % \ (user_name, api_method) raise Exception(content) return None
def join(id): event = find_event(id) if event == None: return not_found() user = find_user(request.values.get('user_param'), request.values.get('user_param_value')) if user == None: return user_not_specified() join_event(user, event) return jsonify({ 'joined': True, 'rsvps_disabled': False, 'event_archived': False, 'over_capacity': False, 'past_deadline': False, })
def chan_friends(channel): """ If you GET this endpoint, go to /api/v1/channel/<channel>/friend with <channel> replaced for the channel of the friends you want to get <channel> can either be an int that matches the channel, or a string that matches the owner's username """ # model = request.path.split("/")[-1] model = "Friend" if channel.isdigit(): fields = {"channelId": int(channel)} else: fields = {"owner": channel.lower()} packet, code = generate_response( model, request.path, request.method, request.values, data=results, fields=fields ) return make_response(jsonify(packet), code) # There was an error! # if not str(code).startswith("2"): # return make_response(jsonify(packet), code) # NOTE: Not needed currently, but this is how you would check # TODO: Fix this endpoint to remove timing elements (friends are forever) # TODO: Use Object.update(**changes) instead of Object(**updated_object).save()
def chan_messages(channel): """ If you GET this endpoint, go to /api/v1/channel/<channel>/messages with <channel> replaced for the messages you want to get """ model = "Message" if channel.isdigit(): fields = {"channelId": int(channel)} else: fields = {"owner": channel.lower()} packet, code = generate_response( model, request.path, request.method, request.values, fields=fields ) return make_response(jsonify(packet), code)
def user_quotes(channel): """ If you GET this endpoint, go to /api/v1/channel/<channel>/quote with <channel> replaced for the channel you want to get quotes for """ model = "quote" if channel.isdigit(): fields = {"channelId": int(channel), "deleted": False} else: fields = {"owner": channel.lower(), "deleted": False} packet, code = generate_response( model, request.path, request.method, request.values, fields=fields ) return make_response(jsonify(packet), code)
def user_commands(channel): """ If you GET this endpoint, simply go to /api/v1/channel/<channel>/command with <channel> replaced for the channel you want to get commands for """ model = "Command" if channel.isdigit(): fields = {"channelId": int(channel), "deleted": False} else: fields = {"channelName": channel.lower(), "deleted": False} packet, code = generate_response( model, request.path, request.method, request.values, fields=fields ) return make_response(jsonify(packet), code)
def handle_states(): '''Returns all the states from the database as JSON objects with a GET request, or adds a new state to the database with a POST request. Refer to exception rules of peewee `get()` method for additional explanation of how the POST request is handled: http://docs.peewee-orm.com/en/latest/peewee/api.html#SelectQuery.get ''' if request.method == 'GET': list = ListStyle().list(State.select(), request) return jsonify(list), 200 elif request.method == 'POST': params = request.values '''Check that all the required parameters are made in request.''' required = set(["name"]) <= set(request.values.keys()) if required is False: return jsonify(msg="Missing parameter."), 400 try: State.select().where(State.name == request.form['name']).get() return jsonify(code=10001, msg="State already exists"), 409 except State.DoesNotExist: state = State.create(name=request.form['name']) return jsonify(state.to_dict()), 201
def handle_state_id(state_id): '''Select the state with the id from the database and store as the variable `state` with a GET request method. Update the data of the particular state with a PUT request method. This will take the parameters passed and update only those values. Remove the state with this id from the database with a DELETE request method. Keyword arguments: state_id: The id of the state from the database. ''' try: state = State.select().where(State.id == state_id).get() except State.DoesNotExist: return jsonify(msg="There is no state with this id."), 404 if request.method == 'GET': return jsonify(state.to_dict()), 200 elif request.method == 'DELETE': try: state = State.delete().where(State.id == state_id) except State.DoesNotExist: raise Exception("There is no state with this id.") state.execute() return jsonify(msg="State deleted successfully."), 200
def handle_sms(): """Handles Twillio SMS message""" customer_phone_number = request.values["From"] text_message_body = request.values["Body"] # Retrieve the customer from the DB or create a new one customer = Customer.get_customer_by_phone_number(customer_phone_number) if not customer: # Save the customer to the DB customer = Customer.create_new({ CFields.PHONE_NUMBER: customer_phone_number, }) customer.create() customer_lc.on_new_customer() messaging.on_message_recieve(customer, text_message_body) return jsonpickle.encode(dict( success=True ))
def engine(): if session.get('login_in',None): if session.get('username',None): if request.values.get('uuid',None): uuid = request.values['uuid'] session['uuid'] = uuid else: uuid = session['uuid'] username = session['username'] try: engine = models.Engine.query.filter_by(uuid = uuid, user_name = username).first() base_url = "tcp://" + engine.host + ":" + engine.port docker = DOCKER(base_url = base_url, timeout = 5, version = "auto") return render_template('engine.html', host_info = docker.get_info(), usage = docker.monitor()) except Exception as msg: return str(msg), 503 else: return "Error 401, Authentication failed", 401 else: return redirect('/login')
def change_passwd(): if session.get('login_in',None): if session.get('username',None): oldpassword = request.values['oldpassword'] newpassword = request.values['newpassword'] try: user = models.User.query.filter_by(username = session['username']).first() if check_password_hash(user.password, oldpassword): user.password = generate_password_hash(newpassword) db.session.add(user) db.session.commit() return jsonify(result="change sucessfull") else: return jsonify(result="change failed") except: db.session.rollback() return jsonify(result="change failed") finally: db.session.close() else: return redirect('/login') else: return redirect('/login')
def readGcodeFilesForOrigin(origin): if origin not in [FileDestinations.LOCAL, FileDestinations.SDCARD]: return make_response("Unknown origin: %s" % origin, 404) recursive = request.values.get("recursive", "false") in valid_boolean_trues force = request.values.get("force", "false") in valid_boolean_trues if force: with _file_cache_mutex: try: del _file_cache[origin] except KeyError: pass files = _getFileList(origin, recursive=recursive) if origin == FileDestinations.LOCAL: usage = psutil.disk_usage(settings().getBaseFolder("uploads")) return jsonify(files=files, free=usage.free, total=usage.total) else: return jsonify(files=files)
def _get_temperature_data(preprocessor): if not printer.is_operational(): return make_response("Printer is not operational", 409) tempData = printer.get_current_temperatures() if "history" in request.values.keys() and request.values["history"] in valid_boolean_trues: tempHistory = printer.get_temperature_history() limit = 300 if "limit" in request.values.keys() and unicode(request.values["limit"]).isnumeric(): limit = int(request.values["limit"]) history = list(tempHistory) limit = min(limit, len(history)) tempData.update({ "history": map(lambda x: preprocessor(x), history[-limit:]) }) return preprocessor(tempData)
def uploadLanguagePack(): input_name = "file" input_upload_path = input_name + "." + settings().get(["server", "uploads", "pathSuffix"]) input_upload_name = input_name + "." + settings().get(["server", "uploads", "nameSuffix"]) if not input_upload_path in request.values or not input_upload_name in request.values: return make_response("No file included", 400) upload_name = request.values[input_upload_name] upload_path = request.values[input_upload_path] exts = filter(lambda x: upload_name.lower().endswith(x), (".zip", ".tar.gz", ".tgz", ".tar")) if not len(exts): return make_response("File doesn't have a valid extension for a language pack archive", 400) target_path = settings().getBaseFolder("translations") if tarfile.is_tarfile(upload_path): _unpack_uploaded_tarball(upload_path, target_path) elif zipfile.is_zipfile(upload_path): _unpack_uploaded_zipfile(upload_path, target_path) else: return make_response("Neither zip file nor tarball included", 400) return getInstalledLanguagePacks()
def _get_locale(self): global LANGUAGES if "l10n" in request.values: return Locale.negotiate([request.values["l10n"]], LANGUAGES) if "X-Locale" in request.headers: return Locale.negotiate([request.headers["X-Locale"]], LANGUAGES) if hasattr(g, "identity") and g.identity and userManager.enabled: userid = g.identity.id try: user_language = userManager.getUserSetting(userid, ("interface", "language")) if user_language is not None and not user_language == "_default": return Locale.negotiate([user_language], LANGUAGES) except octoprint.users.UnknownUser: pass default_language = self._settings.get(["appearance", "defaultLanguage"]) if default_language is not None and not default_language == "_default" and default_language in LANGUAGES: return Locale.negotiate([default_language], LANGUAGES) return Locale.parse(request.accept_languages.best_match(LANGUAGES))
def get_alerts_by_form_parameters(): ''' Get an alert based on the ''' start = request.args.get('start') or request.form.get('start') end = request.args.get('end') or request.form.get('end') _logger.info('{}: {} - {}'.format(request.method, request.path, request.values)) # Convert to datetime objects start_datetime = _parse_date(start) end_datetime = _parse_date(end) alerts = s3_model.get_alerts_by_date(start_datetime, end_datetime) status_code = 200 success = True response = { 'success': success, 'alerts': alerts } return jsonify(response), status_code
def create(): """ ???? :return: """ username = request.values.get("username") password = request.values.get("password") email = request.values.get("email") gender = int(request.values.get("gender")) password_hash = generate_password_hash(password, method='pbkdf2:sha1', salt_length=8) user = User(username=username, password_hash=password_hash, email=email, gender=gender) user.save() return success()
def session_test(): if request.method == 'DELETE': session.pop('username') # ?? ?? return 'Session deleted!' else: if 'username' in session: # ?? ?? ?? ?? return 'Hello {0}'.format(session['username']) else: session['username'] = request.values['username'] # ?? ?? return 'Session appended!'
def __init__(self, request, columns, collection): self.columns = columns self.collection = collection # values specified by the datatable for filtering, sorting, paging self.request_values = request.values # results from the db self.result_data = None # total in the table after filtering self.cardinality_filtered = 0 # total in the table unfiltered self.cadinality = 0 self.run_queries()
def profile_change(): user = g.user if set(['password', 'question', 'answer']).issubset(request.values): password = request.values['password'] if is_valid_password(password): name = request.values['name'] question = request.values['question'] answer = request.values['answer'] user.name = name user.password = password user.question = question user.answer = answer db.session.add(user) db.session.commit() flash('Account information successfully changed.') else: flash('Password does not meet complexity requirements.') return redirect(url_for('ph_bp.profile'))
def guild_config_history(guild): def serialize(gcc): return { 'user': serialize_user(gcc.user_id), 'before': unicode(gcc.before_raw), 'after': unicode(gcc.after_raw), 'created_at': gcc.created_at.isoformat(), } q = GuildConfigChange.select(GuildConfigChange, User).join( User, on=(User.user_id == GuildConfigChange.user_id), ).where(GuildConfigChange.guild_id == guild.guild_id).order_by( GuildConfigChange.created_at.desc() ).paginate(int(request.values.get('page', 1)), 25) return jsonify(map(serialize, q))
def update(): if request.method == 'POST': user = request.values['user'] vector = request.values['vector'] if user not in logged_in: abort(404) vector = np.fromstring(vector, dtype=float, sep=',') if True: fm = FeatureMatrix() # nearest_neighbor = fm.match('A', np.array([.1,.2,.3,.5])) nearest_neighbor = fm.match(user, vector) else: nearest_neighbor = querySpark.match(user, vector) return jsonify(result='Success', neighbor=nearest_neighbor)
def login(): if request.method == 'POST': remote_ip = request.remote_addr print("Login from client IP",remote_ip) user = request.values['user'] pwd = request.values['password'] if user in logged_in: return jsonify(result='Success') if user in registered_ids.keys(): if registered_ids[user] == pwd: print("logging in",user) logged_in.append(user) #r.set(user,str(remote_ip)) return jsonify(result='Success') else: abort(404)
def get_history(targets, start='-1h', end='now'): """Retrieve the time series data for one or more metrics. Args: targets (str|List[str]): Graphite path, or list of paths. start (Optional[str]): Start of date range. Defaults to one hour ago. end (Optional[str]): End of date range. Defaults to now. Returns: A list of metric values. """ metrics = query(targets, start, end) try: metrics = json.loads(metrics)[0]['datapoints'] except: return [] # Convert unix timestamps to plot.ly's required date format for metric in metrics: timestamp = datetime.fromtimestamp(metric[1]) metric[1] = timestamp.strftime('%Y-%m-%d %H:%M:%S') return metrics
def plotly(metrics=[], name='', color=None): """Convert a list of metric values to a Plot.ly object. """ values, timestamps = zip(*metrics) trace = { 'type': 'scatter', 'x': timestamps, 'y': values, 'name': name, } if color: trace['marker'] = { color: color } return trace
def date_range(): """Ensure the requested date range is in a format Graphite will accept. """ range_from = request.values.get('from', '-1h') range_end = request.values.get('end', 'now') try: # Try to coerce date range into integers range_from = int(float(range_from) / 1000) range_end = int(float(range_end) / 1000) except: # ... or pass string directly to Graphite pass return (range_from, range_end)
def get_efo_info_from_code(self, efo_codes, **kwargs): params = SearchParams(**kwargs) if not isinstance(efo_codes, list): efo_codes = [efo_codes] query_body = addict.Dict() query_body.query.ids["values"] = efo_codes query_body.size = params.size if params.facets == 'true': query_body.aggregations.significant_therapeutic_areas.significant_terms.field = "therapeutic_labels.keyword" query_body.aggregations.significant_therapeutic_areas.significant_terms.size = 25 query_body.aggregations.significant_therapeutic_areas.significant_terms.min_doc_count = 2 if params.fields: query_body._source = params.fields if efo_codes: res = self._cached_search(index=self._index_efo, doc_type=self._docname_efo, body=query_body.to_dict() ) return PaginatedResult(res, params)
def get_evidences_by_id(self, evidenceid, **kwargs): if isinstance(evidenceid, str): evidenceid = [evidenceid] params = SearchParams(**kwargs) if params.datastructure == SourceDataStructureOptions.DEFAULT: params.datastructure = SourceDataStructureOptions.FULL res = self._cached_search(index=self._index_data, # doc_type=self._docname_data, body={"query": { "ids": {"values": evidenceid}, }, "size": len(evidenceid), } ) return SimpleResult(res, params, data=[hit['_source'] for hit in res['hits']['hits']])
def _get_labels_for_reactome_ids(self, reactome_ids): labels = defaultdict(str) if reactome_ids: res = self._cached_search(index=self._index_reactome, doc_type=self._docname_reactome, body={"query": { "ids": { "values": reactome_ids } }, '_source': {"includes": ['label']}, 'size': 10000, 'from': 0, } ) if res['hits']['total']: for hit in res['hits']['hits']: labels[hit['_id']] = hit['_source']['label'] return labels
def api_send_message(): print ('send message', request.values.get('type')) game.doStage(session['name'], request.values) return jsonify({'messages': 'success'})
def verify_slack_token(http_method, allowed_token_list): token = '' if http_method == 'GET': token = request.args.get('token', '') elif http_method == 'POST': token = request.values['token'] else: raise Exception("Error: unsupported http_method(%s)" % (http_method)) if token not in allowed_token_list: content = "Error: invalid token(%s)" % (token) raise Exception(content) return None
def get_response_url(http_method): response_url = '' if http_method == 'GET': response_url = request.args.get('response_url', '') elif http_method == 'POST': response_url = request.values['response_url'] else: raise Exception("Error: unsupported http_method(%s)" % (http_method)) if response_url == '': raise Exception("Error: fail to get response_url from the http request") return response_url
def update_job(job_id, job=None): result = Job.update(job_id, request.values) if result.get("updated", False): return json_resp(result) else: return json_resp(result), 450
def add_user(): result = User.create(request.values) if result.get("created", False): result["user"] = result["user"].as_object() return json_resp(result) else: return json_resp(result), 450
def update_pheno(pheno_id, pheno=None): result = Phenotype.update(pheno_id, request.values) if result.get("updated", False): return json_resp(result) else: return json_resp(result), 450
def webhook(): """ Twilio will make a post request to `/webhook` everytime if receives a new text message. This endpoint should both record the texter in the database, and use Twilio to send a response. The status code will be 201 if successful, and have an appropriate error code if not. Returns: object: The rendered JSON response. """ # pylint: disable=unused-variable if not request.values: return Response(status=400) phone_number = request.values.get("From") message_body = request.values.get("Body") if None in {phone_number, message_body}: return Response(status=400) Texter.record(phone_number) response = get_response(message_body) get_text_class().send_message(phone_number, response) return Response(status=201)
def set_param(param_name): """ POST /api/<version>/param/<param_name> {"value": "x"} POST to the ROS Parameter Server. Value should be in the value field of the request body. """ if not "value" in request.values: return error("No value supplied") rospy.set_param(param_name, request.values["value"]) return "", 204
def post_topic_message(topic_name): """ POST /api/<version>/topic/<topic_name> POST a message to a ROS topic by name. Requires a JSON payload of shape: ``[x, y, z]``. The values of the JSON array must match the argument types of the topic's type constructor. """ topic_name = "/" + topic_name # Get the list of ROS topics in the system. # Returns a list of [topic, type_string] pairs. topic_list = rostopic_master.getTopicTypes() # Find topic in list (this is Python's approach to find). try: topic_match = next(x for x in topic_list if x[0] == topic_name) except StopIteration: # If we can't find the topic, return early (400 bad request). return error("Topic does not exist") json = request.get_json(silent=True) args = json if json else [] try: # Get the message type constructor for the topic's type string. MsgType = get_message_class(topic_match[1]) pub = rospy.Publisher(topic_name, MsgType, queue_size=10) # Unpack JSON list and pass to publish. # pub.publish() will pass the list of arguments to the message # type constructor. pub.publish(*args) except Exception, e: return error("Wrong arguments for topic type constructor") return success("Posted message to topic")
def get_topic_data(topic_name): """ GET /api/<version>/topic_data/<topic_name> Get a single data point from a topic over HTTP. """ topic_name = "/" + topic_name try: msg_class, real_topic, _ = rostopic.get_topic_class(topic_name) except rostopic.ROSTopicIOException as e: raise e if not real_topic: return error("Topic does not exist", 404) msg = rospy.wait_for_message(real_topic, msg_class) data = getattr(msg, "data", None) if data is None: json = '{"status": [' for x in msg.status: if x == msg.status[-1]: json += '{"name": \"%s\", "level": %d, "message": \"%s\", "hardware_id": \"%s\", "values": %s}]}' % \ (x.name if x.name else "null", x.level, \ x.message if x.message else "null", x.hardware_id if x.hardware_id else "null", \ x.values) else: json += '{"name": \"%s\", "level": %d, "message": \"%s\", "hardware_id": \"%s\", "values": %s},' % \ (x.name if x.name else "null", x.level, \ x.message if x.message else "null", x.hardware_id if x.hardware_id else "null", \ x.values) return Response(json, mimetype = 'application/json') else: return jsonify({"result": data})
def filter_participants(event): include_participants = request.values.get('include_participants', '').lower() if include_participants == 'true': return event else: return {k: v for k, v in event.items() if k != 'participants'}
def events(): if 'created_at_or_after' in request.values: date = parse_date(request.values['created_at_or_after']) events = [event for event in api_data if created_on_or_after(event, date)] return render_events(sort_by_start_time(events)) elif 'ids' in request.values: ids = json.loads(request.values['ids']) return render_events(e for e in map(find_event, ids) if e) else: return render_events([])
def leave(id): event = find_event(id) if event == None: return not_found() user = find_user(request.values.get('user_param'), request.values.get('user_param_value')) if user == None: return user_not_specified() leave_event(user, event) return jsonify({})
def get_request_log(): # parse args and forms values = '' if len(request.values) > 0: for key in request.values: values += key + ': ' + request.values[key] + ', ' route = '/' + request.base_url.replace(request.host_url, '') request_log = { 'route': route, 'method': request.method, } # ??xml ?? category = request_log['route'].split('/')[1] if category != "virtual_card": return request_log if len(request.get_data()) != 0: body = json.loads(request.get_data()) if "password" in body: body.pop("password") request_log['body'] = body if len(values) > 0: request_log['values'] = values return request_log
def sonarr(): from slack_posting import postMsg postMsg(request.values, 'bots') return "ok" # API Stuff
def handle_books_id(place_id, book_id): '''Returns a JSON object of the book_id with a GET request method. Updates a booking's attributes with a POST request method. Removes a booking with a DELETE request method. Keyword arguments: place_id: The id of the place with the booking. book_id: The id of the booking. ''' try: book = PlaceBook.select().where(PlaceBook.id == book_id).get() except PlaceBook.DoesNotExist: raise Exception("There is no placebook with this id.") if request.method == 'GET': return jsonify(book.to_dict()), 200 elif request.method == 'PUT': params = request.values for key in params: if key == 'user': return jsonify(msg="You may not change the user."), 409 if key == 'updated_at' or key == 'created_at': continue setattr(book, key, params.get(key)) book.save() return jsonify(msg="Place book information updated successfully."), 200 elif request.method == 'DELETE': try: book = PlaceBook.delete().where(PlaceBook.id == book_id) except PlaceBook.DoesNotExist: raise Exception("There is no place with this id.") book.execute() return jsonify(msg="Place book deleted successfully."), 200
def handle_places(): '''Returns all the places with a GET request, or adds a new city to the database with a POST request. The parameters passed to the POST request iterate through the data and set the corresponding attributes of the instance to be inserted into the database. Will not set attribute passed as `updated_at` or `created_at`. ''' if request.method == 'GET': list = ListStyle().list(Place.select(), request) return jsonify(list), 200 elif request.method == 'POST': params = request.values place = Place() '''Check that all the required parameters are made in request.''' required = set(["owner", "city", "name"]) <= set(request.values.keys()) if required is False: return jsonify(msg="Missing parameter."), 400 for key in params: if key == 'updated_at' or key == 'created_at': continue setattr(place, key, params.get(key)) place.save() return jsonify(place.to_dict()), 200