我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cgi.parse_qsl()。
def do_GET(s):
    """Handle a GET request.

    Parses the query parameters of the requested path, stores them on the
    server object as ``query_params`` and writes a short HTML page saying
    that the flow has completed. Note that we can't detect if an error
    occurred.

    Args:
        s: the request handler instance (conventionally ``self``); it must
            provide ``path``, ``server``, ``wfile`` and the
            ``send_response``/``send_header``/``end_headers`` methods.
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Only the text after the first '?' is a query string. A path without
    # any '?' must yield no parameters instead of being parsed itself.
    _, _, raw_query = s.path.partition('?')
    s.server.query_params = dict(parse_qsl(raw_query))
    # Bytes literals are plain ``str`` on Python 2 and are what a binary
    # output stream needs on Python 3 (presumably ``wfile`` is the binary
    # stream of a BaseHTTPRequestHandler -- confirm against the class).
    s.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    s.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    s.wfile.write(b"</body></html>")
def _add_query_parameter(url, name, value):
    """Set a single query parameter on a URL.

    An existing value for ``name`` is replaced.

    Args:
        url: string, the URL to modify.
        name: string, query parameter name.
        value: string, query parameter value.

    Returns:
        The URL with the parameter set, or ``url`` unchanged when ``value``
        is None.
    """
    if value is None:
        return url

    pieces = list(urlparse.urlparse(url))
    # Index 4 of the parse result is the query component.
    params = dict(parse_qsl(pieces[4]))
    params[name] = value
    pieces[4] = urllib.urlencode(params)
    return urlparse.urlunparse(pieces)
def handleRequest(self, env, start_response):
    """Route one WSGI request through a freshly created ``UiRequest``.

    Args:
        env: WSGI environ dict; ``PATH_INFO`` is required, ``QUERY_STRING``
            is optional.
        start_response: WSGI ``start_response`` callable, handed on to the
            ``UiRequest``.

    Returns:
        Whatever ``UiRequest.route`` returns or, outside debug mode, the
        result of ``UiRequest.error500`` when routing raised.
    """
    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; import the real implementation instead.
    try:
        from urllib.parse import parse_qsl
    except ImportError:  # Python 2
        from urlparse import parse_qsl

    path = env["PATH_INFO"]
    query_string = env.get("QUERY_STRING")
    get = dict(parse_qsl(query_string)) if query_string else {}
    ui_request = UiRequest(self, get, env, start_response)
    if config.debug:
        # Let the exception be caught by werkzeug
        return ui_request.route(path)

    # Catch and display the error
    try:
        return ui_request.route(path)
    except Exception as err:
        formatted = Debug.formatException(err)
        logging.debug("UiRequest error: %s" % formatted)
        return ui_request.error500("Err: %s" % formatted)
# Reload the UiRequest class to prevent restarts in debug mode
def parse_backend_uri(backend_uri):
    """
    Split a cache "backend_uri" of the form ``scheme://host?params`` into
    its cache scheme ('db', 'memcached', etc), its host and any extra
    params given in the query string.

    Returns a (scheme, host, params) tuple; raises
    InvalidCacheBackendError when the URI does not start with scheme://.
    """
    if ':' not in backend_uri:
        raise InvalidCacheBackendError("Backend URI must start with scheme://")
    scheme, _, rest = backend_uri.partition(':')
    if not rest.startswith('//'):
        raise InvalidCacheBackendError("Backend URI must start with scheme://")

    # Everything after the leading '//' is "host" optionally followed by
    # "?key=value&...".
    host, has_query, query_string = rest[2:].partition('?')
    if has_query:
        params = dict(parse_qsl(query_string))
    else:
        params = {}

    # Drop a single trailing slash from the host part.
    if host.endswith('/'):
        host = host[:-1]
    return scheme, host, params
def _update_query_params(uri, params):
    """Merge extra query parameters into a URI.

    Args:
        uri: string, A valid URI, possibly with existing query parameters.
        params: dict, query parameters to add; they win over existing
            parameters of the same name.

    Returns:
        The same URI with the merged query string.
    """
    parsed = urlparse.urlparse(uri)
    # Index 4 of the parse result is the query component.
    merged = dict(parse_qsl(parsed[4]))
    merged.update(params)

    components = list(parsed)
    components[4] = urllib.urlencode(merged)
    return urlparse.urlunparse(components)
def _parse_exchange_token_response(content):
    """Parses response of an exchange token request.

    Most providers return JSON but some (e.g. Facebook) return a
    url-encoded string.

    Args:
        content: The body of a response

    Returns:
        Content as a dictionary object. Note that the dict could be empty,
        i.e. {}. That basically indicates a failure.
    """
    try:
        resp = simplejson.loads(content)
    except Exception:
        # Different JSON libs raise different exceptions, so we just do a
        # catch-all here. ``Exception`` is used because ``StandardError``
        # does not exist on Python 3.
        resp = dict(parse_qsl(content))

    # some providers respond with 'expires', others with 'expires_in'
    if resp and 'expires' in resp:
        resp['expires_in'] = resp.pop('expires')

    return resp
def postReformat(self, postdata):
    """Return ``postdata`` re-encoded as a normalised form body.

    The url-encoded string is parsed into (name, value) pairs and encoded
    again, so every pair comes back in canonical percent/plus encoding.
    Pairs with a blank value are dropped by the parser.

    Args:
        postdata: url-encoded form data string.

    Returns:
        The re-encoded form data string.
    """
    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; use the real implementation on either major version.
    try:
        from urllib.parse import parse_qsl, urlencode
    except ImportError:  # Python 2
        from urlparse import parse_qsl
        from urllib import urlencode

    return urlencode(parse_qsl(postdata))
def querystringReformat(self, qsdata):
    """Return ``qsdata`` with its query string re-encoded.

    Only input that contains exactly one '?' is touched: the part after it
    is parsed into (name, value) pairs and encoded again. Anything else is
    returned unchanged.

    Args:
        qsdata: a URL (or path) that may carry a query string.

    Returns:
        The URL with a normalised query string, or ``qsdata`` itself.
    """
    temp = qsdata.split("?")
    if len(temp) != 2:
        return qsdata

    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; use the real implementation on either major version.
    try:
        from urllib.parse import parse_qsl, urlencode
    except ImportError:  # Python 2
        from urlparse import parse_qsl
        from urllib import urlencode

    return temp[0] + "?" + urlencode(parse_qsl(temp[1]))
def test_y_token_to_string(self):
    # The string form of a token must carry its key and secret as
    # url-encoded oauth_token / oauth_token_secret fields.
    serialized = yql.YahooToken('test-key', 'test-secret').to_string()
    fields = dict(parse_qsl(serialized))
    self.assertEqual(fields.get('oauth_token'), 'test-key')
    self.assertEqual(fields.get('oauth_token_secret'), 'test-secret')
def test_y_token_to_string2(self):
    # The optional attributes set on the token must show up in its string
    # form next to the key and secret.
    token = yql.YahooToken('test-key', 'test-secret')
    token.timestamp = '1111'
    token.session_handle = 'poop'
    token.callback_confirmed = 'basilfawlty'

    fields = dict(parse_qsl(token.to_string()))

    expected_fields = (
        ('oauth_token', 'test-key'),
        ('oauth_token_secret', 'test-secret'),
        ('token_creation_timestamp', '1111'),
        ('oauth_callback_confirmed', 'basilfawlty'),
        ('oauth_session_handle', 'poop'),
    )
    for field_name, expected_value in expected_fields:
        self.assertEqual(fields.get(field_name), expected_value)
def test_request_for_two_legged(self):
    # The signed URL returned by execute() must carry the original
    # statement as ``q`` and ask for JSON output.
    query = 'SELECT * from foo'
    client = TestTwoLegged('test-api-key', 'test-secret',
                           httplib2_inst=httplib2.Http())
    signed_url = client.execute(query)

    query_string = signed_url.split('?')[1]
    params = dict(parse_qsl(query_string))
    self.assertEqual(params['q'], query)
    self.assertEqual(params['format'], 'json')
def test_request_for_three_legged(self):
    # Same check as the two-legged case, but the request is executed with
    # an explicit OAuth token.
    query = 'SELECT * from foo'
    client = TestThreeLegged('test-api-key', 'test-secret',
                             httplib2_inst=httplib2.Http())
    access_token = oauth.Token.from_string(
        'oauth_token=foo&oauth_token_secret=bar')
    signed_url = client.execute(query, token=access_token)

    query_string = signed_url.split('?')[1]
    params = dict(parse_qsl(query_string))
    self.assertEqual(params['q'], query)
    self.assertEqual(params['format'], 'json')
def query_params(self):
    """The query parameters of the uri used to call the YQL API.

    Returns a dict of the parameters found in ``self.uri``, or an empty
    dict when no uri is set.
    """
    if not self.uri:
        return {}
    query_string = urlparse(self.uri).query
    return dict(parse_qsl(query_string))
def getPosted(self):
    """Return the posted form fields of the current request.

    Returns:
        A dict of the url-encoded fields read from ``wsgi.input`` when the
        request method is POST, otherwise an empty dict.
    """
    if self.env['REQUEST_METHOD'] != "POST":
        return {}

    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; import the real implementation instead.
    try:
        from urllib.parse import parse_qsl
    except ImportError:  # Python 2
        from urlparse import parse_qsl

    # NOTE(review): only the first line of the request body is read.
    body = self.env['wsgi.input'].readline().decode()
    return dict(parse_qsl(body))
# Return: <dict> Cookies based on self.env
def getCookies(self):
    """Return the request cookies as a dict.

    Returns:
        A dict mapping cookie names (surrounding whitespace stripped) to
        their url-decoded values, taken from ``HTTP_COOKIE`` in
        ``self.env``; empty when the header is missing or empty.
    """
    raw_cookies = self.env.get('HTTP_COOKIE')
    if not raw_cookies:
        return {}

    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; import the real implementation instead.
    try:
        from urllib.parse import parse_qsl
    except ImportError:  # Python 2
        from urlparse import parse_qsl

    # Cookies are separated by ';'. Current parse_qsl versions only split
    # on '&' by default, so split on ';' here instead of relying on the
    # parser to do it; otherwise several cookies collapse into one value.
    cookies = {}
    for chunk in raw_cookies.split(';'):
        for key, val in parse_qsl(chunk):
            cookies[key.strip()] = val
    return cookies
def step1_get_authorize_url(self, redirect_uri=None):
    """Returns a URI to redirect to the provider.

    Args:
        redirect_uri: string, Either the string 'urn:ietf:wg:oauth:2.0:oob'
            for a non-web-based application, or a URI that handles the
            callback from the authorization server. This parameter is
            deprecated, please move to passing the redirect_uri in via the
            constructor.

    Returns:
        A URI as a string to redirect the user to begin the authorization
        flow.

    Raises:
        ValueError: if no redirect_uri is available, neither from the
            argument nor already set on the instance.
    """
    if redirect_uri is not None:
        # Each fragment ends with a space so the joined message reads as
        # normal sentences.
        logger.warning((
            'The redirect_uri parameter for '
            'OAuth2WebServerFlow.step1_get_authorize_url is deprecated. '
            'Please move to passing the redirect_uri in via the '
            'constructor.'))
        self.redirect_uri = redirect_uri

    if self.redirect_uri is None:
        raise ValueError('The value of redirect_uri must not be None.')

    query = {
        'response_type': 'code',
        'client_id': self.client_id,
        'redirect_uri': self.redirect_uri,
        'scope': self.scope,
    }
    query.update(self.params)
    parts = list(urlparse.urlparse(self.auth_uri))
    # 4 is the index of the query part. Parameters already present in
    # auth_uri are applied last, so they win over the values built above.
    query.update(dict(parse_qsl(parts[4])))
    parts[4] = urllib.urlencode(query)
    return urlparse.urlunparse(parts)
def extract_params(self):
    """Returns the parameters for this task.

    Returns:
        A dictionary of strings mapping parameter names to their values as
        strings. If the same name parameter has several values then the
        value will be a list of strings. For POST and PULL requests then
        the parameters are extracted from the task payload. For all other
        methods, the parameters are extracted from the url query string.
        An empty dictionary is returned if the task contains an empty
        payload or query string.

    Raises:
        ValueError: if the payload does not contain valid
            'application/x-www-form-urlencoded' data (for POST and PULL)
            or the url does not contain a valid query (all other methods).
    """
    if self.__method in ('PULL', 'POST'):
        query = self.__payload
    else:
        query = urlparse.urlparse(self.__relative_url).query

    if not query:
        return {}

    # urlparse.parse_qsl is the implementation behind the deprecated
    # cgi.parse_qsl alias; strict_parsing makes malformed fields raise
    # ValueError instead of being skipped.
    grouped = {}
    for key, value in urlparse.parse_qsl(
            query, keep_blank_values=True, strict_parsing=True):
        grouped.setdefault(key, []).append(value)

    # Single-valued parameters are returned as the bare string; repeated
    # ones stay a list.
    return dict(
        (key, values[0] if len(values) == 1 else values)
        for key, values in grouped.items())
def _verifyReturnToArgs(query):
    """Verify that the arguments in the return_to URL are present in this
    response.

    Args:
        query: mapping of the response arguments; it is both converted to
            a ``Message`` and looked up by key.

    Raises:
        ProtocolError: if the response has no return_to, if a return_to
            parameter is missing from ``query`` or has a different value
            there, or if a bare (non-OpenID) response argument is not part
            of the return_to URL.
    """
    # cgi.parse_qsl is a deprecated alias that newer Python versions no
    # longer ship; import the real implementation instead.
    try:
        from urllib.parse import parse_qsl
    except ImportError:  # Python 2
        from urlparse import parse_qsl

    message = Message.fromPostArgs(query)
    return_to = message.getArg(OPENID_NS, 'return_to')

    if return_to is None:
        raise ProtocolError('Response has no return_to')

    parsed_url = urlparse(return_to)
    rt_query = parsed_url[4]  # index 4 is the query component
    parsed_args = parse_qsl(rt_query)

    for rt_key, rt_value in parsed_args:
        # Keep the try limited to the lookup that can raise KeyError.
        try:
            value = query[rt_key]
        except KeyError:
            raise ProtocolError(
                "return_to parameter %s absent from query %r"
                % (rt_key, query))
        if rt_value != value:
            raise ProtocolError(
                "parameter %s value %r does not match "
                "return_to's value %r" % (rt_key, value, rt_value))

    # Make sure all non-OpenID arguments in the response are also
    # in the signed return_to.
    bare_args = message.getArgs(BARE_NS)
    # .items() works on both Python 2 and 3, unlike .iteritems().
    for pair in bare_args.items():
        if pair not in parsed_args:
            raise ProtocolError(
                "Parameter %s not in return_to URL" % (pair[0],))
def test_deprecated_parse_qsl(self):
    # cgi.parse_qsl has moved to urllib.parse; this only checks that the
    # old entry point still parses and emits the deprecation warning.
    expected_warning = (
        'cgi.parse_qsl is deprecated, use urllib.parse.parse_qsl instead',
        DeprecationWarning,
    )
    expected_pairs = [('a', 'A1'), ('b', 'B2'), ('B', 'B3')]
    with check_warnings(expected_warning):
        self.assertEqual(expected_pairs, cgi.parse_qsl('a=A1&b=B2&B=B3'))
def test_deprecated_parse_qsl(self):
    # cgi.parse_qsl has moved to urlparse; this only checks that the old
    # entry point still parses and emits the pending-deprecation warning.
    expected_warning = (
        'cgi.parse_qsl is deprecated, use urlparse.parse_qsl instead',
        PendingDeprecationWarning,
    )
    expected_pairs = [('a', 'A1'), ('b', 'B2'), ('B', 'B3')]
    with check_warnings(expected_warning):
        self.assertEqual(expected_pairs, cgi.parse_qsl('a=A1&b=B2&B=B3'))
def request(self, uri, method="GET", body=None, headers=None,
        redirections=httplib2.DEFAULT_MAX_REDIRECTS, connection_type=None,
        force_auth_header=False):
    """Sign a request and send it through ``httplib2.Http.request``.

    A ``Request`` is built from ``self.consumer`` and ``self.token`` and
    signed; where the signed parameters are placed depends on ``method``
    and ``force_auth_header``:

    * POST: they go into the body, or -- with ``force_auth_header`` --
      into the headers while the body keeps only the non-OAuth
      parameters.
    * GET: they go into the URI, or into the headers with
      ``force_auth_header``.
    * any other method: they always go into the headers.

    Args:
        uri: the URI to request.
        method: HTTP method name.
        body: request body; for a POST it is parsed as url-encoded
            parameters that take part in the signature.
        headers: dict of request headers. A dict passed in is updated in
            place; anything that is not a dict is replaced by a new one.
        redirections: passed through to ``httplib2.Http.request``.
        connection_type: passed through to ``httplib2.Http.request``.
        force_auth_header: if true, always send the signed parameters in
            the headers produced by ``req.to_header()``.

    Returns:
        The return value of ``httplib2.Http.request``.
    """
    if not isinstance(headers, dict):
        headers = {}

    # Collect the parameters that take part in the signature.
    if body and method == "POST":
        parameters = dict(parse_qsl(body))
    elif method == "GET":
        parsed = urlparse.urlparse(uri)
        # NOTE(review): parse_qs maps each name to a list of values,
        # unlike the flat dict built for POST above.
        parameters = parse_qs(parsed.query)
    else:
        parameters = None

    req = Request.from_consumer_and_token(self.consumer, token=self.token,
        http_method=method, http_url=uri, parameters=parameters)

    # presumably self.method is the signature method object -- confirm
    # against the class definition.
    req.sign_request(self.method, self.consumer, self.token)

    if force_auth_header:
        # ensure we always send Authorization
        headers.update(req.to_header())

    if method == "POST":
        if not force_auth_header:
            body = req.to_postdata()
        else:
            body = req.encode_postdata(req.get_nonoauth_parameters())
        headers['Content-Type'] = 'application/x-www-form-urlencoded'
    elif method == "GET":
        if not force_auth_header:
            uri = req.to_url()
    else:
        if not force_auth_header:
            # don't call update twice.
            headers.update(req.to_header())

    return httplib2.Http.request(self, uri, method=method, body=body,
        headers=headers, redirections=redirections,
        connection_type=connection_type)
def generate_authorize_url(self, redirect_uri='urn:ietf:wg:oauth:2.0:oob',
                           response_type='code', access_type='offline',
                           approval_prompt='auto', **kwargs):
    """Build the provider URI the user has to be redirected to.

    The given ``redirect_uri`` is also stored on the instance.

    Args:
        redirect_uri: Either the string 'urn:ietf:wg:oauth:2.0:oob' for a
            non-web-based application, or a URI that handles the callback
            from the authorization server.
        response_type: Either the string 'code' for server-side or native
            application, or the string 'token' for client-side
            application.
        access_type: Either the string 'offline' to request a refresh
            token or 'online'.
        approval_prompt: Either the string 'auto' to let the OAuth
            mechanism determine if the approval page is needed or 'force'
            if the approval prompt is desired in all cases.
        **kwargs: extra query parameters; they override the ones above.

    Returns:
        The authorization URI as a string. Query parameters already
        present in ``self.auth_uri`` take precedence over everything else.

    If redirect_uri is 'urn:ietf:wg:oauth:2.0:oob' then pass in the
    generated verification code to get_access_token, otherwise pass in the
    query parameters received at the callback uri to get_access_token.
    If the response_type is 'token', no need to call get_access_token as
    the API will return it within the query parameters received at the
    callback:
      oauth2_token.access_token = YOUR_ACCESS_TOKEN
    """
    self.redirect_uri = redirect_uri

    query = {}
    query['response_type'] = response_type
    query['client_id'] = self.client_id
    query['redirect_uri'] = redirect_uri
    query['scope'] = self.scope
    query['approval_prompt'] = approval_prompt
    query['access_type'] = access_type
    query.update(kwargs)

    (scheme, netloc, path, params,
     existing_query, fragment) = urllib.parse.urlparse(self.auth_uri)
    query.update(dict(parse_qsl(existing_query)))
    encoded_query = urllib.parse.urlencode(query)
    return urllib.parse.urlunparse(
        (scheme, netloc, path, params, encoded_query, fragment))