Python urllib.parse 模块,parse_qsl() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.parse_qsl()

项目:sketal    作者:vk-brain    | 项目源码 | 文件源码
def get_url_query(self, url):
        """Return the URL's fragment (or, if empty, query) parameters as a dict.

        Non-string ``url`` values are converted with ``str()`` first.  When
        ``self.get_token_from_url`` yields a truthy token it is stored under
        ``"access_token"``, replacing any parsed value for that key.
        """
        text = url if isinstance(url, str) else str(url)
        pieces = urlparse(text)

        # Repeated keys collapse to their last occurrence in the dict.
        params = {
            key: value
            for key, value in parse_qsl(pieces.fragment or pieces.query)
        }

        found = self.get_token_from_url(text)
        if found:
            params["access_token"] = found
        return params

    ############################################################################
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def parseURL(url: str) -> URLParts:
    """Split ``url`` into a ``URLParts`` tuple of domain, path and query dict.

    The hostname (empty string when absent) is matched against the regexes in
    ``config.hostnameSanitizers``; the first matching entry supplies the
    domain, otherwise the hostname itself is used.  Returns ``None`` when a
    ``ValueError`` is raised while parsing.
    """
    try:
        pieces = urlparse(url)
        host = pieces.hostname or ''

        # Start from the raw hostname; the first matching sanitizer wins.
        domain = host
        if config.hostnameSanitizers:
            for pattern, replacement in config.hostnameSanitizers.items():
                if re.compile(pattern).match(host):
                    domain = replacement
                    break

        query_map = dict(parse_qsl(pieces.query))
        return URLParts(domain=domain, path=pieces.path, queries=query_map)
    except ValueError:
        return None
项目:weibopy    作者:nooperpudd    | 项目源码 | 文件源码
def test_authorize_url(self):
        """
        The client's authorize URL points at api.weibo.com/oauth2/authorize
        and its query string equals the expected request parameters.

        :return:
        """
        expected_query = dict(
            client_id=self.client_id,
            redirect_uri=self.redirect_url,
            forcelogin="false",
        )

        parts = parse.urlparse(self.oauth2_client.authorize_url)
        actual_query = {k: v for k, v in parse.parse_qsl(parts.query)}

        self.assertEqual(parts.hostname, "api.weibo.com")
        self.assertEqual(parts.path, "/oauth2/authorize")
        self.assertDictEqual(actual_query, expected_query)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def facebook_compliance_fix(session):
    """Register a hook that rewrites form-encoded token responses as JSON.

    The hook is attached to ``session`` under ``'access_token_response'``
    and the same session object is returned.
    """

    def _compliance_fix(r):
        content_type = r.headers.get('content-type', '')

        # A response that already declares JSON is passed through untouched.
        if 'application/json' in content_type:
            return r

        # Only a 200 response labelled text/plain is treated as an
        # x-www-form-urlencoded token payload; anything else is left alone.
        if not ('text/plain' in content_type and r.status_code == 200):
            return r

        token = dict(parse_qsl(r.text, keep_blank_values=True))
        lifetime = token.get('expires')
        if lifetime is not None:
            token['expires_in'] = lifetime
        token['token_type'] = 'Bearer'

        # Replace the response body with the JSON-encoded token.
        r._content = to_unicode(dumps(token)).encode('UTF-8')
        return r

    session.register_compliance_hook('access_token_response', _compliance_fix)
    return session
项目:swagger-stub    作者:Trax-air    | 项目源码 | 文件源码
def get_parsed_body(request):
    """Return the request body parsed as JSON, falling back to form data.

    Args:
        request: object exposing ``parsed_body`` and a bytes ``body``.

    Returns:
        The decoded JSON value, or a dict built from the URL-encoded body
        (keys and values stay bytes), or ``None`` when
        ``request.parsed_body`` is the empty string.

    Raises:
        The original decoding error (e.g. ValueError) when the body is
        neither JSON nor non-empty form data.
    """
    if request.parsed_body == '':
        return None

    try:
        return json.loads(request.body.decode('utf-8'))
    except Exception:
        # Not JSON: try to read the body as URL-encoded form data instead.
        form = dict(parse_qsl(request.body))
        if not form:
            # Not form data either: surface the decoding error.
            raise
        return form
项目:globus-cli    作者:globus    | 项目源码 | 文件源码
def do_GET(self):  # noqa
            """Answer the redirect with an HTML page and report the outcome.

            A 200 text/html response is always sent.  When the query string
            carries ``code`` it is handed to ``self.server.return_code``;
            otherwise a ``LocalServerError`` built from ``error_description``
            (or ``error``) is handed over instead.
            """
            self.send_response(200)
            self.send_header('Content-type', 'text/html')
            self.end_headers()

            params = dict(parse_qsl(urlparse(self.path).query))
            auth_code = params.get('code')

            if not auth_code:
                # Prefer the detailed description; fall back to the bare error.
                failure = params.get('error_description', params.get('error'))
                page = HTML_TEMPLATE.substitute(
                    post_login_message=failure,
                    login_result='Login failed')
                self.wfile.write(six.b(page))
                self.server.return_code(LocalServerError(failure))
                return

            page = HTML_TEMPLATE.substitute(
                post_login_message=DOC_URL,
                login_result='Login successful')
            self.wfile.write(six.b(page))
            self.server.return_code(auth_code)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def noredirect_url(url, forced_path=None):
    """
        Return ``url`` with the ``options.MULTILANGUAGE_GET_PARAM`` query
        parameter set to 1.

        When ``forced_path`` is given, its path replaces the path of ``url``
        and its query parameters are merged in, overriding duplicates.
    """
    parts = list(urlparse(url))
    params = dict(parse_qsl(parts[4]))

    if forced_path:
        forced = urlparse(forced_path)
        parts[2] = forced[2]
        params.update(parse_qsl(forced[4]))

    params[options.MULTILANGUAGE_GET_PARAM] = 1
    parts[4] = urlencode(params)
    return urlunparse(parts)
项目:socialauth    作者:emilyhorsman    | 项目源码 | 文件源码
def get_user_information(self):
        """POST to Twitter's access-token endpoint and record the user.

        Sets ``self.status``, ``self.user_id`` and ``self.user_name`` and
        returns ``(user_id, user_name)``.  Raises ``Error`` when the response
        status is not 200 or the reply carries no ``user_id``.
        """
        request_token = oauth2.Token(self.oauth_token, self.oauth_token_secret)
        request_token.set_verifier(self.oauth_verifier)
        client = oauth2.Client(self.consumer, request_token)

        resp, content = client.request(
            'https://api.twitter.com/oauth/access_token', 'POST')
        if resp.status != 200:
            raise Error('{} from Twitter'.format(resp.status))

        fields = dict(parse_qsl(content.decode('utf-8')))
        if 'user_id' not in fields:
            raise Error('No user_id from Twitter')

        self.status = 200
        self.user_id = fields.get('user_id')
        self.user_name = fields.get('screen_name')
        return (self.user_id, self.user_name)
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def format_body(self, body):
        """Render a URL-encoded body as aligned ``key: value`` text.

        Each value is passed to the formatter returned by
        ``self._get_value_formatter``.  On success the key is emitted on its
        own line followed by the formatted content; if the formatter raises,
        the raw value is shown on the key's line instead.

        :param body: URL-encoded string
        :return: the joined text, or ``""`` when no pairs could be parsed
        """
        pairs = urlparse.parse_qsl(body)
        if not pairs:
            return ""

        # Pad every key to the longest one so the values line up.
        width = max(len(key) for key, _ in pairs)
        lines = []
        for key, value in pairs:
            formatter = self._get_value_formatter(value)
            try:
                formatted = formatter.format_body(value)
            except Exception:
                # Best effort: fall back to the raw value.  Not a bare
                # ``except`` so KeyboardInterrupt/SystemExit still propagate.
                lines.append("{0}: {1}".format(key.ljust(width), value))
            else:
                lines.append("{0}:".format(key.ljust(width)))
                lines.append(formatted)

        return u"\n".join(lines)
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def format_tui(self, body):
        """Render a URL-encoded body as a list of TUI text widgets.

        Each value is passed to the formatter returned by
        ``self._get_value_formatter``.  On success a ``Text`` with the key is
        added followed by the formatter's widgets; if the formatter raises,
        a single ``Text`` holding ``key: value`` is added instead.

        :param body: URL-encoded string
        :return: list of widgets, or ``[]`` when no pairs could be parsed
        """
        pairs = urlparse.parse_qsl(body)
        if not pairs:
            return []

        # Pad every key to the longest one so the values line up.
        width = max(len(key) for key, _ in pairs)
        widgets = []
        for key, value in pairs:
            formatter = self._get_value_formatter(value)
            try:
                formatted_list = formatter.format_tui(value)
            except Exception:
                # Best effort: fall back to the raw value.  Not a bare
                # ``except`` so KeyboardInterrupt/SystemExit still propagate.
                widgets.append(Text("{0}: {1}".format(key.ljust(width), value)))
            else:
                widgets.append(Text("{0}:".format(key.ljust(width))))
                widgets.extend(formatted_list)

        return widgets
项目:microProxy    作者:mike820324    | 项目源码 | 文件源码
def format_console(self, body):  # pragma: no cover
        """Render a URL-encoded body as aligned ``key: value`` console text.

        Each value is passed to the formatter returned by
        ``self._get_value_formatter``.  On success the key is emitted on its
        own line followed by the formatted content; if the formatter raises,
        the raw value is shown on the key's line instead.

        :param body: URL-encoded string
        :return: the joined text, or ``""`` when no pairs could be parsed
        """
        pairs = urlparse.parse_qsl(body)
        if not pairs:
            return ""

        # Pad every key to the longest one so the values line up.
        width = max(len(key) for key, _ in pairs)
        lines = []
        for key, value in pairs:
            formatter = self._get_value_formatter(value)
            try:
                formatted = formatter.format_console(value)
            except Exception:
                # Best effort: fall back to the raw value.  Not a bare
                # ``except`` so KeyboardInterrupt/SystemExit still propagate.
                lines.append("{0}: {1}".format(key.ljust(width), value))
            else:
                lines.append("{0}:".format(key.ljust(width)))
                lines.append(formatted)

        return u"\n".join(lines)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def walkNextPages(sess, url="https://iptime.com/iptime/?page_id=126&dffid=1&dfsid=11"):
    """Walk a paginated listing, calling ``walkListItems`` on every page.

    Starting at ``url``, each page is processed and then fetched through
    ``sess``; the loop follows the link around the ``next_1.gif`` pager image
    until no such image is present.  Any exception (``BaseException``
    included) is printed with its traceback and swallowed.
    """
    try:
        from os.path import basename
        from urllib.parse import parse_qsl, urlsplit

        def page_number(page_url):
            # URLs without a "pageid" query parameter count as page 1.
            params = dict(parse_qsl(urlsplit(page_url).query))
            return int(params.get("pageid", "1"))

        while True:
            current = page_number(url)
            print("pageid=%d" % current)
            walkListItems(sess, url)

            tree = html.fromstring(sess.get(url=url).text)
            sources = tree.xpath(".//ul[@class='pages']//img/@src")
            if not any(basename(src) == 'next_1.gif' for src in sources):
                break

            next_arrow = next(
                img for img in tree.xpath(".//ul[@class='pages']//img")
                if basename(img.attrib['src']) == 'next_1.gif')
            url = urljoin(url, next_arrow.xpath("../../a/@href")[0])

            # The "next" link is expected to advance by exactly one page.
            following = page_number(url)
            assert following == current+1
    except BaseException as ex:
        traceback.print_exc()
        print(ex)
项目:payapp    作者:ssut    | 项目源码 | 文件源码
def _post(self, cmd, data={}):
        """POST ``cmd`` to ``REST_URL`` and wrap the parsed reply.

        :param cmd: command name; must be one of ``CMD_LIST``
        :param data: extra form fields; copied, never modified
        :return: ``PayAppInternalResult`` whose ``success`` is True when the
            reply's ``state`` is ``'1'``, whose ``error`` is the reply's
            ``errorMessage`` (if any), and whose ``content`` holds the
            remaining reply fields (an empty dict for non-200 responses)
        """
        assert cmd in CMD_LIST
        assert self.user_id

        # Build a fresh payload: updating ``data`` in place would leak fields
        # into the shared default dict (and the caller's dict) across calls.
        payload = dict(data)
        payload.update({
            'cmd': cmd,
            'userid': self.user_id,
        })
        resp = requests.post(REST_URL, data=payload)

        success = False
        error = None
        content = {}
        if resp.status_code == 200:
            reply = dict(parse_qsl(resp.text))
            # ``.get`` so a reply without "state" is reported as a failure
            # instead of raising KeyError.
            if reply.get('state') == '1':
                success = True
            elif 'errorMessage' in reply:
                error = reply['errorMessage']
            content = Struct(**{k: v for k, v in reply.items()
                                if k not in ('state', 'errorMessage')})

        return PayAppInternalResult(success=success, error=error, content=content)
项目:payapp    作者:ssut    | 项目源码 | 文件源码
def verify_callback(self, params):
        """Check that callback parameters carry this client's credentials.

        :param params: a ``dict``, a URL-encoded ``str`` (e.g. a POST body),
            or any object exposing ``userid``, ``linkkey`` and ``linkval``
        :return: ``True`` when all three values match ``self.user_id``,
            ``self.link_key`` and ``self.link_value``; ``False`` otherwise
            (the reason is printed)
        """
        # Exact type checks are kept on purpose: other objects (including
        # subclasses) are used as-is.
        if type(params) is str:
            params = Struct(**dict(parse_qsl(params)))
        elif type(params) is dict:
            params = Struct(**params)

        # Explicit comparisons instead of ``assert``: assertions are stripped
        # under ``python -O``, which would make every callback verify.
        checks = (
            ('user_id', 'userid', u'??? ?? ???? ???? ????.'),
            ('link_key', 'linkkey', u'?? KEY? ???? ????.'),
            ('link_value', 'linkval', u''),
        )
        for own_attr, param_attr, message in checks:
            try:
                matches = getattr(self, own_attr) == getattr(params, param_attr)
            except AttributeError as e:
                print(e)
                return False
            if not matches:
                print(message)
                return False

        return True
项目:sanic    作者:channelcat    | 项目源码 | 文件源码
def test_adds_other_supplied_values_as_query_string():
    """Extra keyword arguments given to ``url_for`` show up in the query string."""
    app = Sanic('passes')

    @app.route(COMPLEX_PARAM_URL)
    def passes():
        return text('this should pass')

    extras = {'added_value_one': 'one', 'added_value_two': 'two'}
    url = app.url_for('passes', **dict(PASSING_KWARGS, **extras))

    parsed_query = dict(parse_qsl(urlsplit(url).query))
    for name, expected in extras.items():
        assert parsed_query[name] == expected
项目:websauna    作者:websauna    | 项目源码 | 文件源码
def merge_url_qs(url: str, **kw) -> str:
    """Merge the query string elements of a URL with the ones in ``kw``.

    If any query string element exists in ``url`` that also exists in ``kw``, replace it.
    The ``kw`` items come first (sorted by key), followed by the untouched
    elements of ``url`` in their original order; blank values are kept.

    :param url: An URL.
    :param kw: Dictionary with keyword arguments.
    :return: An URL with keyword arguments merged into the query string.
    """
    segments = urlsplit(url)
    kept = [
        (k, v)
        for (k, v) in parse_qsl(segments.query, keep_blank_values=True)
        if k not in kw
    ]
    # Join only the non-empty parts so that an empty ``kw`` does not leave a
    # stray leading "&" in the query string.
    encoded_parts = (urlencode(sorted(kw.items())), urlencode(kept))
    qs = '&'.join(part for part in encoded_parts if part)
    return urlunsplit((segments.scheme, segments.netloc, segments.path, qs, segments.fragment))
项目:grasp    作者:textgain    | 项目源码 | 文件源码
def serialize(url='', data={}):
    """ Returns the URL with the given data appended to its query string
        (data items are added in sorted order, after the existing parameters).
    """
    parts = urlparse.urlsplit(url)
    pairs = urlparse.parse_qsl(parts.query)
    pairs += [(b(k), b(v)) for k, v in sorted(data.items())]
    query = urlencode(pairs, doseq=True)
    joined = urlparse.urlunsplit(
        (parts.scheme, parts.netloc, parts.path, query, parts.fragment))
    # A bare query string (no scheme/host/path) is returned without its "?".
    return joined.lstrip('?')

# print(serialize('http://www.google.com', {'q': 'cats'})) # http://www.google.com?q=cats

#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def add_params(url, **params):
    """
    Return `url` with `params` merged into its query string; a parameter
    that already exists in `url` is replaced.
    """
    # NOTE(review): the whole URL is unquoted before parsing, so an escaped
    # "&" or "=" inside a value is re-read as a separator - confirm intended.
    parsed = parse.urlparse(parse.unquote(url))
    merged = dict(parse.parse_qsl(parsed.query), **params)
    return parse.urlunparse(parsed._replace(query=parse.urlencode(merged)))
项目:USTC-Software-2017    作者:igemsoftware2017    | 项目源码 | 文件源码
def add_params(url, **params):
    """
    Return `url` with `params` merged into its query string; a parameter
    that already exists in `url` is replaced.
    """
    # NOTE(review): the whole URL is unquoted before parsing, so an escaped
    # "&" or "=" inside a value is re-read as a separator - confirm intended.
    pieces = parse.urlparse(parse.unquote(url))
    query_map = dict(parse.parse_qsl(pieces.query))
    query_map.update(params)
    rebuilt = pieces._replace(query=parse.urlencode(query_map))
    return parse.urlunparse(rebuilt)
项目:serveradmin    作者:innogames    | 项目源码 | 文件源码
def pagination(context, page, pagination_id=None):
    """Build the context used to render pagination links.

    The returned ``url`` is the request path plus every current query
    parameter except ``page``; ``sep`` is the character with which a
    ``page=...`` parameter can be appended to that URL (``&`` when a query
    string is already present, ``?`` otherwise).

    :param context: mapping holding the current ``request``
    :param page: page object exposing ``paginator``
    :param pagination_id: optional identifier passed through unchanged
    :return: dict with ``page``, ``paginator``, ``pagination_id``, ``url``
        and ``sep``
    """
    request = context['request']
    kept = [
        (key, value)
        for key, value in parse_qsl(request.META['QUERY_STRING'])
        if key != 'page'
    ]

    url = request.path
    if kept:
        # The query string has to be introduced by "?"; without it the
        # parameters were glued directly onto the path.
        url += '?' + urlencode(kept)
        sep = '&'
    else:
        sep = '?'

    return {
        'page': page,
        'paginator': page.paginator,
        'pagination_id': pagination_id,
        'url': url,
        'sep': sep
    }
项目:loginsight-export    作者:vmware    | 项目源码 | 文件源码
def login(self, request, context):
        """Attempt to create a new session with provided credentials.

        The form-encoded ``request.text`` must carry ``_eventName=loginAjax``.
        On a username/provider/password match the response is 200 with a
        ``JSESSIONID`` cookie; otherwise it is 401 with an error payload.
        """
        self.session_timeout()
        creds = dict(parse_qsl(request.text))

        assert creds['_eventName'] == 'loginAjax'

        for user in self.users_known:
            if not (user.username == creds['username'] and user.provider == creds['authMethod']):
                continue

            if user.password == creds['password']:
                mockserverlogger.info("Successful authentication as {u.username} = {u.userId}".format(u=user))
                context.status_code = 200
                # NOTE(review): if sessions_known is a plain list, append()
                # returns None and the cookie value is None - confirm.
                session_id = self.sessions_known.append(Session(user.userId, 1800, time.time()))
                context.cookies.set('JSESSIONID', session_id)
                return json.dumps({"succ": True})

            # wrong password, bail out
            mockserverlogger.warning("Correct username but invalid password (which is OK to say, because this is a mock)")
            break

        context.status_code = 401
        return json.dumps({"errorMessage": "Invalid username or password.", "errorCode": "FIELD_ERROR"})
项目:bernard    作者:BernardFW    | 项目源码 | 文件源码
def patch_qs(url: Text, data: Dict[Text, Text]) -> Text:
    """
    Return the URL with its query string updated from the dictionary.

    Query string entries whose key appears in the dictionary are dropped and
    the dictionary's items are appended at the end.

    All other entries of the query string keep their relative order.
    """

    parts = urlparse(url)
    untouched = [(key, value) for key, value in parse_qsl(parts.query) if key not in data]
    patched = untouched + list(data.items())
    return urlunparse(parts._replace(query=urlencode(patched)))
项目:splitwise    作者:namaggarwal    | 项目源码 | 文件源码
def getAuthorizeURL(self):
        """Request an OAuth request token and build the authorization URL.

        Returns a tuple ``(authorize_url, oauth_token_secret)``.  Raises
        ``Exception`` when the response status is not ``'200'``.
        """
        client = oauth.Client(self.consumer)

        # Get the request token
        resp, content = client.request(Splitwise.REQUEST_TOKEN_URL, "POST")
        if Splitwise.isDebug():
            print(resp, content)

        # Check if the response is correct
        if resp['status'] != '200':
            raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])

        request_token = dict(parse_qsl(content.decode("utf-8")))
        authorize_url = "%s?oauth_token=%s" % (Splitwise.AUTHORIZE_URL, request_token['oauth_token'])
        return authorize_url, request_token['oauth_token_secret']
项目:splitwise    作者:namaggarwal    | 项目源码 | 文件源码
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier):
        """Exchange a verified request token for an access token.

        Returns the access-token response parsed into a dict.  Raises
        ``Exception`` when the response status is not ``'200'``.
        """
        request_token = oauth.Token(oauth_token, oauth_token_secret)
        request_token.set_verifier(oauth_verifier)
        client = oauth.Client(self.consumer, request_token)

        resp, content = client.request(Splitwise.ACCESS_TOKEN_URL, "POST")
        if Splitwise.isDebug():
            print(resp, content)

        # Check if the response is correct
        if resp['status'] != '200':
            raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status'])

        return dict(parse_qsl(content.decode("utf-8")))
项目:erpnext_quickbooks    作者:indictranstech    | 项目源码 | 文件源码
def get_authorize_url(self):
        """
        Fetch a request token and return the matching authorize URL.

        The service is set up first when ``self.qbService`` is ``None``; the
        token and its secret are stored on ``self.request_token`` and
        ``self.request_token_secret``.
        :return URI:
        """
        if self.qbService is None:
            self.set_up_service()

        raw = self.qbService.get_raw_request_token(
            params={'oauth_callback': self.callback_url})
        tokens = dict(parse_qsl(raw.text))

        self.request_token = tokens['oauth_token']
        self.request_token_secret = tokens['oauth_token_secret']
        return self.qbService.get_authorize_url(self.request_token)
项目:mach9    作者:silver-castle    | 项目源码 | 文件源码
def test_adds_other_supplied_values_as_query_string():
    """Extra keyword arguments given to ``url_for`` show up in the query string."""
    app = Mach9('passes')

    @app.route(COMPLEX_PARAM_URL)
    def passes():
        return text('this should pass')

    extras = {'added_value_one': 'one', 'added_value_two': 'two'}
    url = app.url_for('passes', **dict(PASSING_KWARGS, **extras))

    parsed_query = dict(parse_qsl(urlsplit(url).query))
    for name, expected in extras.items():
        assert parsed_query[name] == expected
项目:PassiveScanner    作者:jjf012    | 项目源码 | 文件源码
def url_etl(url):
    """Normalize a URL: digit runs in the path become "N" and query values
    are passed through ``etl``.

    The ``_`` and ``timestamp`` parameters and parameters with a blank value
    are dropped from the query string.
    """
    parts = urlparse(url)
    normalized_path = re.sub(r"\d+", "N", parts.path)
    raw_query = unquote(parts.query)

    # Without a query string only scheme, host and normalized path are kept.
    if not raw_query:
        return urlunparse((parts.scheme, parts.netloc, normalized_path, "", "", ""))

    cleaned = {}
    for key, value in parse_qsl(raw_query, True):
        # Skip the ignored keys and parameters without a value.
        if key in ("_", "timestamp") or not value:
            continue
        cleaned[key] = etl(value)

    return urlunparse((parts.scheme, parts.netloc, normalized_path,
                       parts.params, urlencode(cleaned), parts.fragment))
项目:alexa-voice-service-client    作者:richtier    | 项目源码 | 文件源码
def handle_callback(self):
        """Exchange the ``code`` query parameter for tokens and relay the result.

        The token endpoint's status code is passed through.  On 200 the
        refresh token is written as ``refresh_token: <value>``; otherwise the
        endpoint's response body is written unchanged.
        """
        query = dict(parse_qsl(urlparse(self.path).query))
        payload = {
            'client_id': self.client_id,
            'client_secret': self.client_secret,
            'code': quote(query['code']),
            'grant_type': 'authorization_code',
            'redirect_uri': self.callback_url,
        }
        response = requests.post(self.oauth2_token_url, json=payload)
        succeeded = response.status_code == 200

        self.send_response(response.status_code)
        self.send_header(
            'Content-type', 'application/json' if succeeded else 'text/html')
        self.end_headers()

        if succeeded:
            refresh_token = response.json()['refresh_token']
            body = bytes('refresh_token: ' + refresh_token, 'utf8')
        else:
            body = response.content
        self.wfile.write(body)
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def serialize_sample(self, sample):
        """Parse a query-string sample into an ordered dict for storage.

        Returns ``False`` when ``self.pb_param`` is not among the keys.
        Otherwise the ``token`` entry is removed and the ``callback`` value
        (if present) is blanked.
        """
        fields = OrderedDict(parse_qsl(sample))
        if self.pb_param not in fields:
            return False

        fields.pop('token', None)
        if 'callback' in fields:
            # Keep the key (and its position) but drop the value.
            fields['callback'] = ''
        return fields
项目:pbtk    作者:marin-m    | 项目源码 | 文件源码
def parse_qs(self, sample):
        """Split ``sample`` into its leading digit-initiated part and the
        remaining query parameters.

        The leading run of ``&``-separated segments that start with a digit
        and contain no ``=`` becomes the first key (with an empty value);
        the text after the following separator character is parsed as a
        regular query string, keeping blank values.

        :param sample: string to split
        :return: ``OrderedDict`` with the leading part first
        """
        # Raw string: "\d" inside a plain literal is an invalid escape
        # sequence (a warning today, an error in future Python versions).
        pb = match(r'(?:&?\d[^&=]+)*', sample)
        return OrderedDict([(pb.group(0), ''),
                            *parse_qsl(sample[pb.end() + 1:], True)])
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def get_url_query(url):
    """Return the parameters encoded in the URL's fragment as a dict."""
    fragment = urlparse(url).fragment
    # Repeated keys collapse to their last value in the dict.
    return dict(parse_qsl(fragment))
项目:Gnome-Authenticator    作者:bil-elmoussaoui    | 项目源码 | 文件源码
def read(self):
        """Scan the image at ``self.filename`` for a QR code and parse it.

        ``self.remove()`` is called once the scan is done.  When a code is
        found, the query parameters of the first decoded URL are stored in
        ``self.codes`` and returned; otherwise an error is logged and
        ``None`` is returned.
        """
        with open(self.filename, 'rb') as image_file:
            image = Image.open(image_file)
            image.load()
        self.codes = scan_codes('qrcode', image)
        self.remove()

        if not self.codes:
            logging.error("Invalid QR image")
            return None

        otpauth_url = self.codes[0].decode()
        self.codes = dict(parse_qsl(urlparse(otpauth_url).query))
        return self.codes
项目:libmozdata    作者:mozilla    | 项目源码 | 文件源码
def build_path(self, method, url):
        """
        Build a unique file path from method & url, creating its directory.

        The directory is ``MOCKS_DIR/<scheme>_<hostname>/<path segments>``;
        the file name is ``<method>_<sorted query pairs>.gz``.
        """
        parsed = urlparse(url)

        # Directory: one level for scheme/host, then one per path segment.
        segments = ['{}_{}'.format(parsed.scheme, parsed.hostname)]
        segments.extend(part for part in parsed.path.split('/') if part)
        directory = os.path.join(MOCKS_DIR, *segments)

        # File name: sorted "key=value" pairs, slashes in values replaced.
        query_str = '_'.join(
            '{}={}'.format(key, value.replace('/', '_'))
            for key, value in sorted(parse_qsl(parsed.query)))

        # Long query strings are truncated and suffixed with their hash.
        if len(query_str) > 150:
            digest = hashlib.md5(query_str.encode('utf-8')).hexdigest()
            query_str = '{}_{}'.format(query_str[0:100], digest)
        filename = '{}_{}.gz'.format(method, query_str)

        if not os.path.isdir(directory):
            try:
                os.makedirs(directory)
            except Exception as e:
                logger.error('Concurrency error when building directories: {}'.format(e))

        return os.path.join(directory, filename)
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def _has_strict_url_match(self, url, other):
        """Return True when both URLs share scheme, netloc and path and carry
        the same query pairs, in any order."""
        left, right = urlparse(url), urlparse(other)

        # The first three components are scheme, netloc and path.
        if left[:3] != right[:3]:
            return False

        return sorted(parse_qsl(left.query)) == sorted(parse_qsl(right.query))
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def __call__(self, environ, start_response):
        """Record the request's path and parsed query pairs, then answer with
        the configured status code, headers and body."""
        path = environ.get('PATH_INFO')
        query = urlparse.parse_qsl(environ.get('QUERY_STRING'))
        self.requests.append((path, query))

        status_line = '%s OK' % self.response_code
        start_response(status_line, self.headers)
        return [self.response_data]
项目:nightreads    作者:avinassh    | 项目源码 | 文件源码
def _update_url_query_param(url, query_params):
    """Return ``url`` with ``query_params`` merged into its query string;
    parameters already present in the URL are overridden."""
    parts = parse.urlparse(url)
    merged = dict(parse.parse_qsl(parts.query))
    merged.update(query_params)
    return parse.urlunparse(parts._replace(query=parse.urlencode(merged)))
项目:pyop    作者:IdentityPython    | 项目源码 | 文件源码
def test_error_url_should_contain_state_from_authentication_request(self):
        """The error URL's query string carries the ``state`` of the request."""
        authn_params = {
            'redirect_uri': 'test_redirect_uri',
            'response_type': 'code',
            'state': 'test_state',
        }
        authn_req = AuthorizationRequest().from_dict(authn_params)
        failure = InvalidAuthenticationRequest('test', authn_req, 'invalid_request')

        error_params = dict(parse_qsl(urlparse(failure.to_error_url()).query))
        assert error_params['state'] == authn_params['state']
项目:pyop    作者:IdentityPython    | 项目源码 | 文件源码
def _verify_client_authentication(self, request_body, http_headers=None):
        # type: (str, Optional[Mapping[str, str]]) -> Mapping[str, str]
        """
        Parse the token request and verify the client authentication.

        The ``client_id`` entry of the parsed request is set to the value
        returned by ``verify_client_authentication``.
        :param request_body: urlencoded token request
        :param http_headers: http headers; the ``Authorization`` entry is used
        :return: The parsed request body.
        """
        headers = {} if http_headers is None else http_headers
        token_request = dict(parse_qsl(request_body))
        token_request['client_id'] = verify_client_authentication(
            self.clients, token_request, headers.get('Authorization'))
        return token_request
项目:pyop    作者:IdentityPython    | 项目源码 | 文件源码
def handle_userinfo_request(self, request=None, http_headers=None):
        # type: (Optional[str], Optional[Mapping[str, str]]) -> oic.oic.message.OpenIDSchema
        """
        Handle a userinfo request and return the user's claims.

        The bearer token is taken from the request or the ``Authorization``
        header and introspected; ``InvalidAccessToken`` is raised when the
        token is not active.
        :param request: urlencoded request (either query string or POST body)
        :param http_headers: http headers
        """
        headers = {} if http_headers is None else http_headers
        parsed_request = dict(parse_qsl(request))
        bearer_token = extract_bearer_token_from_http_request(
            parsed_request, headers.get('Authorization'))

        introspection = self.authz_state.introspect_access_token(bearer_token)
        if not introspection['active']:
            raise InvalidAccessToken('The access token has expired')
        scope = introspection['scope']
        user_id = self.authz_state.get_user_id_for_subject_identifier(introspection['sub'])

        # Claims implied by the scope, extended by those the original
        # authentication request asked for under 'userinfo'.
        requested_claims = scope2claims(scope.split())
        authentication_request = self.authz_state.get_authorization_request_for_access_token(bearer_token)
        requested_claims.update(
            self._get_requested_claims_in(authentication_request, 'userinfo'))
        user_claims = self.userinfo.get_claims_for(user_id, requested_claims)

        # Make sure the subject identifier is part of the response.
        user_claims.setdefault('sub', introspection['sub'])
        response = OpenIDSchema(**user_claims)
        logger.debug('userinfo=%s from requested_claims=%s userinfo=%s',
                     response, requested_claims, user_claims)
        return response
项目:py-restfmclient    作者:pcdummy    | 项目源码 | 文件源码
def url(self):
        """Return base URL plus path with ``self._query`` merged into (and
        overriding) the query string."""
        parts = urlparse.urlparse(self._base_url + self._path)
        merged = dict(urlparse.parse_qsl(parts.query))
        merged.update(self._query)
        return urlparse.urlunparse(parts._replace(query=urlencode(merged)))
项目:py-restfmclient    作者:pcdummy    | 项目源码 | 文件源码
async def _store(self, store_path, method, url, result):
        """Write ``result`` to a file derived from the URL and method.

        The file lives at ``<store_path>/<url path>/<method>/index.json``,
        with the sorted, re-encoded query string appended to the file name
        after a ``?`` when the URL has one.  Does nothing when
        ``HAS_AIOFILES`` is false.

        Declared ``async`` because the body uses ``async with`` and
        ``await``, which are a syntax error inside a plain ``def``.

        :param store_path: root directory of the store
        :param method: HTTP method name (lower-cased for the path)
        :param url: request URL the result belongs to
        :param result: text to write
        """
        if not HAS_AIOFILES:  # pragma: no cover
            return

        url_parts = list(urlparse.urlparse(url))
        filename = 'index.json'
        if url_parts[4] != '':  # pragma: no coverage
            query = OrderedDict(
                sorted(dict(urlparse.parse_qsl(url_parts[4])).items())
            )
            filename += '?%s' % urlencode(query)
        save_path = path.join(
            unquote(path.join(
                store_path,
                url_parts[2][1:],  # URL path without its leading "/"
                method.lower(),
            )),
            filename
        )

        # An existing directory is fine; any other failure should surface
        # here rather than be silently ignored.
        os.makedirs(path.dirname(save_path), 0o755, exist_ok=True)

        self.logger.debug('Saving: %s, len: %d' % (save_path, len(result)))
        async with aiofiles.open(save_path, mode='w', loop=self._loop) as f:
            await f.write(result)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def parse_oauth_response(data):
        """Parse an OAuth response string into a dict.

        Binary input is decoded as UTF-8 first, so the keys and values of the
        dictionary are text strings (i.e. not binary strings).
        """
        text = data.decode('utf-8') if isinstance(data, six.binary_type) else data

        # Later occurrences of a key overwrite earlier ones.
        return dict(urllib_parse.parse_qsl(text))
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def add_params_to_qs(query, params):
    """Append ``params`` (a dict or a list of two-tuples) to a query string.

    Blank values already present in ``query`` are kept.
    """
    extra = params.items() if isinstance(params, dict) else params
    existing = urlparse.parse_qsl(query, keep_blank_values=True)
    return urlencode(existing + list(extra))
项目:nsc-cloudproject-s22016    作者:agitaretech    | 项目源码 | 文件源码
def add_params_to_qs(query, params):
    """Append ``params`` (a dict or a list of two-tuples) to a query string.

    Blank values already present in ``query`` are kept.
    """
    pairs = urlparse.parse_qsl(query, keep_blank_values=True)
    if isinstance(params, dict):
        pairs.extend(params.items())
    else:
        pairs.extend(params)
    return urlencode(pairs)
项目:nsc-cloudproject-s22016    作者:agitaretech    | 项目源码 | 文件源码
def uri_query_params(self):
        """Return ``self.uri_query`` as a list of (name, value) pairs.

        An empty query gives ``[]``.  Blank values are kept and parsing is
        strict.
        """
        query = self.uri_query
        if query:
            return urlparse.parse_qsl(query, keep_blank_values=True,
                                      strict_parsing=True)
        return []
项目:nsc-cloudproject-s22016    作者:agitaretech    | 项目源码 | 文件源码
def add_params_to_qs(query, params):
    """Append ``params`` (a dict or a list of two-tuples) to a query string.

    Blank values already present in ``query`` are kept.
    """
    additions = list(params.items() if isinstance(params, dict) else params)
    return urlencode(
        urlparse.parse_qsl(query, keep_blank_values=True) + additions)
项目:nsc-cloudproject-s22016    作者:agitaretech    | 项目源码 | 文件源码
def uri_query_params(self):
        """Return ``self.uri_query`` as a list of (name, value) pairs.

        An empty query gives ``[]``.  Blank values are kept and parsing is
        strict.
        """
        raw_query = self.uri_query
        if not raw_query:
            return []
        pairs = urlparse.parse_qsl(
            raw_query, keep_blank_values=True, strict_parsing=True)
        return pairs
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def _build_url(self, base_url, **kwargs):
        """ Return ``base_url`` with ``kwargs`` merged into its GET parameters """
        parts = parse.urlparse(base_url)
        merged = dict(parse.parse_qsl(parts.query), **kwargs)
        return parse.urlunparse(parts._replace(query=parse.urlencode(merged)))
项目:socialauth    作者:emilyhorsman    | 项目源码 | 文件源码
def get_initial_oauth_tokens(self):
        """Fetch a request token pair from Twitter and store it on ``self``.

        Returns ``(oauth_token, oauth_token_secret)``.  Raises ``Error`` when
        the response status is not 200 or either value is missing.
        """
        url = 'https://api.twitter.com/oauth/request_token'
        client = oauth2.Client(self.consumer)
        resp, content = client.request(url, 'GET')
        if resp.status != 200:
            raise Error('{} from Twitter'.format(resp.status))

        values = dict(parse_qsl(content.decode('utf-8')))
        self.oauth_token = values.get('oauth_token')
        self.oauth_token_secret = values.get('oauth_token_secret')
        if not (self.oauth_token and self.oauth_token_secret):
            raise Error('No oauth_token or oauth_token_secret from Twitter')

        return (self.oauth_token, self.oauth_token_secret)
项目:msgraph-sdk-python    作者:microsoftgraph    | 项目源码 | 文件源码
def test_path_creation_with_query(self, MockHttpProvider, MockAuthProvider):
        """
        Tests that the request URL carries the expected query parameters
        """
        http_provider = msgraph.HttpProvider()
        auth_provider = msgraph.AuthProvider()
        client = msgraph.GraphClient("graphurl/", http_provider, auth_provider)

        children = client.drives["me"].items["root"].children
        request = children.request(top=3, select="test")

        actual = dict(parse_qsl(urlparse(request.request_url).query))
        expected = {"select":"test", "top":"3"}
        for pair in expected.items():
            assert pair in actual.items()