Python flask.request 模块,get_data() 实例源码

我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用flask.request.get_data()

项目:apizen    作者:blackmatrix7    | 项目源码 | 文件源码
def before_request():
    """Capture request metadata on ``flask.g`` before the view runs.

    Stores on ``g``: a lower-cased subset of the WSGI environ
    (``request_param``), the raw body decoded as UTF-8 (``request_raw_data``),
    the current time (``request_time``), the ``method`` and ``v`` query
    arguments (``api_method`` / ``api_version``), the form data as a dict or
    ``None`` (``request_form``) and the parsed JSON body or ``None``
    (``request_json``).

    Raises:
        ApiSysExceptions.invalid_json: if the body is declared as JSON but
            parsing it fails.
    """
    # WSGI environ keys worth recording; a set lists each key exactly once
    # and gives constant-time membership tests inside the comprehension.
    wanted_keys = frozenset((
        'CONTENT_TYPE', 'CONTENT_LENGTH', 'HTTP_HOST',
        'HTTP_ACCEPT', 'HTTP_ACCEPT_ENCODING', 'HTTP_COOKIE',
        'HTTP_USER_AGENT', 'PATH_INFO', 'QUERY_STRING',
        'SERVER_PROTOCOL', 'REQUEST_METHOD',
        'SERVER_PORT', 'SERVER_SOFTWARE', 'REMOTE_ADDR',
        'REMOTE_PORT', 'HTTP_ACCEPT_LANGUAGE',
    ))
    request_param = {key.lower(): value
                     for key, value in request.environ.items()
                     if key in wanted_keys}
    g.request_raw_data = request.get_data().decode('utf8')
    g.request_time = datetime.now()
    # Both query arguments are mandatory: indexing (rather than .get) makes a
    # missing argument fail the request instead of continuing with None.
    g.api_method = request.args['method']
    g.api_version = request.args['v']
    g.request_param = request_param
    g.request_form = request.form.to_dict() if request.form else None
    try:
        g.request_json = request.get_json() if request.is_json else None
    except Exception:
        raise ApiSysExceptions.invalid_json
项目:dota2-messenger-platform    作者:nico-arianto    | 项目源码 | 文件源码
def verify_request_signature(func):
    """Decorator that validates the ``x-hub-signature`` header of a request.

    The header is expected in the form ``<digest name>=<hex digest>``. The
    HMAC of the raw request body is computed with ``APP_SECRET`` and compared
    with the supplied digest. A malformed header, an unknown digest name or a
    mismatching digest produces an empty 403 response.

    NOTE(review): a request that carries no signature header at all is only
    logged and still reaches ``func`` -- confirm this pass-through is intended.
    """
    @wraps(func)
    def decorated(*args, **kwargs):
        signature = request.headers.get('x-hub-signature', None)
        if signature:
            method, separator, signature_hash = signature.partition('=')
            if not separator or not method:
                # No "name=value" structure: reject instead of crashing.
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
            try:
                expected_hash = hmac.new(APP_SECRET, msg=request.get_data(), digestmod=method).hexdigest()
            except ValueError:
                # The header named a digest algorithm hashlib does not know.
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
            # Constant-time comparison; bytes on both sides so a non-ASCII
            # header value cannot make compare_digest raise.
            if not hmac.compare_digest(signature_hash.encode('utf-8'),
                                       expected_hash.encode('utf-8')):
                LOGGER.error('Signature was invalid')
                return make_response('', 403)
        else:
            LOGGER.error('Could not validate the signature')
        return func(*args, **kwargs)

    return decorated
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def kafka(TOPIC=None):
    """Publish the raw request body to the Kafka topic ``TOPIC``.

    Returns the tuple ``("OK", 200, None)`` after the send future resolves
    (waiting at most 60 seconds), or ``("Internal Server Error", 500, None)``
    when a ``KafkaTimeoutError`` is raised.
    """
    global PRODUCER
    # The producer is built on first use and kept in the module-level global.
    if PRODUCER is None:
        producer_settings = dict(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD,
        )
        PRODUCER = KafkaProducer(**producer_settings)
    try:
        pending = PRODUCER.send(TOPIC, request.get_data())
        pending.get(timeout=60)
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None
    return "OK", 200, None
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def socialcast(ALERTID=None, NUMRESULTS=10, TEAM=None, I=None, X=None):
    """
    Create a post on Socialcast containing log events.
    Limited to `NUMRESULTS` (default 10) or 1MB.
    If `TEAM/I/X` is not passed, requires `SOCIALCASTURL` defined in the form `https://TEAM.socialcast.com/api/webhooks/IIIIIIIIII/XXXXXXXXXXX`
    For more information see https://socialcast.github.io/socialcast/apidoc/incoming_webhook.html

    Returns the result of `callapi`, or an error tuple with status 500 when
    no usable webhook URL is available.
    """
    if TEAM is not None and I is not None and X is not None:
        # URL parts supplied by the caller take precedence over SOCIALCASTURL.
        URL = 'https://' + TEAM + '.socialcast.com/api/webhooks/' + I + '/' + X
    elif not SOCIALCASTURL or 'socialcast.com/api/webhooks' not in SOCIALCASTURL:
        return ("SOCIALCASTURL parameter must be set properly, please edit the shim!\n", 500, None)
    else:
        URL = SOCIALCASTURL
    # Socialcast cares about the order of the information in the body
    # json.dumps() does not preserve the order so just use raw get_data()
    return callapi(URL, 'post', request.get_data())
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def parse(request):
    """
    Parse incoming JSON.

    Reads the JSON payload from `request`, passes it through `parseLI` and
    `parsevROps`, logs the result and returns it.

    Raises ValueError when the payload is empty; any other failure is logged
    together with the raw body and re-raised unchanged.
    """
    try:
        payload = request.get_json()
        if payload is None:
            # No exception is active here, so log a plain error and raise an
            # explicit exception instead of a bare `raise`.
            logging.error("Payload is empty, did you specify a Header in the request?")
            raise ValueError("Request payload is empty or is not JSON")
        alert = {}
        alert = parseLI(payload, alert)
        alert = parsevROps(payload, alert)
    except Exception:
        logging.info("Body=%s", request.get_data())
        logging.exception("Unexpected payload, is it in proper JSON format?")
        raise
    logging.info("Parsed=%s", alert)
    return alert
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def test(ALERTID=None):
    """Diagnostic endpoint: log the Authorization header, the raw body and the
    parsed `moreinfo` field, then answer "OK". Nothing is forwarded."""
    auth_header = request.headers.get('Authorization')
    if auth_header is not None:
        logging.info(auth_header)
    body = request.get_data()
    if body:
        logging.info(body)
        a = parse(request)
        # `moreinfo` is optional in the parsed alert.
        if 'moreinfo' in a:
            logging.info(a['moreinfo'])
    return "OK"


# Import individual shims
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def query_sql_list():
    """Render ``list_view.html`` with the SQL work list for the current user.

    The query function is chosen from the user's group and role. The three
    role-specific queries read their filter object from the raw request body;
    the fallback query reads it from the submitted form.
    """
    user_info = cache.MyCache().get_user_info(current_user.id)
    if user_info.group_id == settings.DBA_GROUP_ID and user_info.role_id == settings.ROLE_DEV:
        fetch_work = sql_manager.get_sql_work_for_dba
    elif user_info.role_id == settings.ROLE_DEV:
        fetch_work = sql_manager.get_sql_work_for_dev
    elif user_info.role_id == settings.ROLE_LEADER:
        fetch_work = sql_manager.get_sql_work_for_leader
    else:
        fetch_work = None

    if fetch_work is not None:
        obj = get_object_from_json_tmp(request.get_data())
        result_sql_list = fetch_work(obj)
    else:
        obj = get_object_from_json(request.form)
        result_sql_list = sql_manager.get_sql_list(obj)

    template_context = dict(
        sql_list=result_sql_list,
        user_info=user_info,
        page_number=obj.page_number,
        page_list=get_page_number_list(obj.page_number),
        min_id=get_min_id(result_sql_list, obj.page_number),
    )
    return render_template("list_view.html", **template_context)


# ??????????????????
项目:line-python-bot    作者:botimize    | 项目源码 | 文件源码
def callback():
    """Webhook endpoint: log the request body, report the JSON event to
    botimize, then pass body and signature to the handler.

    Aborts with 400 when the handler raises ``InvalidSignatureError``.
    """
    line_signature = request.headers['X-Line-Signature']
    raw_body = request.get_data(as_text=True)
    app.logger.info("Request body: " + raw_body)

    # The incoming event is reported before the signature is checked.
    botimize.log_incoming(request.get_json())

    try:
        handler.handle(raw_body, line_signature)
    except InvalidSignatureError:
        abort(400)
    return 'OK'
项目:mnist-flask    作者:akashdeepjassal    | 项目源码 | 文件源码
def predict():
    """Classify the image posted in the request body and return the
    predicted class index as a string."""
    # parseImage receives the raw body; 'output.png' is read back right after
    # (presumably written by parseImage -- confirm in its definition).
    parseImage(request.get_data())

    # 8-bit greyscale (mode 'L'), inverted, then scaled to 28x28.
    pixels = np.invert(imread('output.png', mode='L'))
    pixels = imresize(pixels, (28, 28))

    # Add batch and channel axes for the model.
    batch = pixels.reshape(1, 28, 28, 1)
    with graph.as_default():
        out = model.predict(batch)
        print(out)
        predicted = np.argmax(out, axis=1)
        print(predicted)
        return np.array_str(predicted)
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def run_flask_request(environ):
    """Dispatch a Flask request rebuilt from ``environ`` and return its
    response as a ``(body, status_code, headers)`` tuple.

    The body bytes stored under ``'_wsgi.input'`` (if present) are wrapped in
    a ``BytesIO`` and installed as ``'wsgi.input'``.
    """
    from .wsgi_aux import app

    if '_wsgi.input' in environ:
        body_bytes = environ['_wsgi.input']
        environ['wsgi.input'] = BytesIO(body_bytes)

    # Rebuild a request context so flask.g, flask.request, etc. are usable.
    with app.request_context(environ):
        # Marker telling the route wrapper that we already run in the worker.
        g.in_celery = True

        try:
            response = app.full_dispatch_request()
        except:
            # Debug mode: surface the exception. Otherwise answer with a 500.
            if app.debug:
                raise
            response = app.make_response(InternalServerError())
        return (response.get_data(), response.status_code, response.headers)
项目:flack    作者:miguelgrinberg    | 项目源码 | 文件源码
def wrapped(*args, **kwargs):
        """Run the wrapped view directly when already inside the Celery
        worker; otherwise hand the request to a Celery task and answer 202."""
        # Worker side: g.in_celery is set, so just execute the view.
        if getattr(g, 'in_celery', False):
            return f(*args, **kwargs)

        # Flask side: forward only the text-valued environ entries. The body
        # is carried separately under '_wsgi.input' because WSGI expects a
        # file-like object there.
        environ = {}
        for name, value in request.environ.items():
            if isinstance(value, text_types):
                environ[name] = value
        if 'wsgi.input' in request.environ:
            environ['_wsgi.input'] = request.get_data()
        task = run_flask_request.apply_async(args=(environ,))

        # Unfinished task: 202 plus a Location header for the status route.
        unfinished = (states.PENDING, states.RECEIVED, states.STARTED)
        if any(task.state == candidate for candidate in unfinished):
            return '', 202, {'Location': url_for('tasks.get_status', id=task.id)}

        # Already finished (e.g. CELERY_ALWAYS_EAGER): return its result.
        return task.info
项目:FacebookBot-echobot-simple    作者:hungtraan    | 项目源码 | 文件源码
def handle_messages():
    """Process every messaging event in the request body and reply to each.

    For each ``(sender_id, message)`` pair produced by ``messaging_events``
    the reply from ``processIncoming`` is sent back, or a fallback text when
    it returns ``None``. An exception while handling one event is printed and
    does not stop the remaining events. Always returns ``"ok"``.
    """
    payload = request.get_data()

    for sender_id, message in messaging_events(payload):
        try:
            response = processIncoming(sender_id, message)
            if response is None:
                response = "Sorry I don't understand that"
            send_message(PAT, sender_id, response)
        except Exception as e:
            # Best effort per event: report and continue with the next one.
            print(e)
            traceback.print_exc()
    return "ok"
项目:hipfrog    作者:wardweistra    | 项目源码 | 文件源码
def hipfrog():
    """Answer a ``/hipfrog`` command posted as JSON in the request body.

    Replies with a "set token first" error when the installation has no
    Glassfrog token, with the help text when the command has no argument,
    and with a "missing functionality" error naming the first argument
    otherwise.
    """
    requestdata = json.loads(request.get_data())
    callingMessage = requestdata['item']['message']['message'].lower().split()
    oauthId = requestdata['oauth_client_id']
    installation = messageFunctions.getInstallationFromOauthId(oauthId)

    if installation.glassfrogToken is None:
        message = strings.set_token_first
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)
    elif len(callingMessage) <= 1:
        # `<= 1` also covers an empty message, which would otherwise leave
        # message_dict unbound.
        message = strings.help_hipfrog
        message_dict = messageFunctions.createMessageDict(strings.succes_color, message)
    else:
        # /hipfrog something
        message = strings.missing_functionality.format(callingMessage[1])
        message_dict = messageFunctions.createMessageDict(strings.error_color, message)
    # TODO Generate message_dict and color here
    return json.jsonify(message_dict)
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def veryCode():
    """Handle a POST carrying ``serialNo``: create a crawler, call its
    ``loadyan`` with the serial number and return the result as JSON.

    Returns nothing for non-POST requests (Python 2 code).
    """
    logging.info("veryCode function start...")
    if request.method == 'POST':

        # The request body is expected to be a JSON object with 'serialNo'.
        a = request.get_data()
        dict1 = json.loads(a)
        serial_no = dict1['serialNo']
        logging.info("veryCode's serialNo:"+serial_no)
        spider = crawler()
        very_code = spider.loadyan(serial_no)
        logging.info("very_code:"+str(very_code))
        print "very_code:"+very_code

        # NOTE(review): this rebinds the module-level name `dict` (shadowing
        # the builtin) to a fresh one-entry mapping on every call, so only the
        # most recent serial number stays registered -- confirm intent.
        global dict;
        dict = {serial_no:spider}

        # NOTE(review): this logs the repr of the function object `veryCode`,
        # not `very_code` -- looks like a typo, confirm.
        logging.info("????hello" + str(veryCode).decode('utf-8'))
        response = app.response_class(
            response=json.dumps(very_code),
            status=200,
            mimetype='application/json'
        )
        return response
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def login():
    """Handle a POST carrying ``serialNo``, ``cardNo`` and ``password``: look
    up the object stored under the serial number in the module-level ``dict``
    and return the result of its ``logpage`` call as JSON.

    Returns nothing for non-POST requests (Python 2 code).
    """

    if request.method == 'POST':
        a = request.get_data()
        dict1 = json.loads(a)
        # NOTE(review): the password is printed and logged in clear text.
        print dict1['serialNo']+"--"+dict1['cardNo']+"---"+dict1['password']
        logging.info('?????:'+dict1['serialNo']+',cardNo:'+dict1['cardNo']+',password:'+dict1['password'])

        print dict1['serialNo']

        # `dict` here is the module-level global, not the builtin type.
        print dict.get(dict1['serialNo'])
        # NOTE(review): raw_input blocks the request until someone types on
        # the server console.
        input_v = raw_input("::")

        # NOTE(review): the first two arguments are hard-coded literals; the
        # posted cardNo/password are not passed on -- confirm intent.
        dict2 = dict.get(dict1['serialNo']).logpage('330621196304098747', '111111', input_v,dict1['serialNo'])
        print dict2
        response = app.response_class(
            response=json.dumps(dict2),
            status=200,
            mimetype='application/json'
        )
        return response
项目:python_learn    作者:jetty-guo    | 项目源码 | 文件源码
def scrapy():
    """Handle a POST carrying ``serialNo``: look up the object stored under
    that serial number in the module-level ``dict`` and return the result of
    its ``info`` call as JSON.

    Returns nothing for non-POST requests (Python 2 code).
    """

    if request.method == 'POST':
        a = request.get_data()
        dict1 = json.loads(a)
        print dict1['serialNo'].encode("utf8")
        logging.info('/scrapy/content ?????:'+dict1['serialNo'].encode("utf8"))

        # `dict1` is rebound from the parsed request to the result of info().
        dict1=dict.get(dict1['serialNo']).info(dict1['serialNo'])
        # NOTE(review): string concatenation requires info() to return a
        # string here -- confirm.
        print '/scrapy/content ?????:'+dict1

        response = app.response_class(
            response=json.dumps(dict1),
            status=200,
            mimetype='application/json'
        )
        return response
项目:LineBot    作者:RaenonX    | 项目源码 | 文件源码
def callback():
    """Webhook endpoint: log the request body and queue its handling on
    ``handle_pool``; always answers 'OK' unless the submit itself raises."""
    sig = request.headers['X-Line-Signature']
    text_body = request.get_data(as_text=True)
    app.logger.info("Request body: " + text_body)

    # NOTE(review): the handler runs through apply_async, so an
    # InvalidSignatureError raised inside it would presumably not surface in
    # this try block -- confirm against handle_pool's type.
    try:
        # handler.handle(body, signature)
        handle_pool.apply_async(handler.handle, args=(text_body, sig))
    except exceptions.InvalidSignatureError:
        abort(400)

    return 'OK'
项目:intercom-webhooks    作者:intercom-archive    | 项目源码 | 文件源码
def webhook_flash():
    """Print the incoming webhook body, compare its ``X-Hub-Signature`` header
    with a locally computed HMAC-SHA1 of the body, and print the topic.

    NOTE(review): the function returns nothing and the key is the hard-coded
    literal "secret" -- confirm both are intended.
    """
    KEY = "secret"

    DATA = request.get_data()
    print("===============================================================")
    print(DATA)
    print("===============================================================")
    EXPECTED = ""
    if request.headers.get('X-Hub-Signature') is not None:
        EXPECTED = str(request.headers.get('X-Hub-Signature'))
    if not EXPECTED:
        print("Not signed. Not calculating")
    else:
        calculated = hmac.new(KEY, DATA, hashlib.sha1).hexdigest()
        calculated = "sha1=" + (calculated)
        print("Expected  : " + EXPECTED)
        print("Calculated: " + calculated)
        print("Match?    : " + str(calculated == EXPECTED))
    output = json.loads(DATA)
    print("Topic Recieved: " + output["topic"])
项目:ob2    作者:octobear2    | 项目源码 | 文件源码
def get_request_validity():
    """Return True when the request passes the CSRF check.

    A request is valid when its ``X-Hub-Signature`` header matches the
    HMAC-SHA1 of the body under any configured GitHub webhook secret, or when
    the ``_csrf_token`` form field equals the token stored in the session.
    """
    def _as_bytes(value):
        # compare_digest needs both operands to be of the same type.
        return value if isinstance(value, bytes) else value.encode("utf-8")

    # GitHub signature will suffice for CSRF check
    github_signature = request.headers.get("X-Hub-Signature")
    if github_signature:
        payload_bytes = request.get_data()
        supplied_signature = _as_bytes(github_signature)
        for github_webhook_secret in config.github_webhook_secrets:
            digest = hmac.new(github_webhook_secret, payload_bytes, sha1).hexdigest()
            expected_signature = "sha1=%s" % digest
            # Constant-time comparison avoids leaking digest prefixes.
            if hmac.compare_digest(_as_bytes(expected_signature), supplied_signature):
                return True
    # Normal CSRF form tokens work too
    token = request.form.get("_csrf_token")
    expected_token = session.get("_csrf_token", None)
    if expected_token and token is not None:
        if hmac.compare_digest(_as_bytes(expected_token), _as_bytes(token)):
            return True
    return False
项目:hacks    作者:neo1218    | 项目源码 | 文件源码
def run_ctx_request(environ):
    """
    run flask request context in celery worker

    Rebuilds a request context from ``environ``, dispatches the request and
    returns the response as a ``(body, status_code, headers)`` tuple. An
    ``InternalServerError`` is re-raised in debug mode and otherwise turned
    into a 500 response returned in the same tuple form.
    """
    from blueprints import app  # wsgi.app

    if '_wsgi.input' in environ:
        # an input stream (file-like object) from which the HTTP request body can be read.
        # detail: https://www.python.org/dev/peps/pep-0333/#environ-variables
        environ['wsgi.input'] = BytesIO(environ['_wsgi.input'])

    # request_context needs the WSGI environ to build the request from.
    with app.request_context(environ):
        g.in_celery = True

        try:
            rv = app.full_dispatch_request()
        except InternalServerError:
            if app.debug:
                raise
            # Keep the return shape identical on the error path.
            rv = app.make_response(InternalServerError())
        return (rv.get_data(), rv.status_code, rv.headers)
项目:hacks    作者:neo1218    | 项目源码 | 文件源码
def decorator(*args, **kwargs):
        """Execute the view in place when running inside the Celery worker;
        otherwise submit the request as a Celery task and answer 202 with a
        status URL while the task is unfinished."""
        already_in_worker = getattr(g, 'in_celery', False)
        if already_in_worker:
            return f(*args, **kwargs)

        # Only text-valued environ entries are forwarded; the request body
        # travels under the private '_wsgi.input' key.
        environ = dict(
            (key, val) for key, val in request.environ.items()
            if isinstance(val, text_types)
        )
        if 'wsgi.input' in request.environ:
            environ['_wsgi.input'] = request.get_data()  # request.body
        job = run_ctx_request.apply_async(args=(environ,))

        if (job.state == states.PENDING
                or job.state == states.RECEIVED
                or job.state == states.STARTED):
            status_headers = {'Location': url_for('api.get_status', id=job.id)}
            return '', 202, status_headers

        return job.info
项目:raw-data-repository    作者:all-of-us    | 项目源码 | 文件源码
def post(self):
    """Generate fake participants (and optionally defer sample generation)
    according to the JSON object in the request body.

    Keys read: ``num_participants``, ``include_physical_measurements``,
    ``include_biobank_orders``, ``hpo``, ``create_biobank_samples`` and
    ``samples_missing_fraction``.
    """
    options = json.loads(request.get_data())
    participant_count = int(options.get('num_participants', 0))
    with_measurements = bool(options.get('include_physical_measurements', False))
    with_orders = bool(options.get('include_biobank_orders', False))
    hpo_name = options.get('hpo', None)
    if participant_count > 0:
      generator = FakeParticipantGenerator(InProcessClient())
      for _ in range(participant_count):
        generator.generate_participant(with_measurements, with_orders, hpo_name)
    if options.get('create_biobank_samples'):
      missing_fraction = options.get('samples_missing_fraction', _SAMPLES_MISSING_FRACTION)
      deferred.defer(generate_samples, missing_fraction)
项目:open-wob-api    作者:openstate    | 项目源码 | 文件源码
def decode_json_post_data(fn):
    """Decorator that parses POSTed JSON and attaches it to the request
    object (:obj:`request.data`).

    For POST requests the body is read, decoded with the charset from the
    Content-Type header when one is given, and parsed as JSON. An empty body
    or a body that cannot be decoded or parsed raises ``OcdApiError`` with
    status 400. Other request methods pass through untouched.
    """

    @wraps(fn)
    def wrapped_function(*args, **kwargs):
        if request.method == 'POST':
            data = request.get_data(cache=False)
            if not data:
                raise OcdApiError('No data was POSTed', 400)

            try:
                request_charset = request.mimetype_params.get('charset')
                # Decode explicitly: json.loads() no longer accepts an
                # ``encoding`` keyword on current Python versions.
                if request_charset is not None and isinstance(data, bytes):
                    data = data.decode(request_charset)
                data = json.loads(data)
            except Exception:
                raise OcdApiError('Unable to parse POSTed JSON', 400)

            request.data = data

        return fn(*args, **kwargs)

    return wrapped_function
项目:Albireo    作者:lordfriend    | 项目源码 | 文件源码
def login():
    """
    Log a user in from a JSON body with ``name`` and ``password`` (and an
    optional ``remember`` flag).
    :return: a JSON response ``{'msg': 'OK'}``
    :raises ClientError: INVALID_REQUEST when name or password is missing
    """
    login_data = json.loads(request.get_data(True, as_text=True))
    if not (('name' in login_data) and ('password' in login_data)):
        raise ClientError(ClientError.INVALID_REQUEST)

    name = login_data['name']
    password = login_data['password']
    if 'remember' in login_data:
        remember = login_data['remember']
    else:
        remember = False
    credential = UserCredential.login_user(name, password)
    login_user(credential, remember=remember)
    return json_resp({'msg': 'OK'})
项目:Albireo    作者:lordfriend    | 项目源码 | 文件源码
def register():
    """
    Register a new user with an invite code, log the user in and send a
    confirmation email. A newly registered user is not an administrator; an
    admin user has to promote it.
    :return: a JSON response ``{'message': 'ok'}`` with status 201 when
        ``register_user`` reports success
    :raises ClientError: INVALID_REQUEST when a field is missing,
        PASSWORD_MISMATCH when the two passwords differ
    """
    register_data = json.loads(request.get_data(True, as_text=True))
    required_fields = ('name', 'password', 'password_repeat', 'invite_code', 'email')
    if not all(field in register_data for field in required_fields):
        raise ClientError(ClientError.INVALID_REQUEST)

    name = register_data['name']
    password = register_data['password']
    password_repeat = register_data['password_repeat']
    email = register_data['email']
    invite_code = register_data['invite_code']
    if password != password_repeat:
        raise ClientError(ClientError.PASSWORD_MISMATCH)

    registered = UserCredential.register_user(
        name=name, password=password, email=email, invite_code=invite_code)
    if registered:
        # login automatically
        credential = UserCredential.login_user(name, password)
        login_user(credential, remember=False)
        # send email
        credential.send_confirm_email()
        return json_resp({'message': 'ok'}, 201)
项目:quizbot-2017    作者:pycontw    | 项目源码 | 文件源码
def configure_linebot_app(app):
    """Register the database-commit hook and the LINE webhook route on
    ``app``."""

    @app.after_request
    def commit_database(response):
        # Commit after every request, then hand the response back unchanged.
        db.commit()
        return response

    @app.route("/api/line_webhook", methods=["POST"])
    def line_webhook():
        """Pass the request body and signature to the webhook handler.

        Aborts with 400 on an invalid signature and with 500 on any other
        error.
        """
        line_signature = request.headers['X-Line-Signature']
        text_body = request.get_data(as_text=True)
        logger.debug(f'Incoming message:\n{pformat(text_body)}')
        try:
            line_webhook_handler.handle(text_body, line_signature)
        except InvalidSignatureError:
            logger.warning('Message with an invalid signature received')
            abort(400)
        except LineBotApiError as api_error:
            logger.error(f'{api_error}\nDetails:\n{pformat(api_error.error.details)}')
            abort(500)
        except Exception as unexpected:
            logger.error(f'Uncaught error: {unexpected}')
            abort(500)
        return "OK"
项目:line-bot-imgur-tutorial    作者:twtrubiks    | 项目源码 | 文件源码
def callback():
    """Webhook endpoint: log the body and let the handler process it.

    Aborts with 400 when the handler raises ``InvalidSignatureError``.
    """
    # Signature header sent along with the webhook request.
    header_signature = request.headers['X-Line-Signature']

    # Request body as text.
    payload_text = request.get_data(as_text=True)
    log_line = "Request body: " + payload_text
    app.logger.info(log_line)

    try:
        handler.handle(payload_text, header_signature)
    except InvalidSignatureError:
        abort(400)

    return 'ok'
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def prepare_client_request_data():
    """
    Read the client request body and rewrite it when it is decodable text.

    The body is passed to ``encoding_detect``. When an encoding is reported
    and decoding succeeds, the text goes through
    ``client_requests_text_rewrite`` and is returned as ``str``. Otherwise
    the raw ``bytes`` are returned and the encoding is ``None``.

    :return: ``(data, encoding)``
    :rtype: tuple[Union[str, bytes], Optional[str]]
    """
    data = request.get_data()  # type: bytes

    # Try to detect the text encoding of the body.
    encoding = encoding_detect(data)

    if encoding is not None:
        try:
            data = data.decode(encoding=encoding)  # type: str
        except Exception:
            # Decoding failed: treat the body as binary and leave it as is.
            encoding = None
        else:
            # Decoded successfully: rewrite the text; the result stays str.
            data = client_requests_text_rewrite(data)  # type: str

    # Debug-only tracing. It works on a local copy so the returned value is
    # not altered, and falls back to UTF-8 when no encoding is known.
    if developer_string_trace:  # coverage: exclude
        trace_encoding = encoding or 'utf-8'
        if isinstance(data, str):
            traced = data.encode(encoding=trace_encoding)
        else:
            traced = data
        if developer_string_trace.encode(encoding=trace_encoding) in traced:
            infoprint('StringTrace: appears after client_requests_bin_rewrite, code line no. ', current_line_number())

    return data, encoding
项目:how_to_deploy_a_keras_model_to_production    作者:llSourcell    | 项目源码 | 文件源码
def predict():
    """Classify the user-drawn character posted in the request body and
    return the predicted class index as a string."""
    # Raw image data as sent by the client.
    imgData = request.get_data()
    # convertImage receives the raw data; 'output.png' is read back below
    # (presumably written by convertImage -- confirm in its definition).
    convertImage(imgData)
    print("debug")
    # Read the image into memory as 8-bit greyscale.
    x = imread('output.png', mode='L')
    # Bit-wise inversion so black becomes white and vice versa.
    x = np.invert(x)
    # Scale to the 28x28 size used below.
    x = imresize(x, (28, 28))
    # Convert to a 4D tensor to feed into the model.
    x = x.reshape(1, 28, 28, 1)
    print("debug2")
    # Run the prediction inside the stored computation graph.
    with graph.as_default():
        out = model.predict(x)
        print(out)
        print(np.argmax(out, axis=1))
        print("debug3")
        # Convert the predicted index to a string for the response.
        response = np.array_str(np.argmax(out, axis=1))
        return response
项目:facejack    作者:PetarV-    | 项目源码 | 文件源码
def imreceive():
    """Authenticate a face array posted in the request body.

    Answers ``{"authentication": "ALLOWED"}`` when the unpickled array has
    shape (224, 224, 3), dtype uint8 and passes the selected check; otherwise
    ``{"authentication": "DENIED"}``. The query argument ``hack=True``
    selects ``proc_face_with_hack`` instead of ``proc_face``.
    """
    if request.method == "POST":
        body = request.get_data()
        hack = 'hack' in request.args and request.args['hack'] == "True"
        # SECURITY NOTE(review): pickle.loads on a request body can execute
        # arbitrary code -- the body must come from a trusted client only.
        face = pickle.loads(body)
        print("Image Received")
        if face.shape == (224, 224, 3) and face.dtype == "uint8":
            check_face = proc_face_with_hack if hack else proc_face
            if check_face(face):
                return jsonify(dict(authentication="ALLOWED"))
    return jsonify(dict(authentication="DENIED"))
项目:coscup-line-bot    作者:ncuoolab    | 项目源码 | 文件源码
def line_call_back():
    """Webhook endpoint: validate the signature when ``PRODUCTION`` is '1',
    then pass the body to the bot.

    Returns "NOT PASS" on a failed validation, otherwise "OK". Any exception
    is logged and still answered with "OK".
    """
    try:
        body_text = request.get_data().decode("utf-8")
        if PRODUCTION == '1':
            channel_signature = request.headers.get('X-Line-Channelsignature')
            if not bot.bot_api.client.validate_signature(channel_signature, body_text):
                return "NOT PASS"
        bot.process_new_event(body_text)
    except Exception as ex:
        logging.error(ex)
    return "OK"
项目:coscup-line-bot    作者:ncuoolab    | 项目源码 | 文件源码
def edison_done():
    """Pass the UTF-8 decoded request body to ``bot.take_photo_done``."""
    raw_body = request.get_data()
    bot.take_photo_done(raw_body.decode("utf-8"))
    return 'OK'
项目:MIT-Hodor    作者:kalbhor    | 项目源码 | 文件源码
def webhook():
    """Hand the request body (as text) to ``page.handle_webhook``."""
    body_text = request.get_data(as_text=True)
    page.handle_webhook(body_text)
    return "ok"

### Main method (Handles user messages, db) ###
项目:bot-line-indonesian-summarizer    作者:ec2ainun    | 项目源码 | 文件源码
def callback():
    """Webhook endpoint: log the request body, then dispatch it to the
    handler together with the ``X-Line-Signature`` header value.

    Aborts with 400 when the handler raises ``InvalidSignatureError``.
    """
    sig_value = request.headers['X-Line-Signature']
    request_text = request.get_data(as_text=True)
    app.logger.info("Request body: " + request_text)

    try:
        handler.handle(request_text, sig_value)
    except InvalidSignatureError:
        abort(400)
    return 'OK'
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def generate_secure_token():
    """Encrypt the raw request body with ``SecureToken.encrypt`` and return
    the token followed by a newline byte."""
    encrypted = SecureToken.encrypt(request.get_data())
    return encrypted + b'\n'
项目:turboparser-semafor    作者:ReutersMedia    | 项目源码 | 文件源码
def parse_frames(pipeline):
    """Run ``proc_input`` on the submitted text and return the result as JSON.

    For GET requests the text comes from the ``t`` query argument (400 when
    absent); for other methods it is the request body. A failure inside
    ``proc_input`` is logged and answered with 500.

    :param pipeline: pipeline name; passed to ``proc_input`` upper-cased
    """
    if request.method == 'GET':
        # parse from text parameter
        d = request.args.get('t')
        if d is None:
            abort(400)
    else:
        d = request.get_data(as_text=True)
    try:
        r = proc_input(d, pipeline.upper())
    except Exception:
        LOGGER.exception("Error processing input")
        abort(500)
    return jsonify(r)
项目:neural-finance    作者:Metnew    | 项目源码 | 文件源码
def server_error_page(error):
    """Roll back the database session and answer with a JSON 500 error."""
    db.session.rollback()
    error_body = jsonify({'error': 'internal server error'})
    return error_body, 500

# logger
# @app.before_request
# def log_request_info():
    # app.logger.debug('Headers: %s', request.headers)
    # app.logger.debug('Body: %s', request.get_data())
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def get_sql_audit_info():
    """Build the request object from the raw body and pass it to
    ``sql_manager.audit_sql``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.audit_sql(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def review_sql_work():
    """Build the request object from the raw body and pass it to
    ``sql_manager.audit_sql_work``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.audit_sql_work(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def add_sql_work():
    """Build the request object from the raw body and pass it to
    ``sql_manager.add_sql_work``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.add_sql_work(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def add_user():
    """Build the request object from the raw body and pass it to
    ``user_manager.add_user``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.add_user(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def add_group():
    """Build the request object from the raw body and pass it to
    ``user_manager.add_group_info``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.add_group_info(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def update_group():
    """Build the request object from the raw body and pass it to
    ``user_manager.update_user_group_info``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return user_manager.update_user_group_info(request_obj)
项目:mysql_audit    作者:ycg    | 项目源码 | 文件源码
def update_sql_work():
    """Build the request object from the raw body and pass it to
    ``sql_manager.update_sql_work``."""
    request_obj = get_object_from_json_tmp(request.get_data())
    return sql_manager.update_sql_work(request_obj)


# endregion

# region login api
项目:fbmq    作者:conbus    | 项目源码 | 文件源码
def webhook():
    """Print the request body (as text) and hand it to
    ``page.handle_webhook``."""
    body_text = request.get_data(as_text=True)
    print(body_text)
    page.handle_webhook(body_text)
    return "ok"
项目:covador    作者:baverman    | 项目源码 | 文件源码
def get_form():
    """Return the form fields of the current request, cached on the request.

    Multipart bodies are read through ``request.form``; URL-encoded bodies
    are parsed from the raw data with ``parse_qs``. Any other (or missing)
    content type yields an empty dict. The result is stored on
    ``request._covador_form`` and reused on later calls.
    """
    try:
        return request._covador_form
    except AttributeError:
        # content_type is None when the request has no Content-Type header.
        content_type = request.content_type or ''
        if content_type.startswith('multipart/form-data'):
            form = request.form.to_dict(False)
        elif content_type.startswith('application/x-www-form-urlencoded'):
            form = parse_qs(request.get_data(parse_form_data=False))
        else:
            form = {}
        request._covador_form = form
        return form
项目:GOKU    作者:bingweichen    | 项目源码 | 文件源码
def get_request_log():
    """Build a dict describing the current request for logging.

    Always contains ``route`` and ``method``. For routes whose first path
    segment is ``virtual_card`` it also contains the JSON ``body`` (without
    its ``password`` entry) when a body exists, and ``values`` (the request
    args and form fields rendered as ``key: value, `` pairs) when any exist.
    """
    # Render args and form values as "key: value, " pairs.
    values = ''.join(
        key + ': ' + request.values[key] + ', ' for key in request.values)
    route = '/' + request.base_url.replace(request.host_url, '')
    request_log = dict(route=route, method=request.method)

    # Only the virtual_card category gets body and values attached.
    category = route.split('/')[1]
    if category != "virtual_card":
        return request_log

    raw_body = request.get_data()
    if len(raw_body) != 0:
        body = json.loads(raw_body)
        # The password entry is dropped before the body is stored.
        if "password" in body:
            body.pop("password")
        request_log['body'] = body

    if values:
        request_log['values'] = values
    return request_log
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def saveFile(name):
    """
    Save plugin code. The code is provided via the HTTP body.

    :param name: the plugin name; must be a single path component
    :return: empty HTTP response (204), or an empty 400 response when the
        name is empty or contains path separators / dot segments
    """
    # The name is interpolated into a file path, so refuse anything that
    # could point outside ./modules/plugins/<name>/.
    if not name or name in ('.', '..') or '/' in name or '\\' in name:
        return ('', 400)

    with open("./modules/plugins/"+name+"/__init__.py", "wb") as fo:
        fo.write(request.get_data())
    cbpi.emit_message("PLUGIN %s SAVED" % (name))

    return ('', 204)
项目:column    作者:vmware    | 项目源码 | 文件源码
def post(self):
        """Trigger a new run from the JSON request body and return the
        formatted run; a fresh UUID4 string is stored under ``id``."""
        parsed_body = json.loads(request.get_data())
        run_payload = utils.uni_to_str(parsed_body)
        run_id = str(uuid.uuid4())
        run_payload['id'] = run_id
        LOG.info('Triggering new ansible run %s', run_payload['id'])
        created_run = self.manager.create_run(run_payload)
        return run_model.format_response(created_run)