我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.parse_qsl()。
def get_url_query(self, url): if not isinstance(url, str): url = str(url) parsed_url = urlparse(url) url_query = parse_qsl(parsed_url.fragment or parsed_url.query) # login_response_url_query can have multiple key url_query = dict(url_query) token = self.get_token_from_url(url) if token: url_query["access_token"] = token return url_query ############################################################################
def parseURL(url: str) -> URLParts: """ Parse the given URL into a URLParts named tuple and normalize any relevant domain names """ try: parsed = urlparse(url) if parsed.hostname: hostname = parsed.hostname else: hostname = '' if config.hostnameSanitizers: for regex, result in config.hostnameSanitizers.items(): if re.compile(regex).match(hostname): domain = result break else: domain = hostname else: domain = hostname return URLParts(domain=domain, path=parsed.path, queries=dict(parse_qsl(parsed.query))) except ValueError: return None
def test_authorize_url(self): """ :return: """ request_params = { 'client_id': self.client_id, 'redirect_uri': self.redirect_url, "forcelogin": "false" } parse_result = parse.urlparse(self.oauth2_client.authorize_url) query_params = dict(parse.parse_qsl(parse_result.query)) self.assertEqual(parse_result.hostname, "api.weibo.com") self.assertEqual(parse_result.path, "/oauth2/authorize") self.assertDictEqual(query_params, request_params)
def facebook_compliance_fix(session): def _compliance_fix(r): # if Facebook claims to be sending us json, let's trust them. if 'application/json' in r.headers.get('content-type', {}): return r # Facebook returns a content-type of text/plain when sending their # x-www-form-urlencoded responses, along with a 200. If not, let's # assume we're getting JSON and bail on the fix. if 'text/plain' in r.headers.get('content-type', {}) and r.status_code == 200: token = dict(parse_qsl(r.text, keep_blank_values=True)) else: return r expires = token.get('expires') if expires is not None: token['expires_in'] = expires token['token_type'] = 'Bearer' r._content = to_unicode(dumps(token)).encode('UTF-8') return r session.register_compliance_hook('access_token_response', _compliance_fix) return session
def get_parsed_body(request): """Get the body (json or formData) of the request parsed. Args: request: request object to get the body from. Returns: A dictionary of the body. Raises: ValueError: if it is neither a json or a formData """ data = None if not request.parsed_body == '': try: data = json.loads(request.body.decode('utf-8')) except Exception: # Decode formData data = dict(parse_qsl(request.body)) if not data: # Not a form data raise return data
def do_GET(self): # noqa self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() query_params = dict(parse_qsl(urlparse(self.path).query)) code = query_params.get('code') if code: self.wfile.write( six.b( HTML_TEMPLATE.substitute( post_login_message=DOC_URL, login_result='Login successful'))) self.server.return_code(code) else: msg = query_params.get( 'error_description', query_params.get('error')) self.wfile.write( six.b( HTML_TEMPLATE.substitute( post_login_message=msg, login_result='Login failed'))) self.server.return_code(LocalServerError(msg))
def noredirect_url(url, forced_path=None): """ ?????????? ? ???? ?????????, ???????????? ???????????? """ url_parts = list(urlparse(url)) query = dict(parse_qsl(url_parts[4])) if forced_path: forced_path_parts = list(urlparse(forced_path)) url_parts[2] = forced_path_parts[2] forced_path_query = dict(parse_qsl(forced_path_parts[4])) query.update(forced_path_query) query.update({ options.MULTILANGUAGE_GET_PARAM: 1 }) url_parts[4] = urlencode(query) return urlunparse(url_parts)
def get_user_information(self): token = oauth2.Token(self.oauth_token, self.oauth_token_secret) token.set_verifier(self.oauth_verifier) client = oauth2.Client(self.consumer, token) url = 'https://api.twitter.com/oauth/access_token' resp, content = client.request(url, 'POST') if resp.status != 200: raise Error('{} from Twitter'.format(resp.status)) provider_user = dict(parse_qsl(content.decode('utf-8'))) if 'user_id' not in provider_user: raise Error('No user_id from Twitter') self.status = 200 self.user_id = provider_user.get('user_id') self.user_name = provider_user.get('screen_name', None) return (self.user_id, self.user_name,)
def format_body(self, body): urlencoed_list = urlparse.parse_qsl(body) if not urlencoed_list: return "" max_length = max(map(lambda kv: len(kv[0]), urlencoed_list)) texts = [] for k, v in urlencoed_list: formatter = self._get_value_formatter(v) try: formatted_content = formatter.format_body(v) except: texts.append("{0}: {1}".format( k.ljust(max_length), v)) else: texts.append("{0}:".format(k.ljust(max_length))) texts.append(formatted_content) return u"\n".join(texts)
def format_tui(self, body): urlencoed_list = urlparse.parse_qsl(body) if not urlencoed_list: return [] max_length = max(map(lambda kv: len(kv[0]), urlencoed_list)) texts = [] for k, v in urlencoed_list: formatter = self._get_value_formatter(v) try: formatted_list = formatter.format_tui(v) except: texts.append(Text("{0}: {1}".format( k.ljust(max_length), v))) else: texts.append(Text("{0}:".format(k.ljust(max_length)))) texts = texts + formatted_list return texts
def format_console(self, body): # pragma: no cover urlencoed_list = urlparse.parse_qsl(body) if not urlencoed_list: return "" max_length = max(map(lambda kv: len(kv[0]), urlencoed_list)) texts = [] for k, v in urlencoed_list: formatter = self._get_value_formatter(v) try: formatted_content = formatter.format_console(v) except: texts.append("{0}: {1}".format( k.ljust(max_length), v)) else: texts.append("{0}:".format(k.ljust(max_length))) texts.append(formatted_content) return u"\n".join(texts)
def walkNextPages(sess, url="https://iptime.com/iptime/?page_id=126&dffid=1&dfsid=11"): try: from os.path import basename def get_pageid(url): from urllib.parse import parse_qsl, urlsplit qs = dict(parse_qsl(urlsplit(url).query)) return int(qs.get("pageid", "1")) while True: pageid = get_pageid(url) print("pageid=%d" % pageid) walkListItems(sess, url) root = html.fromstring(sess.get(url=url).text) arrows = [basename(_) for _ in root.xpath(".//ul[@class='pages']//img/@src")] if 'next_1.gif' not in arrows: break nexturl = next(_ for _ in root.xpath(".//ul[@class='pages']//img") if basename(_.attrib['src']) == 'next_1.gif') url = urljoin(url, nexturl.xpath("../../a/@href")[0]) nextpageid = get_pageid(url) assert nextpageid == pageid+1 except BaseException as ex: traceback.print_exc() print(ex)
def _post(self, cmd, data={}): assert cmd in CMD_LIST assert self.user_id data.update({ 'cmd': cmd, 'userid': self.user_id, }) resp = requests.post(REST_URL, data=data) success = False error = None content = {} if resp.status_code == 200: data = dict(parse_qsl(resp.text)) if data['state'] == '1': success = True elif 'errorMessage' in data: error = data['errorMessage'] content = Struct(**{k: v for k, v in data.items() if k not in ('state', 'errorMessage')}) result = PayAppInternalResult(success=success, error=error, content=content) return result
def verify_callback(self, params): """??? ?? ????? ?????. :param params: `dict` ?? ?? ??? `str`, POST ??? """ if type(params) is str: params = Struct(**dict(parse_qsl(params))) elif type(params) is dict: params = Struct(**params) valid = True try: assert self.user_id == params.userid, u'??? ?? ???? ???? ????.' assert self.link_key == params.linkkey, u'?? KEY? ???? ????.' assert self.link_value == params.linkval, u'' except (AssertionError, AttributeError) as e: valid = False print(e) return valid
def test_adds_other_supplied_values_as_query_string(): app = Sanic('passes') @app.route(COMPLEX_PARAM_URL) def passes(): return text('this should pass') new_kwargs = dict(PASSING_KWARGS) new_kwargs['added_value_one'] = 'one' new_kwargs['added_value_two'] = 'two' url = app.url_for('passes', **new_kwargs) query = dict(parse_qsl(urlsplit(url).query)) assert query['added_value_one'] == 'one' assert query['added_value_two'] == 'two'
def merge_url_qs(url: str, **kw) -> str: """Merge the query string elements of a URL with the ones in ``kw``. If any query string element exists in ``url`` that also exists in ``kw``, replace it. :param url: An URL. :param kw: Dictionary with keyword arguments. :return: An URL with keyword arguments merged into the query string. """ segments = urlsplit(url) extra_qs = [ (k, v) for (k, v) in parse_qsl(segments.query, keep_blank_values=1) if k not in kw ] qs = urlencode(sorted(kw.items())) if extra_qs: qs += '&' + urlencode(extra_qs) return urlunsplit((segments.scheme, segments.netloc, segments.path, qs, segments.fragment))
def serialize(url='', data={}): """ Returns a URL with a query string of the given data. """ p = urlparse.urlsplit(url) q = urlparse.parse_qsl(p.query) q.extend((b(k), b(v)) for k, v in sorted(data.items())) q = urlencode(q, doseq=True) p = p.scheme, p.netloc, p.path, q, p.fragment s = urlparse.urlunsplit(p) s = s.lstrip('?') return s # print(serialize('http://www.google.com', {'q': 'cats'})) # http://www.google.com?q=cats #---- REQUESTS & STREAMS -------------------------------------------------------------------------- # The download(url) function returns the HTML (JSON, image data, ...) at the given url. # If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).
def add_params(url, **params): """ Append or replace query params in `url` using `params`. """ url = parse.unquote(url) parsed = parse.urlparse(url) existing = dict(parse.parse_qsl(parsed.query)) existing.update(params) return parse.urlunparse( parse.ParseResult( scheme=parsed.scheme, netloc=parsed.netloc, path=parsed.path, params=parsed.params, query=parse.urlencode(existing), fragment=parsed.fragment ) )
def pagination(context, page, pagination_id=None): request = context['request'] url = request.path qs = parse_qsl(request.META['QUERY_STRING']) new_qs = [(key, value) for key, value in qs if key != 'page'] if new_qs: sep = '&' url += urlencode(new_qs) else: sep = '?' return { 'page': page, 'paginator': page.paginator, 'pagination_id': pagination_id, 'url': url, 'sep': sep }
def login(self, request, context): """Attempt to create a new session with provided credentials.""" self.session_timeout() attempted_credentials = dict(parse_qsl(request.text)) assert attempted_credentials['_eventName'] == 'loginAjax' for u in self.users_known: if u.username == attempted_credentials['username'] and u.provider == attempted_credentials['authMethod']: if u.password == attempted_credentials['password']: mockserverlogger.info("Successful authentication as {u.username} = {u.userId}".format(u=u)) context.status_code = 200 sessionId = self.sessions_known.append(Session(u.userId, 1800, time.time())) context.cookies.set('JSESSIONID', sessionId) return json.dumps({"succ": True}) else: # wrong password, bail out mockserverlogger.warning("Correct username but invalid password (which is OK to say, because this is a mock)") break context.status_code = 401 return json.dumps({"errorMessage": "Invalid username or password.", "errorCode": "FIELD_ERROR"})
def patch_qs(url: Text, data: Dict[Text, Text]) -> Text: """ Given an URL, change the query string to include the values specified in the dictionary. If the keys of the dictionary can be found in the query string of the URL, then they will be removed. It is guaranteed that all other values of the query string will keep their order. """ qs_id = 4 p = list(urlparse(url)) qs = parse_qsl(p[qs_id]) # type: List[Tuple[Text, Text]] patched_qs = list(chain( filter(lambda x: x[0] not in data, qs), data.items(), )) p[qs_id] = urlencode(patched_qs) return urlunparse(p)
def getAuthorizeURL(self): client = oauth.Client(self.consumer) #Get the request token resp, content = client.request(Splitwise.REQUEST_TOKEN_URL, "POST") if Splitwise.isDebug(): print(resp, content) #Check if the response is correct if resp['status'] != '200': raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status']) request_token = dict(parse_qsl(content.decode("utf-8"))) return "%s?oauth_token=%s" % (Splitwise.AUTHORIZE_URL, request_token['oauth_token']), request_token['oauth_token_secret']
def getAccessToken(self,oauth_token,oauth_token_secret,oauth_verifier): token = oauth.Token(oauth_token,oauth_token_secret) token.set_verifier(oauth_verifier) client = oauth.Client(self.consumer, token) resp, content = client.request(Splitwise.ACCESS_TOKEN_URL, "POST") if Splitwise.isDebug(): print(resp, content) #Check if the response is correct if resp['status'] != '200': raise Exception("Invalid response %s. Please check your consumer key and secret." % resp['status']) access_token = dict(parse_qsl(content.decode("utf-8"))) return access_token
def get_authorize_url(self): """ Returns the Authorize URL as returned by QB, and specified by OAuth 1.0a. :return URI: """ if self.qbService is None: self.set_up_service() response = self.qbService.get_raw_request_token( params={'oauth_callback': self.callback_url}) oauth_resp = dict(parse_qsl(response.text)) self.request_token = oauth_resp['oauth_token'] self.request_token_secret = oauth_resp['oauth_token_secret'] return self.qbService.get_authorize_url(self.request_token)
def test_adds_other_supplied_values_as_query_string(): app = Mach9('passes') @app.route(COMPLEX_PARAM_URL) def passes(): return text('this should pass') new_kwargs = dict(PASSING_KWARGS) new_kwargs['added_value_one'] = 'one' new_kwargs['added_value_two'] = 'two' url = app.url_for('passes', **new_kwargs) query = dict(parse_qsl(urlsplit(url).query)) assert query['added_value_one'] == 'one' assert query['added_value_two'] == 'two'
def url_etl(url): params_new = {} u = urlparse(url) path_new = re.sub(r"\d+", "N", u.path) query = unquote(u.query) # ??????????id if not query: url_new = urlunparse((u.scheme, u.netloc, path_new, "", "", "")) return url_new params = parse_qsl(query, True) for k, v in params: # ?????? if k in ["_", "timestamp"]: continue # ???????? if v: params_new[k] = etl(v) query_new = urlencode(params_new) url_new = urlunparse((u.scheme, u.netloc, path_new, u.params, query_new, u.fragment)) return url_new
def handle_callback(self): params = dict(parse_qsl(urlparse(self.path).query)) payload = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'code': quote(params['code']), 'grant_type': 'authorization_code', 'redirect_uri': self.callback_url, } response = requests.post(self.oauth2_token_url, json=payload) if response.status_code != 200: self.send_response(response.status_code) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(response.content) else: self.send_response(response.status_code) self.send_header('Content-type', 'application/json') self.end_headers() refresh_token = response.json()['refresh_token'] self.wfile.write(bytes('refresh_token: ' + refresh_token, 'utf8')) return
def serialize_sample(self, sample): sample = OrderedDict(parse_qsl(sample)) if self.pb_param not in sample: return False if 'token' in sample: del sample['token'] if 'callback' in sample: sample['callback'] = '' return sample
def parse_qs(self, sample): pb = match('(?:&?\d[^&=]+)*', sample) return OrderedDict([(pb.group(0), ''), *parse_qsl(sample[pb.end() + 1:], True)])
def get_url_query(url): parsed_url = urlparse(url) url_query = parse_qsl(parsed_url.fragment) # login_response_url_query can have multiple key url_query = dict(url_query) return url_query
def read(self): with open(self.filename, 'rb') as image_file: image = Image.open(image_file) image.load() self.codes = scan_codes('qrcode', image) self.remove() if self.codes: otpauth_url = self.codes[0].decode() self.codes = dict(parse_qsl(urlparse(otpauth_url)[4])) return self.codes else: logging.error("Invalid QR image") return None
def build_path(self, method, url): """ Build a unique filename from method & url """ # Build directory to request out = urlparse(url) parts = [ '{}_{}'.format(out.scheme, out.hostname), ] parts += filter(None, out.path.split('/')) directory = os.path.join(MOCKS_DIR, *parts) # Build sorted query filename query = sorted(parse_qsl(out.query)) query = ['{}={}'.format(k, v.replace('/', '_')) for k, v in query] query_str = '_'.join(query) # Use hashes to avoid too long names if len(query_str) > 150: query_str = '{}_{}'.format(query_str[0:100], hashlib.md5(query_str.encode('utf-8')).hexdigest()) filename = '{}_{}.gz'.format(method, query_str) # Build directory if not os.path.isdir(directory): try: os.makedirs(directory) except Exception as e: logger.error('Concurrency error when building directories: {}'.format(e)) return os.path.join(directory, filename)
def _has_strict_url_match(self, url, other): url_parsed = urlparse(url) other_parsed = urlparse(other) if url_parsed[:3] != other_parsed[:3]: return False url_qsl = sorted(parse_qsl(url_parsed.query)) other_qsl = sorted(parse_qsl(other_parsed.query)) return url_qsl == other_qsl
def __call__(self, environ, start_response): self.requests.append(( environ.get('PATH_INFO'), urlparse.parse_qsl(environ.get('QUERY_STRING')))) start_response('%s OK' % self.response_code, self.headers) return [self.response_data]
def _update_url_query_param(url, query_params): url_parts = parse.urlparse(url) old_qs_args = dict(parse.parse_qsl(url_parts[4])) old_qs_args.update(query_params) new_qs = parse.urlencode(old_qs_args) return parse.urlunparse( list(url_parts[0:4]) + [new_qs] + list(url_parts[5:]))
def test_error_url_should_contain_state_from_authentication_request(self): authn_params = {'redirect_uri': 'test_redirect_uri', 'response_type': 'code', 'state': 'test_state'} authn_req = AuthorizationRequest().from_dict(authn_params) error_url = InvalidAuthenticationRequest('test', authn_req, 'invalid_request').to_error_url() error = dict(parse_qsl(urlparse(error_url).query)) assert error['state'] == authn_params['state']
def _verify_client_authentication(self, request_body, http_headers=None): # type (str, Optional[Mapping[str, str]] -> Mapping[str, str] """ Verifies the client authentication. :param request_body: urlencoded token request :param http_headers: :return: The parsed request body. """ if http_headers is None: http_headers = {} token_request = dict(parse_qsl(request_body)) token_request['client_id'] = verify_client_authentication(self.clients, token_request, http_headers.get('Authorization')) return token_request
def handle_userinfo_request(self, request=None, http_headers=None): # type: (Optional[str], Optional[Mapping[str, str]]) -> oic.oic.message.OpenIDSchema """ Handles a userinfo request. :param request: urlencoded request (either query string or POST body) :param http_headers: http headers """ if http_headers is None: http_headers = {} userinfo_request = dict(parse_qsl(request)) bearer_token = extract_bearer_token_from_http_request(userinfo_request, http_headers.get('Authorization')) introspection = self.authz_state.introspect_access_token(bearer_token) if not introspection['active']: raise InvalidAccessToken('The access token has expired') scope = introspection['scope'] user_id = self.authz_state.get_user_id_for_subject_identifier(introspection['sub']) requested_claims = scope2claims(scope.split()) authentication_request = self.authz_state.get_authorization_request_for_access_token(bearer_token) requested_claims.update(self._get_requested_claims_in(authentication_request, 'userinfo')) user_claims = self.userinfo.get_claims_for(user_id, requested_claims) user_claims.setdefault('sub', introspection['sub']) response = OpenIDSchema(**user_claims) logger.debug('userinfo=%s from requested_claims=%s userinfo=%s', response, requested_claims, user_claims) return response
def url(self): url_parts = list(urlparse.urlparse(self._base_url + self._path)) query = dict(urlparse.parse_qsl(url_parts[4])) query.update(self._query) url_parts[4] = urlencode(query) return urlparse.urlunparse(url_parts)
def _store(self, store_path, method, url, result): if not HAS_AIOFILES: # pragma: no cover return url_parts = list(urlparse.urlparse(url)) filename = 'index.json' if url_parts[4] != '': # pragma: no coverage query = OrderedDict( sorted(dict(urlparse.parse_qsl(url_parts[4])).items()) ) filename += '?%s' % urlencode(query) save_path = path.join( unquote(path.join( store_path, url_parts[2][1:], method.lower(), )), filename ) try: os.makedirs(path.dirname(save_path), 0o755) except OSError: # pragma: no cover # Dir exists. pass self.logger.debug('Saving: %s, len: %d' % (save_path, len(result))) async with aiofiles.open(save_path, mode='w', loop=self._loop) as f: await f.write(result)
def parse_oauth_response(data): """Parses the data string as OAuth response, returning it as a dict. The keys and values of the dictionary will be text strings (i.e. not binary strings). """ if isinstance(data, six.binary_type): data = data.decode('utf-8') qsl = urllib_parse.parse_qsl(data) resp = {} for key, value in qsl: resp[key] = value return resp
def add_params_to_qs(query, params): """Extend a query with a list of two-tuples.""" if isinstance(params, dict): params = params.items() queryparams = urlparse.parse_qsl(query, keep_blank_values=True) queryparams.extend(params) return urlencode(queryparams)
def uri_query_params(self): if not self.uri_query: return [] return urlparse.parse_qsl(self.uri_query, keep_blank_values=True, strict_parsing=True)
def _build_url(self, base_url, **kwargs): """ ?????????? ? ???????? ?????? GET-?????????? """ urlparts = list(parse.urlparse(base_url)) query = dict(parse.parse_qsl(urlparts[4])) query.update(kwargs) urlparts[4] = parse.urlencode(query) return parse.urlunparse(urlparts)
def get_initial_oauth_tokens(self): url = 'https://api.twitter.com/oauth/request_token' client = oauth2.Client(self.consumer) resp, content = client.request(url, 'GET') if resp.status != 200: raise Error('{} from Twitter'.format(resp.status)) oauth_values = dict(parse_qsl(content.decode('utf-8'))) self.oauth_token = oauth_values.get('oauth_token') self.oauth_token_secret = oauth_values.get('oauth_token_secret') if not self.oauth_token or not self.oauth_token_secret: raise Error('No oauth_token or oauth_token_secret from Twitter') return (self.oauth_token, self.oauth_token_secret,)
def test_path_creation_with_query(self, MockHttpProvider, MockAuthProvider): """ Tests that a path is created with the correct query parameters """ http_provider = msgraph.HttpProvider() auth_provider = msgraph.AuthProvider() client = msgraph.GraphClient("graphurl/", http_provider, auth_provider) request = client.drives["me"].items["root"].children.request(top=3, select="test") query_dict = dict(parse_qsl(urlparse(request.request_url).query)) expected_dict = {"select":"test", "top":"3"} assert all(item in query_dict.items() for item in expected_dict.items())