我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用six.moves.urllib.parse.parse_qsl()。
def sort_url_by_query_keys(url): """A helper function which sorts the keys of the query string of a url. For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to prevent non-deterministic ordering of the query string causing problems with unit tests. :param url: url which will be ordered by query keys :returns url: url with ordered query keys """ parsed = urlparse.urlparse(url) queries = urlparse.parse_qsl(parsed.query, True) sorted_query = sorted(queries, key=lambda x: x[0]) encoded_sorted_query = urlparse.urlencode(sorted_query, True) url_parts = (parsed.scheme, parsed.netloc, parsed.path, parsed.params, encoded_sorted_query, parsed.fragment) return urlparse.urlunparse(url_parts)
def params(arg): """ Parse `arg` and returns a dict describing the TPDA arguments. Resolution order: - `arg` is a string is parsed as an URL - `arg` has an items() method and is parsed (works for multidicts, ..) - `arg` is a iterable is parsed as a ((name, value), (name, value), ..) """ if isinstance(arg, str): pr = parse.urlparse(arg) if not pr.query: raise exceptions.IncompleteURI return Params(parse.parse_qsl(pr.query)).params() elif hasattr(arg, 'items'): return Params(arg.items()).params() elif hasattr(arg, '__iter__'): return Params(arg).params()
def test_logout(self): end_session_endpoint = 'https://provider.example.com/end_session' post_logout_uri = 'https://client.example.com/post_logout' authn = OIDCAuthentication(self.app, provider_configuration_info={'issuer': ISSUER, 'end_session_endpoint': end_session_endpoint}, client_registration_info={'client_id': 'foo', 'post_logout_redirect_uris': [post_logout_uri]}) id_token = IdToken(**{'sub': 'sub1', 'nonce': 'nonce'}) with self.app.test_request_context('/logout'): flask.session['access_token'] = 'abcde' flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'} flask.session['id_token'] = id_token.to_dict() flask.session['id_token_jwt'] = id_token.to_jwt() end_session_redirect = authn._logout() assert all(k not in flask.session for k in ['access_token', 'userinfo', 'id_token', 'id_token_jwt']) assert end_session_redirect.status_code == 303 assert end_session_redirect.headers['Location'].startswith(end_session_endpoint) parsed_request = dict(parse_qsl(urlparse(end_session_redirect.headers['Location']).query)) assert parsed_request['state'] == flask.session['end_session_state'] assert parsed_request['id_token_hint'] == id_token.to_jwt() assert parsed_request['post_logout_redirect_uri'] == post_logout_uri
def params(self, collapse=True): """Extracts the query parameters from the split urls components. This method will provide back as a dictionary the query parameter names and values that were provided in the url. :param collapse: Boolean, turn on or off collapsing of query values with the same name. Since a url can contain the same query parameter name with different values it may or may not be useful for users to care that this has happened. This parameter when True uses the last value that was given for a given name, while if False it will retain all values provided by associating the query parameter name with a list of values instead of a single (non-list) value. """ if self.query: if collapse: return dict(parse.parse_qsl(self.query)) else: params = {} for (key, value) in parse.parse_qsl(self.query): if key in params: if isinstance(params[key], list): params[key].append(value) else: params[key] = [params[key], value] else: params[key] = value return params else: return {}
def request_token(self): resp = self.oauth_request(self.request_token_url, 'GET') oauth_token = dict(parse.parse_qsl(resp.read().decode())) self.authorize_url = self.authorize_url % (oauth_token['oauth_token'], self.callback) self.oauth_token = {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']} return self.oauth_token
def access_token(self, oauth_token=None, oauth_verifier=None): self.oauth_token = oauth_token or self.oauth_token args = {'oauth_verifier': oauth_verifier} if oauth_verifier else {} # if callback is oob resp = self.oauth_request(self.access_token_url, 'GET', args) oauth_token = dict(parse.parse_qsl(resp.read().decode())) self.oauth_token = {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']} return self.oauth_token
def xauth(self, username, password): args = { 'x_auth_username': username, 'x_auth_password': password, 'x_auth_mode': 'client_auth' } resp = self.oauth_request(self.access_token_url, 'GET', args) oauth_token = dict(parse.parse_qsl(resp.read().decode())) return {'key': oauth_token['oauth_token'], 'secret': oauth_token['oauth_token_secret']}
def __init__(self, url): parts = parse.urlparse(url) _query = frozenset(parse.parse_qsl(parts.query)) _path = parse.unquote_plus(parts.path) parts = parts._replace(query=_query, path=_path) self.parts = parts
def _cs_request_with_retries(self, url, method, **kwargs): # Check that certain things are called correctly if method in ['GET', 'DELETE']: assert 'body' not in kwargs elif method == 'PUT': assert 'body' in kwargs # Call the method args = parse.parse_qsl(parse.urlparse(url)[4]) kwargs.update(args) munged_url = url.rsplit('?', 1)[0] munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_') munged_url = munged_url.replace('-', '_') callback = "%s_%s" % (method.lower(), munged_url) if not hasattr(self, callback): raise AssertionError('Called unknown API method: %s %s, ' 'expected fakes method name: %s' % (method, url, callback)) # Note the call self.callstack.append((method, url, kwargs.get('body', None))) status, headers, body = getattr(self, callback)(**kwargs) r = utils.TestResponse({ "status_code": status, "text": body, "headers": headers, }) return r, body # # Quotas #
def _qsl_get_one(qstring, param): """Return one unique param from query string""" args = parse.parse_qsl(qstring) sigvals = [arg[1] for arg in args if arg[0] == param] if len(sigvals) != 1: raise exceptions.InvalidArgument(param) return sigvals[0] # Uses RDAP
def get_s3_uri(self, uri): parsed = urlparse(uri) client = self.session_factory().client('s3') params = dict( Bucket=parsed.netloc, Key=parsed.path[1:]) if parsed.query: params.update(dict(parse_qsl(parsed.query))) result = client.get_object(**params) return result['Body'].read()
def get(self, url, query, cache): payloads_file = (self.tmpdir + "/" + hashlib.sha1(url.encode('utf-8')).hexdigest() + ".json") if (not cache or not os.access(payloads_file, 0) or time.time() - os.stat(payloads_file).st_mtime > 24 * 60 * 60): payloads = [] next_query = query while next_query: log.debug(str(next_query)) result = requests.get(url, params=next_query) payloads += result.json() next_query = None for link in result.headers.get('Link', '').split(','): if 'rel="next"' in link: m = re.search('<(.*)>', link) if m: parsed_url = parse.urlparse(m.group(1)) # append query in case it was not preserved # (gitlab has that problem) next_query = query next_query.update( dict(parse.parse_qsl(parsed_url.query)) ) if cache: with open(payloads_file, 'w') as f: json.dump(payloads, f) else: with open(payloads_file, 'r') as f: payloads = json.load(f) return payloads
def test_authenticatate_with_extra_request_parameters(self): extra_params = {"foo": "bar", "abc": "xyz"} authn = OIDCAuthentication(self.app, provider_configuration_info={'issuer': ISSUER}, client_registration_info={'client_id': 'foo'}, extra_request_args=extra_params) with self.app.test_request_context('/'): a = authn._authenticate() request_params = dict(parse_qsl(urlparse(a.location).query)) assert set(extra_params.items()).issubset(set(request_params.items()))
def test_configure_for_plugin(self): expected_url = "{}/consumers/joe/auth-key" . format (mock_kong_admin_url) responses.add(responses.POST, expected_url, status=201) data = { "key": "123" } response = self.api.configure_for_plugin("joe", "auth-key", data) assert response.status_code == 201 body = parse_qs(responses.calls[0].request.body) body_exactly = parse_qsl(responses.calls[0].request.body) assert body['key'][0] == "123", \ "Expect correct. data to be sent. Got: {}" . format (body_exactly)
def get_auth_params(self, request, action): settings = self.get_settings() ret = dict(settings.get('AUTH_PARAMS', {})) dynamic_auth_params = request.GET.get('auth_params') if dynamic_auth_params: ret.update(dict(parse_qsl(dynamic_auth_params))) return ret
def expand_redirect_url(starting_url, key, bucket): """ Add key and bucket parameters to starting URL query string. """ parsed = urlparse.urlparse(starting_url) query = collections.OrderedDict(urlparse.parse_qsl(parsed.query)) query.update([('key', key), ('bucket', bucket)]) redirect_url = urlparse.urlunparse(( parsed.scheme, parsed.netloc, parsed.path, parsed.params, urlparse.urlencode(query), None)) return redirect_url
def request_access_token(self, *args, **kwargs): response = self.request(*args, **kwargs) return dict(parse_qsl(response.text))
def __call__(self, env, start_response): status = '200 OK' params = dict(parse_qsl(env.get('QUERY_STRING'))) ws = env.get('wsgi.websocket') if ws and not params.get('ignore_ws'): msg = 'WS Request Url: ' + env.get('REQUEST_URI', '') msg += ' Echo: ' + ws.receive() ws.send(msg) return [] result = 'Requested Url: ' + env.get('REQUEST_URI', '') if env['REQUEST_METHOD'] == 'POST': result += ' Post Data: ' + env['wsgi.input'].read(int(env['CONTENT_LENGTH'])).decode('utf-8') if params.get('addproxyhost') == 'true': result += ' Proxy Host: ' + env.get('wsgiprox.proxy_host', '') result = result.encode('iso-8859-1') if params.get('chunked') == 'true': headers = [] else: headers = [('Content-Length', str(len(result)))] write = start_response(status, headers) if params.get('write') == 'true': write(result) return iter([]) else: return iter([result]) # ============================================================================
def client_request(self, client, method, url, **kwargs): # Check that certain things are called correctly if method in ["GET", "DELETE"]: assert "json" not in kwargs # Note the call self.callstack.append( (method, url, kwargs.get("headers") or {}, kwargs.get("json") or kwargs.get("data"))) try: fixture = self.fixtures[url][method] except KeyError: pass else: return TestResponse({"headers": fixture[0], "text": fixture[1]}) # Call the method args = parse.parse_qsl(parse.urlparse(url)[4]) kwargs.update(args) munged_url = url.rsplit('?', 1)[0] munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_') munged_url = munged_url.replace('-', '_') callback = "%s_%s" % (method.lower(), munged_url) if not hasattr(self, callback): raise AssertionError('Called unknown API method: %s %s, ' 'expected fakes method name: %s' % (method, url, callback)) resp = getattr(self, callback)(**kwargs) if len(resp) == 3: status, headers, body = resp else: status, body = resp headers = {} self.last_request_id = headers.get('x-openstack-request-id', 'req-test') return TestResponse({ "status_code": status, "text": body, "headers": headers, })
def client_request(self, client, method, url, **kwargs): # Check that certain things are called correctly if method in ["GET", "DELETE"]: assert "json" not in kwargs # Note the call self.callstack.append( (method, url, kwargs.get("headers") or {}, kwargs.get("json") or kwargs.get("data"))) try: fixture = self.fixtures[url][method] except KeyError: pass else: return TestResponse({"headers": fixture[0], "text": fixture[1]}) # Call the method args = parse.parse_qsl(parse.urlparse(url)[4]) kwargs.update(args) munged_url = url.rsplit('?', 1)[0] munged_url = munged_url.strip('/').replace('/', '_').replace('.', '_') munged_url = munged_url.replace('-', '_') callback = "%s_%s" % (method.lower(), munged_url) if not hasattr(self, callback): raise AssertionError('Called unknown API method: %s %s, ' 'expected fakes method name: %s' % (method, url, callback)) resp = getattr(self, callback)(**kwargs) if len(resp) == 3: status, headers, body = resp else: status, body = resp headers = {} return TestResponse({ "status_code": status, "text": body, "headers": headers, })
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None): """Obtains a temporary URL to an object. Args: lifetime (int): The time (in seconds) the temporary URL will be valid method (str): The HTTP method that can be used on the temporary URL inline (bool, default True): If False, URL will have a Content-Disposition header that causes browser to download as attachment. filename (str, optional): A urlencoded filename to use for attachment, otherwise defaults to object name """ global_options = settings.get()['swift'] auth_url = global_options.get('auth_url') temp_url_key = global_options.get('temp_url_key') if not self.resource: raise ValueError('can only create temporary URL on object') if not temp_url_key: raise ValueError( 'a temporary url key must be set with settings.update ' 'or by setting the OS_TEMP_URL_KEY environment variable') if not auth_url: raise ValueError( 'an auth url must be set with settings.update ' 'or by setting the OS_AUTH_URL environment variable') obj_path = '/v1/%s' % self[len(self.drive):] # Generate the temp url using swifts helper. Note that this method is ONLY # useful for obtaining the temp_url_sig and the temp_url_expires parameters. # These parameters will be used to construct a properly-escaped temp url obj_url = generate_temp_url(obj_path, lifetime, temp_url_key, method) query_begin = obj_url.rfind('temp_url_sig', 0, len(obj_url)) obj_url_query = obj_url[query_begin:] obj_url_query = dict(parse.parse_qsl(obj_url_query)) query = ['temp_url_sig=%s' % obj_url_query['temp_url_sig'], 'temp_url_expires=%s' % obj_url_query['temp_url_expires']] if inline: query.append('inline') if filename: query.append('filename=%s' % parse.quote(filename)) auth_url_parts = parse.urlparse(auth_url) return parse.urlunparse((auth_url_parts.scheme, auth_url_parts.netloc, parse.quote(obj_path), auth_url_parts.params, '&'.join(query), auth_url_parts.fragment))
def encode_uri(uri): split = list(urlsplit(uri)) split[1] = split[1].encode('idna').decode('ascii') split[2] = quote_plus(split[2].encode('utf-8'), '/').decode('ascii') query = list((q, quote_plus(v.encode('utf-8'))) for (q, v) in parse_qsl(split[3])) split[3] = urlencode(query).decode('ascii') return urlunsplit(split)