我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.request.get_data()。
def before_request():
    """Record details of the incoming request on ``flask.g``.

    Stores the UTF-8 decoded raw body, the current time, the ``method`` and
    ``v`` query arguments, a lower-cased subset of the WSGI environ, the form
    data (or ``None``) and the JSON payload (or ``None`` when the request is
    not JSON).

    Raises:
        ApiSysExceptions.invalid_json: when reading the JSON payload fails.
    """
    wanted_keys = (
        'CONTENT_TYPE', 'CONTENT_LENGTH', 'HTTP_HOST', 'HTTP_ACCEPT',
        'HTTP_ACCEPT_ENCODING', 'HTTP_COOKIE', 'HTTP_USER_AGENT',
        'PATH_INFO', 'QUERY_STRING', 'SERVER_PROTOCOL', 'REQUEST_METHOD',
        'HTTP_HOST', 'SERVER_PORT', 'SERVER_SOFTWARE', 'REMOTE_ADDR',
        'REMOTE_PORT', 'HTTP_ACCEPT_LANGUAGE',
    )
    environ_subset = {}
    for name, value in request.environ.items():
        if name in wanted_keys:
            environ_subset[name.lower()] = value

    g.request_raw_data = request.get_data().decode('utf8')
    g.request_time = datetime.now()
    g.api_method = request.args['method']
    g.api_version = request.args['v']
    g.request_param = environ_subset

    if request.form:
        g.request_form = request.form.to_dict()
    else:
        g.request_form = None

    try:
        if request.is_json:
            g.request_json = request.get_json()
        else:
            g.request_json = None
    except Exception:
        raise ApiSysExceptions.invalid_json
def verify_request_signature(func):
    """Decorator that checks the ``x-hub-signature`` header of the request.

    The header is expected to look like ``<digest-name>=<hex digest>``. The
    digest is recomputed over the raw request body with ``APP_SECRET`` and
    compared in constant time. A present but malformed or mismatching
    signature yields an empty 403 response.

    NOTE(review): a request that carries no signature header at all is only
    logged and then passed through to ``func`` (unchanged from the original
    behaviour) -- confirm this is intended.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        signature = request.headers.get('x-hub-signature', None)
        if signature:
            method, separator, signature_hash = signature.partition('=')
            if not separator:
                # Header without "=": previously an IndexError (HTTP 500).
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
            try:
                expected_hash = hmac.new(APP_SECRET, msg=request.get_data(),
                                         digestmod=method).hexdigest()
            except ValueError:
                # Digest name supplied by the client is not supported.
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
            # Constant-time comparison; bytes avoid a TypeError on
            # non-ASCII header values.
            if not hmac.compare_digest(signature_hash.encode('utf-8'),
                                       expected_hash.encode('utf-8')):
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
        else:
            LOGGER.error('Could not validate the signature')
        return func(*args, **kwargs)
    return decorated
def kafka(TOPIC=None):
    """Send the raw request body to the Kafka topic ``TOPIC``.

    Returns a ``(body, status, headers)`` tuple: ``("OK", 200, None)`` after
    waiting (up to 60 seconds) on the send future, or
    ``("Internal Server Error", 500, None)`` when ``KafkaTimeoutError`` is
    raised.
    """
    global PRODUCER

    # Lazy init of the Kafka producer
    if PRODUCER is None:
        PRODUCER = KafkaProducer(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD)

    try:
        pending = PRODUCER.send(TOPIC, request.get_data())
        pending.get(timeout=60)
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None
    else:
        return "OK", 200, None
def socialcast(ALERTID=None, NUMRESULTS=10, TEAM=None, I=None, X=None):
    """
    Create a post on Socialcast containing log events.

    Limited to `NUMRESULTS` (default 10) or 1MB.

    If `TEAM/I/X` is not passed, requires `SOCIALCASTURL` defined in the form `https://TEAM.socialcast.com/api/webhooks/IIIIIIIIII/XXXXXXXXXXX`

    For more information see https://socialcast.github.io/socialcast/apidoc/incoming_webhook.html
    """
    if X is not None:
        URL = 'https://' + TEAM + '.socialcast.com/api/webhooks/' + I + '/' + X
    # ``elif`` (not ``if``): the SOCIALCASTURL fallback must only apply when
    # no TEAM/I/X was passed, otherwise the URL built above is always
    # overwritten or rejected.
    elif not SOCIALCASTURL or 'socialcast.com/api/webhooks' not in SOCIALCASTURL:
        return ("SOCIALCASTURL parameter must be set properly, please edit the shim!\n", 500, None)
    else:
        URL = SOCIALCASTURL

    # Socialcast cares about the order of the information in the body
    # json.dumps() does not preserve the order so just use raw get_data()
    return callapi(URL, 'post', request.get_data())
def parse(request):
    """
    Parse incoming JSON.

    The payload from ``request.get_json()`` is passed through ``parseLI`` and
    ``parsevROps`` to build the alert dict, which is logged and returned.

    Raises:
        ValueError: when the request carries no JSON payload.
        Exception: any error raised while parsing is logged (together with
            the raw body) and re-raised.
    """
    try:
        payload = request.get_json()
        if payload is None:
            # No exception is active here, so log a plain error and raise an
            # explicit exception instead of a bare ``raise``.
            logging.error("Payload is empty, did you specify a Header in the request?")
            raise ValueError("Payload is empty")
        alert = {}
        alert = parseLI(payload, alert)
        alert = parsevROps(payload, alert)
    except Exception:
        logging.info("Body=%s", request.get_data())
        logging.exception("Unexpected payload, is it in proper JSON format?")
        raise
    logging.info("Parsed=%s", alert)
    return alert
def test(ALERTID=None):
    """Log the auth header, request, and parsed moreinfo field. Respond with success. Don't send the payload anywhere."""
    if 'Authorization' in request.headers:
        logging.info(request.headers['Authorization'])

    raw_body = request.get_data()
    if raw_body:
        logging.info(raw_body)

    parsed = parse(request)
    if 'moreinfo' in parsed:
        logging.info(parsed['moreinfo'])
    return "OK"

# Import individual shims
def query_sql_list():
    """Render ``list_view.html`` with the SQL work list for the current user.

    The ``sql_manager`` query is chosen from the user's role and group. The
    three role-specific queries read their argument from the raw request
    body; the default query reads it from the submitted form.
    """
    user_info = cache.MyCache().get_user_info(current_user.id)

    # Pick the role-specific fetcher; None means "use the default listing".
    if user_info.role_id == settings.ROLE_DEV:
        if user_info.group_id == settings.DBA_GROUP_ID:
            fetch = sql_manager.get_sql_work_for_dba
        else:
            fetch = sql_manager.get_sql_work_for_dev
    elif user_info.role_id == settings.ROLE_LEADER:
        fetch = sql_manager.get_sql_work_for_leader
    else:
        fetch = None

    if fetch is None:
        obj = get_object_from_json(request.form)
        result_sql_list = sql_manager.get_sql_list(obj)
    else:
        obj = get_object_from_json_tmp(request.get_data())
        result_sql_list = fetch(obj)

    return render_template(
        "list_view.html",
        sql_list=result_sql_list,
        user_info=user_info,
        page_number=obj.page_number,
        page_list=get_page_number_list(obj.page_number),
        min_id=get_min_id(result_sql_list, obj.page_number))

# ??????????????????
def callback():
    """Webhook endpoint: log the body, report the event and dispatch it.

    Reads the ``X-Line-Signature`` header and the request body as text, logs
    the body, passes the parsed JSON to ``botimize.log_incoming`` and hands
    body and signature to ``handler.handle``. Aborts with 400 when the
    handler raises ``InvalidSignatureError``; otherwise returns ``'OK'``.
    """
    # get X-Line-Signature header value
    line_signature = request.headers['X-Line-Signature']

    # get request body as text
    payload_text = request.get_data(as_text=True)
    app.logger.info("Request body: " + payload_text)

    incoming_event = request.get_json()
    botimize.log_incoming(incoming_event)

    # handle webhook body
    try:
        handler.handle(payload_text, line_signature)
    except InvalidSignatureError:
        abort(400)
    return 'OK'
def predict():
    """Classify the image posted in the request body.

    The body is handed to ``parseImage``, then ``output.png`` is read back,
    inverted, resized to 28x28 and fed to ``model.predict``. Returns the
    argmax of the prediction rendered with ``np.array_str``.
    """
    # get data from drawing canvas and save as image
    parseImage(request.get_data())

    # read parsed image back in 8-bit, black and white mode (L)
    pixels = imread('output.png', mode='L')
    pixels = np.invert(pixels)
    pixels = imresize(pixels, (28, 28))

    # reshape image data for use in neural network
    batch = pixels.reshape(1, 28, 28, 1)
    with graph.as_default():
        out = model.predict(batch)
        print(out)
        print(np.argmax(out, axis=1))
        return np.array_str(np.argmax(out, axis=1))
def run_flask_request(environ):
    """Replay a Flask request from a saved WSGI environ.

    Args:
        environ: WSGI environ dict; an optional ``'_wsgi.input'`` entry holds
            the request body as bytes and is wrapped in a ``BytesIO`` to
            become ``'wsgi.input'``.

    Returns:
        ``(body, status_code, headers)`` taken from the response object.
    """
    from .wsgi_aux import app

    if '_wsgi.input' in environ:
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # Create a request context similar to that of the original request
    # so that the task can have access to flask.g, flask.request, etc.
    with app.request_context(environ):
        # Record the fact that we are running in the Celery worker now
        g.in_celery = True

        # Run the route function and record the response
        try:
            rv = app.full_dispatch_request()
        except Exception:
            # ``Exception`` rather than a bare ``except`` so SystemExit and
            # KeyboardInterrupt are not turned into a 500 response.
            # If we are in debug mode we want to see the exception
            # Else, return a 500 error
            if app.debug:
                raise
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
def wrapped(*args, **kwargs):
    """Run ``f`` directly inside Celery, otherwise queue the request as a task.

    Returns ``f``'s result when ``g.in_celery`` is set; otherwise an empty 202
    response with a ``Location`` header for the task status while the task is
    pending/received/started, or ``t.info`` for any other state.
    """
    # If we are already running the request on the celery side, then we
    # just call the wrapped function to allow the request to execute.
    if getattr(g, 'in_celery', False):
        return f(*args, **kwargs)

    # If we are on the Flask side, we need to launch the Celery task,
    # passing the request environment, which will be used to reconstruct
    # the request object. The request body has to be handled as a special
    # case, since WSGI requires it to be provided as a file-like object.
    environ = {}
    for key, value in request.environ.items():
        if isinstance(value, text_types):
            environ[key] = value
    if 'wsgi.input' in request.environ:
        environ['_wsgi.input'] = request.get_data()
    t = run_flask_request.apply_async(args=(environ,))

    # Return a 202 response, with a link that the client can use to
    # obtain task status that is based on the Celery task id.
    if t.state in (states.PENDING, states.RECEIVED, states.STARTED):
        return '', 202, {'Location': url_for('tasks.get_status', id=t.id)}

    # If the task already finished, return its return value as response.
    # This would be the case when CELERY_ALWAYS_EAGER is set to True.
    return t.info
def handle_messages():
    """Process every messaging event found in the request body.

    Each ``(sender_id, message)`` pair yielded by ``messaging_events`` is
    passed to ``processIncoming``; the reply (or a fallback sentence when the
    reply is ``None``) is sent back with ``send_message``. Errors for a single
    event are printed and do not stop the loop. Always returns ``"ok"``.
    """
    payload = request.get_data()

    # Handle messages
    for sender_id, message in messaging_events(payload):
        # Start processing valid requests
        try:
            response = processIncoming(sender_id, message)
            if response is not None:
                send_message(PAT, sender_id, response)
            else:
                send_message(PAT, sender_id, "Sorry I don't understand that")
        except Exception as e:
            # ``except ... as`` and ``print(...)`` parse on both Python 2.6+
            # and Python 3; the old comma/statement forms are Python 2 only.
            print(e)
            traceback.print_exc()
    return "ok"
def hipfrog():
    """Answer a ``/hipfrog`` chat command posted as JSON in the request body.

    Replies with an error when the installation has no Glassfrog token, with
    the help text when the command has no argument, and with a
    "missing functionality" error naming the first argument otherwise.
    """
    requestdata = json.loads(request.get_data())
    callingMessage = requestdata['item']['message']['message'].lower().split()
    oauthId = requestdata['oauth_client_id']
    installation = messageFunctions.getInstallationFromOauthId(oauthId)

    if installation.glassfrogToken is None:
        message = strings.set_token_first
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)
    elif len(callingMessage) <= 1:
        # ``<= 1`` also covers an empty/whitespace-only message, which used
        # to leave ``message_dict`` unbound and crash at the return below.
        message = strings.help_hipfrog
        message_dict = messageFunctions.createMessageDict(strings.succes_color, message)
    else:
        # /hipfrog something
        message = strings.missing_functionality.format(callingMessage[1])
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)

    # TODO Generate message_dict and color here
    return json.jsonify(message_dict)
def veryCode():
    """Start a crawler session for the posted ``serialNo``.

    On POST the JSON body's ``serialNo`` is passed to ``crawler().loadyan``,
    the crawler is stored in the module-level ``dict`` under that serial
    number, and the loaded value is returned as a JSON response.

    NOTE(review): the module-level name ``dict`` shadows the builtin and is
    replaced (not updated) on every call -- confirm against the callers.
    """
    global dict
    logging.info("veryCode function start...")
    if request.method == 'POST':
        raw_body = request.get_data()
        posted = json.loads(raw_body)
        serial_no = posted['serialNo']
        logging.info("veryCode's serialNo:"+serial_no)

        spider = crawler()
        very_code = spider.loadyan(serial_no)
        logging.info("very_code:"+str(very_code))
        print("very_code:"+very_code)

        dict = {serial_no: spider}
        logging.info("????hello" + str(veryCode).decode('utf-8'))

        return app.response_class(
            response=json.dumps(very_code),
            status=200,
            mimetype='application/json'
        )
def login():
    """Log in through the crawler session stored for the posted ``serialNo``.

    On POST the JSON body is parsed, a value is read from the server console
    and the stored crawler's ``logpage`` is called; its result is returned as
    a JSON response.

    NOTE(review): ``raw_input`` exists on Python 2 only and blocks the request
    on the server's stdin; ``logpage`` is called with hard-coded credentials
    rather than the posted ``cardNo``/``password`` -- confirm both are
    intended.
    """
    if request.method == 'POST':
        a = request.get_data()
        dict1 = json.loads(a)
        # The password is deliberately kept out of stdout and the log.
        print(dict1['serialNo']+"--"+dict1['cardNo'])
        logging.info('?????:'+dict1['serialNo']+',cardNo:'+dict1['cardNo'])
        print(dict1['serialNo'])
        print(dict.get(dict1['serialNo']))
        input_v = raw_input("::")
        dict2 = dict.get(dict1['serialNo']).logpage('330621196304098747', '111111', input_v, dict1['serialNo'])
        print(dict2)
        response = app.response_class(
            response=json.dumps(dict2),
            status=200,
            mimetype='application/json'
        )
        return response
def scrapy():
    """Return the crawler's ``info`` result for the posted ``serialNo``.

    On POST the JSON body's ``serialNo`` selects a crawler from the
    module-level ``dict``; the result of its ``info`` call is printed and
    returned as a JSON response.
    """
    if request.method == 'POST':
        posted = json.loads(request.get_data())
        serial_no = posted['serialNo']
        print(serial_no.encode("utf8"))
        logging.info('/scrapy/content ?????:'+serial_no.encode("utf8"))

        content = dict.get(serial_no).info(serial_no)
        print('/scrapy/content ?????:'+content)

        return app.response_class(
            response=json.dumps(content),
            status=200,
            mimetype='application/json'
        )
def callback():
    """Webhook endpoint that queues the event on ``handle_pool``.

    Reads the ``X-Line-Signature`` header and the request body as text, logs
    the body and submits ``handler.handle(body, signature)`` through
    ``handle_pool.apply_async``. Returns ``'OK'``.

    NOTE(review): if ``apply_async`` only schedules the call, an
    ``InvalidSignatureError`` raised inside the handler would not reach the
    ``except`` clause below -- confirm against the pool implementation.
    """
    # get X-Line-Signature header value
    line_signature = request.headers['X-Line-Signature']

    # get request body as text
    payload_text = request.get_data(as_text=True)
    app.logger.info("Request body: " + payload_text)

    # handle webhook body
    try:
        # handler.handle(body, signature)
        handle_pool.apply_async(handler.handle, args=(payload_text, line_signature))
    except exceptions.InvalidSignatureError:
        abort(400)
    return 'OK'
def webhook_flash():
    """Print the webhook body, its signature check and its ``topic`` field.

    The SHA-1 HMAC of the body is computed with a fixed key and printed next
    to the ``X-Hub-Signature`` header value. The result of the comparison is
    only printed, never enforced. The visible code has no explicit return.
    """
    secret_key = "secret"
    raw_body = request.get_data()
    print("===============================================================")
    print(raw_body)
    print("===============================================================")

    expected = ""
    if request.headers.get('X-Hub-Signature') is not None:
        expected = str(request.headers.get('X-Hub-Signature'))

    if expected:
        calculated = "sha1=" + hmac.new(secret_key, raw_body, hashlib.sha1).hexdigest()
        print("Expected  : " + expected)
        print("Calculated: " + calculated)
        print("Match?    : " + str(calculated == expected))
    else:
        print("Not signed. Not calculating")

    output = json.loads(raw_body)
    print("Topic Recieved: " + output["topic"])
def get_request_validity():
    """Return True when the request carries a valid GitHub signature or CSRF token.

    A request is accepted when its ``X-Hub-Signature`` header matches the
    SHA-1 HMAC of the body under any secret in
    ``config.github_webhook_secrets``, or when the ``_csrf_token`` form field
    equals the token stored in the session.
    """
    # GitHub signature will suffice for CSRF check
    github_signature = request.headers.get("X-Hub-Signature")
    if github_signature:
        payload_bytes = request.get_data()
        for github_webhook_secret in config.github_webhook_secrets:
            digest = hmac.new(github_webhook_secret, payload_bytes, sha1).hexdigest()
            expected_signature = "sha1=%s" % digest
            try:
                # Constant-time comparison to avoid leaking the digest
                # through response timing.
                if hmac.compare_digest(expected_signature, github_signature):
                    return True
            except TypeError:
                # Header value cannot be compared (e.g. non-ASCII text):
                # treat it as a mismatch.
                continue

    # Normal CSRF form tokens work too
    token = request.form.get("_csrf_token")
    expected_token = session.get("_csrf_token", None)
    if expected_token and expected_token == token:
        return True
    return False
def run_ctx_request(environ):
    """ run flask request context in celery worker

    Args:
        environ: WSGI environ dict; an optional ``'_wsgi.input'`` entry holds
            the request body as bytes.

    Returns:
        ``(body, status_code, headers)`` taken from the response object, on
        both the success and the error path.
    """
    from blueprints import app  # wsgi.app

    if '_wsgi.input' in environ:
        # an input stream (file-like object) from which the HTTP request body can be read.
        # detail: https://www.python.org/dev/peps/pep-0333/#environ-variables
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # request_context() needs the environ to build the request from.
    with app.request_context(environ):
        g.in_celery = True
        try:
            rv = app.full_dispatch_request()
        except InternalServerError:
            if app.debug:
                raise
            # Fall through to the common return so the error path yields the
            # same tuple shape as the success path.
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
def decorator(*args, **kwargs):
    """Run ``f`` directly inside Celery, otherwise queue the request as a task.

    Returns ``f``'s result when ``g.in_celery`` is set; otherwise an empty 202
    response with a ``Location`` header while the task is
    pending/received/started, or ``task.info`` for any other state.
    """
    if getattr(g, 'in_celery', False):
        return f(*args, **kwargs)

    # Only text-valued environ entries are forwarded to the task.
    environ = {}
    for key, value in request.environ.items():
        if isinstance(value, text_types):
            environ[key] = value
    if 'wsgi.input' in request.environ:
        environ['_wsgi.input'] = request.get_data()  # request.body

    task = run_ctx_request.apply_async(args=(environ,))
    if task.state in (states.PENDING, states.RECEIVED, states.STARTED):
        return '', 202, {'Location': url_for('api.get_status', id=task.id)}
    return task.info
def post(self):
    """Generate fake participants as requested by the JSON request body.

    Reads ``num_participants``, ``include_physical_measurements``,
    ``include_biobank_orders`` and ``hpo`` from the body and calls
    ``generate_participant`` that many times. When ``create_biobank_samples``
    is truthy, ``generate_samples`` is deferred with
    ``samples_missing_fraction`` (default ``_SAMPLES_MISSING_FRACTION``).
    """
    options = json.loads(request.get_data())
    num_participants = int(options.get('num_participants', 0))
    with_measurements = bool(options.get('include_physical_measurements', False))
    with_orders = bool(options.get('include_biobank_orders', False))
    requested_hpo = options.get('hpo', None)

    if num_participants > 0:
        generator = FakeParticipantGenerator(InProcessClient())
        for _ in range(num_participants):
            generator.generate_participant(with_measurements, with_orders, requested_hpo)

    if options.get('create_biobank_samples'):
        missing_fraction = options.get('samples_missing_fraction', _SAMPLES_MISSING_FRACTION)
        deferred.defer(generate_samples, missing_fraction)
def decode_json_post_data(fn):
    """Decorator that parses POSTed JSON and attaches it to the request
    object (:obj:`request.data`).

    Raises:
        OcdApiError: with status 400 when a POST has no body or the body
            cannot be decoded/parsed as JSON.
    """
    @wraps(fn)
    def wrapped_function(*args, **kwargs):
        if request.method == 'POST':
            data = request.get_data(cache=False)
            if not data:
                raise OcdApiError('No data was POSTed', 400)

            request_charset = request.mimetype_params.get('charset')
            try:
                # json.loads() lost its ``encoding`` argument in Python 3.9,
                # so decode explicitly with the declared charset instead.
                if request_charset is not None and isinstance(data, bytes):
                    data = data.decode(request_charset)
                data = json.loads(data)
            except (ValueError, LookupError, TypeError):
                # ValueError covers JSON and Unicode decode errors;
                # LookupError covers an unknown charset name.
                raise OcdApiError('Unable to parse POSTed JSON', 400)

            request.data = data

        return fn(*args, **kwargs)
    return wrapped_function
def login():
    """
    login a user

    The JSON body must contain ``name`` and ``password``; ``remember`` is
    optional and defaults to False.

    :return: response
    :raises ClientError: ``INVALID_REQUEST`` when ``name`` or ``password`` is missing
    """
    login_data = json.loads(request.get_data(True, as_text=True))

    if 'name' not in login_data or 'password' not in login_data:
        raise ClientError(ClientError.INVALID_REQUEST)

    name = login_data['name']
    password = login_data['password']
    remember = login_data.get('remember', False)

    credential = UserCredential.login_user(name, password)
    login_user(credential, remember=remember)
    return json_resp({'msg': 'OK'})
def register():
    """
    register a new user using invite code, note that a newly registered user is not administrator, you need to use an
    admin user to promote it

    :return: response
    :raises ClientError: ``INVALID_REQUEST`` when a required field is missing,
        ``PASSWORD_MISMATCH`` when the two passwords differ
    """
    register_data = json.loads(request.get_data(True, as_text=True))

    required_fields = ('name', 'password', 'password_repeat', 'invite_code', 'email')
    if not all(field in register_data for field in required_fields):
        raise ClientError(ClientError.INVALID_REQUEST)

    name = register_data['name']
    password = register_data['password']
    password_repeat = register_data['password_repeat']
    email = register_data['email']
    invite_code = register_data['invite_code']

    if password != password_repeat:
        raise ClientError(ClientError.PASSWORD_MISMATCH)

    registered = UserCredential.register_user(
        name=name, password=password, email=email, invite_code=invite_code)
    if registered:
        # login automatically
        credential = UserCredential.login_user(name, password)
        login_user(credential, remember=False)
        # send email
        credential.send_confirm_email()
        return json_resp({'message': 'ok'}, 201)
def configure_linebot_app(app):
    """Register the database-commit hook and the LINE webhook route on ``app``."""

    @app.after_request
    def commit_database(response):
        """Commit the database after every request and pass the response on."""
        db.commit()
        return response

    @app.route("/api/line_webhook", methods=["POST"])
    def line_webhook():
        """Hand the webhook body to ``line_webhook_handler``.

        Aborts with 400 on an invalid signature and with 500 on any other
        error; returns ``"OK"`` otherwise.
        """
        signature = request.headers['X-Line-Signature']
        body = request.get_data(as_text=True)
        logger.debug(f'Incoming message:\n{pformat(body)}')

        try:
            line_webhook_handler.handle(body, signature)
        except InvalidSignatureError:
            logger.warning('Message with an invalid signature received')
            abort(400)
        except LineBotApiError as e:
            logger.error(f'{e}\nDetails:\n{pformat(e.error.details)}')
            abort(500)
        except Exception as e:
            logger.error(f'Uncaught error: {e}')
            abort(500)
        return "OK"
def callback():
    """Webhook endpoint: log the body and pass it to ``handler.handle``.

    Aborts with 400 when the handler raises ``InvalidSignatureError``;
    otherwise returns ``'ok'``.
    """
    # get X-Line-Signature header value
    line_signature = request.headers['X-Line-Signature']

    # get request body as text
    payload_text = request.get_data(as_text=True)
    # print("body:",body)
    app.logger.info("Request body: " + payload_text)

    # handle webhook body
    try:
        handler.handle(payload_text, line_signature)
    except InvalidSignatureError:
        abort(400)
    return 'ok'
def prepare_client_request_data():
    """
    Read the client's request body and rewrite it when it is text.

    The body is passed to ``encoding_detect``. When an encoding is reported
    and decoding succeeds, the text is run through
    ``client_requests_text_rewrite`` and returned as ``str``. Otherwise the
    original ``bytes`` are returned untouched and the encoding is ``None``.

    :return: ``(data, encoding)``
    :rtype: tuple[Union[str, bytes, None], Optional[str]]
    """
    data = request.get_data()  # type: bytes

    # Try to detect the encoding of what the client sent.
    encoding = encoding_detect(data)

    if encoding is not None:
        try:
            data = data.decode(encoding=encoding)  # type: str
        except (UnicodeError, LookupError):
            # Decoding failed: treat the body as binary and return it as-is.
            encoding = None
        else:
            # The body is text: rewrite it and keep it as str.
            data = client_requests_text_rewrite(data)  # type: str

    # Debug-only tracing; it must not change what is returned.
    if developer_string_trace:  # coverage: exclude
        # ``encoding`` is None for binary bodies; str.encode(encoding=None)
        # raises TypeError, so fall back to UTF-8 for the probe.
        probe_encoding = encoding or 'utf-8'
        if isinstance(data, str):
            haystack = data.encode(encoding=probe_encoding)
        else:
            haystack = data
        if developer_string_trace.encode(encoding=probe_encoding) in haystack:
            infoprint('StringTrace: appears after client_requests_bin_rewrite, code line no. ', current_line_number())

    return data, encoding
def predict():
    """Classify the user-drawn character posted in the request body.

    The raw body is converted by ``convertImage``, ``output.png`` is read
    back, inverted, resized to 28x28 and fed to ``model.predict``. Returns
    the argmax of the prediction rendered with ``np.array_str``.
    """
    # get the raw data format of the image and encode it into a suitable format
    convertImage(request.get_data())
    print("debug")

    # read the image into memory
    pixels = imread('output.png', mode='L')
    # compute a bit-wise inversion so black becomes white and vice versa
    pixels = np.invert(pixels)
    # make it the right size
    pixels = imresize(pixels, (28, 28))
    # imshow(x)
    # convert to a 4D tensor to feed into our model
    batch = pixels.reshape(1, 28, 28, 1)
    print("debug2")

    # in our computation graph
    with graph.as_default():
        # perform the prediction
        out = model.predict(batch)
        print(out)
        print(np.argmax(out, axis=1))
        print("debug3")
        # convert the response to a string
        return np.array_str(np.argmax(out, axis=1))
def imreceive():
    """Authenticate a face image posted as a pickled array.

    The POST body is unpickled; when the result has shape ``(224, 224, 3)``
    and dtype ``uint8`` it is checked with ``proc_face`` (or
    ``proc_face_with_hack`` when the ``hack`` query argument is ``"True"``).

    Returns a JSON response with ``authentication`` set to ``"ALLOWED"`` or
    ``"DENIED"``. A missing or undecodable body is answered with ``"DENIED"``.
    """
    data = None
    hack = False
    if request.method == "POST":
        data = request.get_data()
    if 'hack' in request.args:
        hack = request.args['hack'] == "True"

    # Non-POST requests and empty bodies used to crash in pickle.loads(None).
    if not data:
        return jsonify(dict(authentication="DENIED"))

    # SECURITY(review): pickle.loads() on request data can execute arbitrary
    # code. This endpoint must only be reachable by trusted clients, or the
    # wire format should be replaced with a safe one.
    try:
        face = pickle.loads(data)
    except Exception:
        # Unpickling malformed input can raise many different exceptions.
        return jsonify(dict(authentication="DENIED"))
    print("Image Received")

    if getattr(face, 'shape', None) == (224, 224, 3) and getattr(face, 'dtype', None) == "uint8":
        if hack and proc_face_with_hack(face):
            return jsonify(dict(authentication="ALLOWED"))
        elif not hack and proc_face(face):
            return jsonify(dict(authentication="ALLOWED"))
    return jsonify(dict(authentication="DENIED"))
def line_call_back():
    """Validate (in production) and process a LINE event from the request body.

    Returns ``"NOT PASS"`` when ``PRODUCTION == '1'`` and signature validation
    fails. Any exception is logged and the function still returns ``"OK"``.
    """
    try:
        if PRODUCTION == '1':
            channel_signature = request.headers.get('X-Line-Channelsignature')
            signed_body = request.get_data().decode("utf-8")
            if not bot.bot_api.client.validate_signature(channel_signature, signed_body):
                return "NOT PASS"
        event_text = request.get_data().decode("utf-8")
        bot.process_new_event(event_text)
    except Exception as ex:
        logging.error(ex)
    return "OK"
def edison_done():
    """Pass the UTF-8 decoded request body to ``bot.take_photo_done`` and return ``'OK'``."""
    raw_body = request.get_data()
    bot.take_photo_done(raw_body.decode("utf-8"))
    return 'OK'
def webhook():
    """Pass the request body (as text) to ``page.handle_webhook`` and return ``"ok"``."""
    payload_text = request.get_data(as_text=True)
    page.handle_webhook(payload_text)
    return "ok"

### Main method (Handles user messages, db) ###
def callback():
    """Webhook endpoint: log the body and pass it to ``handler.handle``.

    Aborts with 400 when the handler raises ``InvalidSignatureError``;
    otherwise returns ``'OK'``.
    """
    # get X-Line-Signature header value
    line_signature = request.headers['X-Line-Signature']

    # get request body as text
    payload_text = request.get_data(as_text=True)
    app.logger.info("Request body: " + payload_text)

    # handle webhook body
    try:
        handler.handle(payload_text, line_signature)
    except InvalidSignatureError:
        abort(400)
    return 'OK'
def generate_secure_token():
    """Return ``SecureToken.encrypt`` of the raw request body followed by a newline byte."""
    encrypted = SecureToken.encrypt(request.get_data())
    return encrypted + b'\n'
def parse_frames(pipeline):
    """Run ``proc_input`` on the submitted text and return the result as JSON.

    For GET the text comes from the ``t`` query argument (400 when absent);
    for other methods it is the request body. ``pipeline`` is upper-cased
    before being passed on. Any processing error is logged and answered
    with 500.
    """
    if request.method == 'GET':
        # parse from text parameter
        d = request.args.get('t')
        if d is None:
            abort(400)
    else:
        d = request.get_data(as_text=True)

    try:
        r = proc_input(d, pipeline.upper())
    except Exception:
        LOGGER.exception("Error processing input")
        abort(500)
    return jsonify(r)
def server_error_page(error):
    """Roll back the database session and return a JSON 500 response."""
    db.session.rollback()
    error_body = jsonify({'error': 'internal server error'})
    return error_body, 500

# logger
# @app.before_request
# def log_request_info():
#     app.logger.debug('Headers: %s', request.headers)
#     app.logger.debug('Body: %s', request.get_data())
def get_sql_audit_info():
    """Build an object from the raw request body and return ``sql_manager.audit_sql`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.audit_sql(request_obj)
def review_sql_work():
    """Build an object from the raw request body and return ``sql_manager.audit_sql_work`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.audit_sql_work(request_obj)
def add_sql_work():
    """Build an object from the raw request body and return ``sql_manager.add_sql_work`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.add_sql_work(request_obj)
def add_user():
    """Build an object from the raw request body and return ``user_manager.add_user`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.add_user(request_obj)
def add_group():
    """Build an object from the raw request body and return ``user_manager.add_group_info`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.add_group_info(request_obj)
def update_group():
    """Build an object from the raw request body and return ``user_manager.update_user_group_info`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.update_user_group_info(request_obj)
def update_sql_work():
    """Build an object from the raw request body and return ``sql_manager.update_sql_work`` for it."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.update_sql_work(request_obj)

# endregion

# region login api
def webhook():
    """Print the request body (as text), pass it to ``page.handle_webhook`` and return ``"ok"``."""
    body_text = request.get_data(as_text=True)
    print(body_text)
    page.handle_webhook(body_text)
    return "ok"
def get_form():
    """Return the parsed form of the current request, cached on the request.

    ``multipart/form-data`` bodies are taken from ``request.form``;
    ``application/x-www-form-urlencoded`` bodies are parsed from the raw body
    with ``parse_qs``; any other (or missing) content type yields an empty
    dict. The result is stored on ``request._covador_form`` for reuse.
    """
    try:
        return request._covador_form
    except AttributeError:
        pass

    # content_type is None when the header is absent; ``.startswith`` on
    # None used to raise AttributeError.
    content_type = request.content_type or ''
    if content_type.startswith('multipart/form-data'):
        form = request.form.to_dict(False)
    elif content_type.startswith('application/x-www-form-urlencoded'):
        form = parse_qs(request.get_data(parse_form_data=False))
    else:
        form = {}

    request._covador_form = form
    return form
def get_request_log():
    """Build a dict describing the current request for logging.

    Always contains ``route`` and ``method``. For routes whose first path
    segment is ``virtual_card`` it also contains the JSON ``body`` (with any
    ``password`` key removed) and the flattened request ``values`` when those
    are non-empty.
    """
    # parse args and forms
    values = ''.join(
        key + ': ' + request.values[key] + ', ' for key in request.values
    )

    route = '/' + request.base_url.replace(request.host_url, '')
    request_log = {
        'route': route,
        'method': request.method,
    }

    # Only "virtual_card" routes get the body and values attached.
    category = request_log['route'].split('/')[1]
    if category != "virtual_card":
        return request_log

    raw_body = request.get_data()
    if len(raw_body) != 0:
        body = json.loads(raw_body)
        if "password" in body:
            body.pop("password")
        request_log['body'] = body

    if values:
        request_log['values'] = values
    return request_log
def saveFile(name):
    """
    save plugin code. code is provides via http body

    :param name: the plugin name; must be a single path component
    :return: empty http reponse (204 on success, 400 for an invalid name)
    """
    # ``name`` comes from the request and is used to build a file path.
    # Reject anything that could escape ./modules/plugins/ (e.g. "..").
    if name in ('', '.', '..') or '/' in name or '\\' in name:
        return ('', 400)

    with open("./modules/plugins/"+name+"/__init__.py", "wb") as fo:
        fo.write(request.get_data())
    cbpi.emit_message("PLUGIN %s SAVED" % (name))
    return ('', 204)
def post(self):
    """Trigger a new run"""
    decoded_body = json.loads(request.get_data())
    run_payload = utils.uni_to_str(decoded_body)

    # Give the run a fresh random id before handing it to the manager.
    run_id = str(uuid.uuid4())
    run_payload['id'] = run_id
    LOG.info('Triggering new ansible run %s', run_payload['id'])

    created_run = self.manager.create_run(run_payload)
    return run_model.format_response(created_run)