Python flask.request 模块,cookies 属性实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用flask.request.cookies(注意:cookies 是属性而不是方法,使用时不需要加括号调用)

项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def zmirror_enter(input_path='/'):
    """Entry point of a request; the real work is delegated to main_function().

    Once main_function() has produced a response, the extra headers and the
    extra Set-Cookie strings collected on ``parse`` are attached to it.

    :param input_path: forwarded unchanged to main_function()
    :return: the response built by main_function(), or the page returned by
        generate_error_page(is_traceback=True) when anything in the body raised
    """
    try:
        resp = main_function(input_path=input_path)

        # Extra response headers: set() is used, one value per header name
        for name, value in parse.extra_resp_headers.items():
            resp.headers.set(name, value)

        # Extra cookies: add() is used so every cookie gets its own Set-Cookie header
        for name, cookie_string in parse.extra_cookies.items():
            resp.headers.add("Set-Cookie", cookie_string)

    except Exception:  # coverage: exclude
        # `Exception` rather than a bare except, so SystemExit and
        # KeyboardInterrupt are not turned into an error page.
        return generate_error_page(is_traceback=True)
    else:
        return resp


# noinspection PyUnusedLocal
项目:inshack-2017    作者:HugoDelval    | 项目源码 | 文件源码
def create_dishwasher(name: str, brand: str, cost: int, cve: str) -> str:
    """Insert a new dishwasher row and return its id, or "" on any failure.

    The "inserted by" value comes from the base64-encoded ``user`` cookie
    (a fixed placeholder is used when the cookie is absent). The value is
    rejected when it is longer than 255 characters, contains a character
    outside ``string.printable[:-2]``, or contains "sleep", "benchmark" or
    "wait" (case-insensitive).

    NOTE(review): the cookie value is interpolated into the SQL text with
    str.format(); the keyword filter below is not an escaping mechanism.
    """
    sql_template = "INSERT INTO dishwashers VALUES ('{inserted_by}', '{id}', '{object}')"
    try:
        new_id = get_new_id()
        dishwasher = DishWasher(new_id, name, brand, cost, cve)

        if "user" not in request.cookies:
            inserted_by = "no one :("
        else:
            raw_user = request.cookies["user"]
            inserted_by = base64.b64decode(raw_user).decode('utf-8')

        # Size and character-set filter
        allowed_chars = string.printable[:-2]
        if len(inserted_by) > 255:
            return ""
        if any(ch not in allowed_chars for ch in inserted_by):
            return ""

        # Keyword filter, case-insensitive
        for banned in (r"sleep", r"benchmark", r"wait"):
            if re.search(banned, inserted_by, flags=re.IGNORECASE):
                return ""

        statement = sql_template.format(
            id=new_id, object=yaml.dump(dishwasher), inserted_by=inserted_by)
        if insert(statement):
            return new_id
    except Exception as e:
        # Best effort: report the problem on stderr and fall through to ""
        print(e, file=sys.stderr)
    return ""
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def response_cookies_deep_copy():
    """
    Return the raw Set-Cookie header values of the remote response as a list of str.

    It's a BAD hack to get RAW cookies headers, but so far, we don't have better way.
    We'd go DEEP inside the urllib's private method to get raw headers

    raw_headers example:
    [('Cache-Control', 'private'),
    ('Content-Length', '48234'),
    ('Content-Type', 'text/html; Charset=utf-8'),
    ('Server', 'Microsoft-IIS/8.5'),
    ('Set-Cookie','BoardList=BoardID=Show; expires=Mon, 02-May-2016 16:00:00 GMT; path=/'),
    ('Set-Cookie','aspsky=abcefgh; expires=Sun, 24-Apr-2016 16:00:00 GMT; path=/; HttpOnly'),
    ('Set-Cookie', 'ASPSESSIONIDSCSSDSSQ=OGKMLAHDHBFDJCDMGBOAGOMJ; path=/'),
    ('X-Powered-By', 'ASP.NET'),
    ('Date', 'Tue, 26 Apr 2016 12:32:40 GMT')]

    Each value is adjusted before being collected:

    * when my_host_scheme is 'http://' the ``Secure`` attribute is stripped;
    * for HttpOnly cookies the path is rewritten, either to ``/`` (aggressive
      mode) or under ``/extdomains/<remote domain>`` for non-main domains.
      Nothing is rewritten when enable_aggressive_cookies_path_rewrite is None.
    """
    raw_headers = parse.remote_response.raw._original_response.headers._headers
    header_cookies_string_list = []
    for name, value in raw_headers:
        if name.lower() != 'set-cookie':
            continue

        if my_host_scheme == 'http://':
            # Drop the Secure attribute in its three spellings
            # (presumably because the mirror itself is served over plain http)
            value = value.replace('Secure;', '')
            value = value.replace(';Secure', ';')
            value = value.replace('; Secure', ';')

        if 'httponly' in value.lower():
            if enable_aggressive_cookies_path_rewrite:
                # Aggressive cookie path rewrite: every path becomes /
                value = regex_cookie_path_rewriter.sub('path=/;', value)
            elif enable_aggressive_cookies_path_rewrite is not None:
                # Move the path of HttpOnly cookies under the mirrored url
                # eg(/extdomains/a.foobar.com): path=/verify; -> path=/extdomains/a.foobar.com/verify

                if parse.remote_domain not in domain_alias_to_target_set:  # do not rewrite main domains
                    # Raw strings: the group references must reach re.sub() with their
                    # backslash, and "\g" is not a valid escape in a normal literal.
                    value = regex_cookie_path_rewriter.sub(
                        r'\g<prefix>=/extdomains/' + parse.remote_domain + r'\g<path>', value)

        header_cookies_string_list.append(value)
    return header_cookies_string_list
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def response_text_rewrite(resp_text):
    """
    Rewrite the urls found in text-like content (html, css, js).

    Passes, in order: plain domain-alias replacement (only when
    url_custom_redirect_enable is set), the regex based advanced url rewrite,
    response_text_basic_mirrorlization(), and finally the replacement of
    quoted / ``domain=`` occurrences of the target root domain.

    :type resp_text: str
    :rtype: str
    """
    # v0.20.6+ plain replace domain alias, support json/urlencoded/json-urlencoded/plain
    if url_custom_redirect_enable:
        alias_pairs = plain_replace_domain_alias + parse.temporary_domain_alias
        for old_text, new_text in alias_pairs:
            resp_text = resp_text.replace(old_text, new_text)

    # v0.9.2+: advanced url rewrite engine
    resp_text = regex_adv_url_rewriter.sub(regex_url_reassemble, resp_text)

    if developer_string_trace is not None and developer_string_trace in resp_text:
        # debug aid: report that the traced string is present after this pass
        infoprint('StringTrace: appears after advanced rewrite, code line no. ', current_line_number())

    # basic mirrorlization pass (the original note mentions v0.28.0 and v0.28.3)
    resp_text = response_text_basic_mirrorlization(resp_text)

    if developer_string_trace is not None and developer_string_trace in resp_text:
        # debug aid: report that the traced string is present after this pass
        infoprint('StringTrace: appears after basic mirrorlization, code line no. ', current_line_number())

    # for cookies set string (in js) replace
    # eg: ".twitter.com" --> "foo.com"
    # The pairs are applied strictly in this order.
    cookie_domain_rewrites = (
        ('\".' + target_domain_root + '\"', '\"' + my_host_name_no_port + '\"'),
        ("\'." + target_domain_root + "\'", "\'" + my_host_name_no_port + "\'"),
        ("domain=." + target_domain_root, "domain=" + my_host_name_no_port),
        ('\"' + target_domain_root + '\"', '\"' + my_host_name_no_port + '\"'),
        ("\'" + target_domain_root + "\'", "\'" + my_host_name_no_port + "\'"),
    )
    for needle, substitute in cookie_domain_rewrites:
        resp_text = resp_text.replace(needle, substitute)

    if developer_string_trace is not None and developer_string_trace in resp_text:
        # debug aid: report that the traced string is present after this pass
        infoprint('StringTrace: appears after js cookies string rewrite, code line no. ', current_line_number())

    # resp_text = resp_text.replace('lang="zh-Hans"', '', 1)
    return resp_text
项目:flask-thirdLogin-demo    作者:staugur    | 项目源码 | 文件源码
def index():
    """Serve index.html to requests carrying a non-empty ``username`` cookie,
    and an inline login form (posting to the ``login`` endpoint) to all others.

    The request cookies are logged at INFO level on every call.
    """
    app.logger.info(request.cookies)
    if not request.cookies.get("username"):
        login_form = """<form action="%s" method='post'>
            <input type="text" name="username" required>
            <input type="password" name="password" required>
            <input type="submit" value="??">
            </form>"""
        return login_form % url_for("login")
    return render_template("index.html")
项目:invenio1-orcid    作者:bronger    | 项目源码 | 文件源码
def get_user_id(request):
    """Return the record ID of the currently logged-in user.

    The lookup itself is delegated to ``invenio_binding("get_user_id", ...)``,
    which is handed the cookies of `request` (the user is derived from the
    session cookie).

    :param request: flask HTTP request object

    :type request: `flask.Request`

    :return:
      the Invenio record ID of the currently logged-in user, or ``None`` if
      this could not be detected

    :rtype: str or NoneType
    """
    request_cookies = request.cookies
    return invenio_binding("get_user_id", request_cookies)
项目:Invalid_cookies_in_Cookie_header    作者:dorfsmay    | 项目源码 | 文件源码
def hello():
    """Print the raw Cookie header and the parsed cookies, then acknowledge.

    :return: a fixed acknowledgement string
    """
    # .get() instead of ['Cookie']: a request without any Cookie header would
    # otherwise raise a key error here and never reach the parsed output.
    # In that case None is printed as the raw header.
    print("Cookie header raw: {}".format(request.headers.get('Cookie')))
    print("cookies: {}".format(request.cookies))
    return "Got it!\n"
项目:inshack-2017    作者:HugoDelval    | 项目源码 | 文件源码
def index():
    """Render index.html and make sure the client ends up with a ``user`` cookie.

    A request that already carries a ``user`` cookie is left alone; otherwise
    the cookie is set to the base64 encoding of b'user with no name'.
    """
    resp = make_response(render_template("index.html"))
    if "user" in request.cookies:
        return resp
    default_user = base64.b64encode(b'user with no name')
    resp.set_cookie('user', default_user)
    return resp
项目:Penny-Dreadful-Tools    作者:PennyDreadfulMTG    | 项目源码 | 文件源码
def create_issue(content, author, location='Discord', repo='PennyDreadfulMTG/Penny-Dreadful-Tools'):
    """Build an issue from `content` and file it on GitHub.

    The first line of `content` becomes the title, the rest (if any) the start
    of the body. The body is completed with a "Reported on ... by ..." line
    and, when `request` is truthy, a dump of request details. Title and body
    are always printed.

    :param content: issue text; None or '' means there is nothing to report
    :param author: name put in the "Reported on ... by ..." line
    :param location: where the report came from
    :param repo: repository name passed to Github.get_repo()
    :return: the created issue, or None when `content` is empty or the
        github_user / github_password configuration is missing
    """
    if content is None or content == '':
        return None

    title, newline, remainder = content.partition('\n')
    body = (remainder + '\n\n') if newline else ''
    body += 'Reported on {location} by {author}'.format(location=location, author=author)

    if request:
        # NOTE(review): the complete cookie mapping ends up in the issue body.
        request_details = """
            --------------------------------------------------------------------------------
            Request Method: {method}
            Path: {full_path}
            Cookies: {cookies}
            Endpoint: {endpoint}
            View Args: {view_args}
            Person: {id}
            User-Agent: {user_agent}
            Referrer: {referrer}
        """.format(method=request.method, full_path=request.full_path, cookies=request.cookies, endpoint=request.endpoint, view_args=request.view_args, id=session.get('id', 'logged_out'), user_agent=request.headers.get('User-Agent'), referrer=request.referrer)
        body += textwrap.dedent(request_details)

    print(title + '\n' + body)

    # Only check for github details at the last second to get log output even if github not configured.
    github_user = configuration.get('github_user')
    if not github_user:
        return None
    github_password = configuration.get('github_password')
    if not github_password:
        return None

    client = Github(github_user, github_password)
    repository = client.get_repo(repo)
    return repository.create_issue(title=title, body=body)
项目:zmirror    作者:aploium    | 项目源码 | 文件源码
def filter_client_request():
    """Screen the incoming client request.

    Checks, in order: crossdomain.xml requests, the global user-agent
    whitelist, the spider ban, and the ``zmirror_verify`` cookie verification.

    :return: a response (crossdomain.xml, 403 page or a 302 redirect to the
        verification page), or None when no check produced one
    :rtype: Union[Response, None]
    """
    dbgprint('Client Request Url: ', request.url)

    # crossdomain.xml
    if os.path.basename(request.path) == 'crossdomain.xml':
        dbgprint('crossdomain.xml hit from', request.url)
        return crossdomain_xml()

    user_agent = str(request.user_agent)

    # Global whitelist ua
    if check_global_ua_pass(user_agent):
        return None

    if is_deny_spiders_by_403 and is_denied_because_of_spider(user_agent):
        return generate_simple_resp_page(b'Spiders Are Not Allowed To This Site', 403)

    if not human_ip_verification_enabled:
        return None

    # Cookies are checked when a cookie based mode is on together with
    # must_verify_cookies, or when the ip is not inside the allowed range.
    cookie_check_forced = (
        (human_ip_verification_whitelist_from_cookies or enable_custom_access_cookie_generate_and_verify)
        and must_verify_cookies
    )
    if not cookie_check_forced and not is_ip_not_in_allow_range(request.remote_addr):
        return None

    dbgprint('ip', request.remote_addr, 'is verifying cookies')
    verify_cookie = request.cookies.get('zmirror_verify')
    cookie_accepted = 'zmirror_verify' in request.cookies and (
        (human_ip_verification_whitelist_from_cookies and verify_ip_hash_cookie(verify_cookie))
        or (enable_custom_access_cookie_generate_and_verify
            and custom_verify_access_cookie(verify_cookie, request))
    )

    if not cookie_accepted:
        # Send the client to the verification page, carrying the original
        # url as url-safe base64 in the "origin" query parameter.
        encoded_origin = base64.urlsafe_b64encode(
            str(request.url).encode(encoding='utf-8')).decode(encoding='utf-8')
        return redirect("/ip_ban_verify_page?origin=" + encoded_origin, code=302)

    ip_whitelist_add(request.remote_addr, info_record_dict=verify_cookie)
    dbgprint('add to ip_whitelist because cookies:', request.remote_addr)
    return None
项目:PythonStudyCode    作者:TongTongX    | 项目源码 | 文件源码
def get_current_user():
    """Populate g.user with the currently logged in user (or None).

    Meant to run before each request. A user already stored in Flask's
    session is reused as is. Otherwise the Facebook cookie is inspected: a
    user seen for the first time is created and added to the database, a
    known user gets its access token refreshed, and in both cases the user
    is stored in the session.
    """
    # Fast path: the session already knows the user.
    cached_user = session.get('user')
    if cached_user:
        g.user = cached_user
        return

    # Attempt to get the short term access token for the current user.
    fb_result = get_user_from_cookie(cookies=request.cookies, app_id=FB_APP_ID,
                                     app_secret=FB_APP_SECRET)

    # No result means the user is assumed not to be logged in.
    if fb_result:
        user = User.query.filter(User.id == fb_result['uid']).first()

        if user:
            # Known user: refresh the stored access token if it changed
            if user.access_token != fb_result['access_token']:
                user.access_token = fb_result['access_token']
        else:
            # Unknown user: fetch the profile and create the database row
            profile = GraphAPI(fb_result['access_token']).get_object('me')
            if 'link' not in profile:
                profile['link'] = ""

            user = User(id=str(profile['id']), name=profile['name'],
                        profile_url=profile['link'],
                        access_token=fb_result['access_token'])
            db.session.add(user)

        # Remember the user for the following requests
        session['user'] = {
            'name': user.name,
            'profile_url': user.profile_url,
            'id': user.id,
            'access_token': user.access_token,
        }

    # Commit changes to the database and set the user as a global g.user
    db.session.commit()
    g.user = session.get('user', None)
项目:lab5    作者:zlotus    | 项目源码 | 文件源码
def session_service():
    """Login-session endpoint; the behaviour depends on the HTTP method.

    * DELETE  - answer success and expire the ``token`` cookie.
    * POST    - check ``username`` / ``password`` from the JSON body; on
                success set an HttpOnly ``token`` cookie holding the user id
                and a deadline in milliseconds.
    * GET     - validate the ``token`` cookie and return the user's id, name
                and role names.
    * OPTIONS and anything else - the default ``success=False`` body.

    Every response is passed through set_debug_response_header().

    NOTE(review): the token cookie is a plain str() of a dict with no
    signature visible in this block, so a client can write its own.
    NOTE(review): the cookie max_age is 7200 s while the embedded deadline is
    86400 s ahead - confirm which one is intended.
    """

    def _failure(message):
        # Failure response in the shape used throughout this endpoint.
        return set_debug_response_header(jsonify(success=False, loginError=message))

    resp = jsonify(success=False)
    if request.method == 'DELETE':
        resp = jsonify(success=True)
        resp.set_cookie(key="token", expires=0)
        return set_debug_response_header(resp)
    if request.method == 'POST':
        username, password = request.json['username'], request.json['password']
        user = user_datastore.get_user(username)
        # If the lookup yields None for an unknown name, answer with the same
        # failure as a wrong password instead of crashing on `user.password`.
        if user is not None and verify_password(password, user.password):
            print('user login: %s' % user.user_name + ' verified')
            resp = jsonify(success=True, userID=user.id)
            resp.set_cookie(key="token",
                            value=str({"id": user.id, "deadline": (time.time() + 86400) * 1000}),
                            max_age=7200,
                            httponly=True)
            return set_debug_response_header(resp)
        return _failure('????????')
    elif request.method == 'GET':
        cookies = request.cookies
        if not cookies:
            return _failure('???')
        if not cookies.get('token'):
            return _failure('???')

        # The cookie was written with str(dict): swap the quotes to get JSON.
        # A malformed cookie is handled like a missing one.
        try:
            token = json.loads(cookies['token'].replace('\'', '"'))
        except ValueError:
            token = None

        if not isinstance(token, dict) or not token.get('deadline') or not token.get('id'):
            return _failure('???')

        try:
            deadline = int(token['deadline'])
            user_id = int(token['id'])
        except (TypeError, ValueError, OverflowError):
            # Non-numeric fields: same answer as an incomplete token
            return _failure('???')

        # deadline is in milliseconds, time.time() in seconds
        if time.time() > (deadline / 1000):
            return _failure('????')

        user = app_models.User.query.get(user_id)
        if user:
            result = {
                'success': True,
                'user': {
                    'userID': user.id,
                    'userName': user.user_name,
                    'permissions': [p.name for p in user.roles]
                }
            }
            resp = Response(json.dumps(result))
            return set_debug_response_header(resp)
    elif request.method == 'OPTIONS':
        pass

    # Unknown user id in a valid token, OPTIONS, or any other method
    return set_debug_response_header(resp)
项目:flask-fb-neo4j-d3    作者:suprfanz    | 项目源码 | 文件源码
def get_current_user():
    """Set g.user to the currently logged in user.

    Called before each request, get_current_user sets the global g.user
    variable to the currently logged in user.  A currently logged in user is
    determined by seeing if it exists in Flask's session dictionary.

    If it is the first time the user is logging into this application it will
    create the user and insert it into the database.  If the user is not logged
    in, None will be set to g.user.
    """

    # Set the user in the session dictionary as a global g.user and bail out
    # of this function early.
    if session.get('user'):
        g.user = session.get('user')
        return

    # Attempt to get the short term access token for the current user.
    result = get_user_from_cookie(cookies=request.cookies, app_id=FB_APP_ID,
                                  app_secret=FB_APP_SECRET)

    # If there is no result, we assume the user is not logged in.
    if result:
        graph = GraphAPI(result['access_token'])
        profile = graph.get_object('me')

        # Only the default for a missing link depends on the profile content.
        # The lookup / creation / session update below must run for every
        # profile, otherwise a profile that has a link is never logged in.
        if 'link' not in profile:
            profile['link'] = ""

        # Check to see if this user is already in our database.
        user = User(result['uid'], name=profile['name'], profile_url=profile['link'],
                    access_token=result['access_token'])
        user = user.check_user()

        if not user:
            # Not an existing user: create the user and insert it into the
            # database, reusing the profile fetched above.
            user = User(result['uid'], profile['name'], profile['link'], result['access_token'])
            user.create_user()
        elif user['access_token'] != result['access_token']:
            # If an existing user, update the access token
            # NOTE(review): only the returned mapping is updated here; no
            # write back to the database is visible in this block.
            user['access_token'] = result['access_token']

        # Add the user to the current session
        session['user'] = dict(name=profile['name'], profile_url=profile['link'],
                               id=result['uid'], access_token=result['access_token'])

    # Set the user as a global g.user
    g.user = session.get('user', None)