我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用cgi.parse_qs()。
def parse_qs(qs, keep_blank_values=0, strict_parsing=0, unquote=unquote): """like cgi.parse_qs, only with custom unquote function""" d = {} items = [s2 for s1 in qs.split("&") for s2 in s1.split(";")] for item in items: try: k, v = item.split("=", 1) except ValueError: if strict_parsing: raise continue if v or keep_blank_values: k = unquote(k.replace("+", " ")) v = unquote(v.replace("+", " ")) if k in d: d[k].append(v) else: d[k] = [v] return d
def get_target(self, target_plugin, path): """ Parses the path, extracts a token, and looks up a target for that token using the token plugin. Sets target_host and target_port if successful """ # The files in targets contain the lines # in the form of token: host:port # Extract the token parameter from url args = parse_qs(urlparse(path)[4]) # 4 is the query from url if not 'token' in args or not len(args['token']): raise self.server.EClose("Token not present") token = args['token'][0].rstrip('\n') result_pair = target_plugin.lookup(token) if result_pair is not None: return result_pair else: raise self.server.EClose("Token '%s' not found" % token)
def do_dispatch(self, body): self.url = self.path pieces = urlparse.urlparse(self.path) self.path = pieces[2] self.query = cgi.parse_qs(pieces[4]) if self.path == "/": self.serve_main(body) elif self.path == "/shutdown.do": self.serve_shutdown(body) elif self.path == "/stop-transcoder.do": self.serve_stop_transcoder(body) elif self.path == "/status.do": self.serve_status(body) elif self.path == "/play.do": self.serve_play(body) elif self.path == "/stream.do": self.serve_stream(body) else: self.send_error(404, "File not found") # do_dispatch()
def application(environ, start_response): queryString = parse_qs(environ['QUERY_STRING'], keep_blank_values=True) body = '' print(queryString) for key, value in queryString.items() : for element in value : body += key + '=' body += element + '\r\n' print(body) status = '200 OK' response_headers = [ ('Content-Type', 'text/plain'), ] start_response(status, response_headers) return [body]
def extend_access_token(self, app_id, app_secret): """ Extends the expiration time of a valid OAuth access token. See <https://developers.facebook.com/roadmap/offline-access-removal/ #extend_token> """ args = { "client_id": app_id, "client_secret": app_secret, "grant_type": "fb_exchange_token", "fb_exchange_token": self.access_token, } response = urllib2.urlopen("https://graph.facebook.com/oauth/" "access_token?" + urllib.parse.urlencode(args)).read().decode('utf-8') query_str = parse_qs(response) if "access_token" in query_str: result = {"accesstoken": query_str["access_token"][0]} if "expires" in query_str: result["expire"] = query_str["expires"][0] return result else: response = json.loads(response) raise GraphAPIError(response)
def get_access_token_from_code(code, redirect_uri, app_id, app_secret): args = { "code": code, "redirect_uri": redirect_uri, "client_id": app_id, "client_secret": app_secret, } # We would use GraphAPI.request() here, except for that the fact # that the response is a key-value pair, and not JSON. response = urllib2.urlopen("https://graph.facebook.com/oauth/access_token" + "?" + urllib.parse.urlencode(args)).read().decode('utf-8') query_str = parse_qs(response) if "access_token" in query_str: result = {"access_token": query_str["access_token"][0]} if "expires" in query_str: result["expires"] = query_str["expires"][0] return result else: jsonResponse = json.loads(str(response)) # response = json.loads(response) encoding = response.info().get_content_charset('utf8') data = json.loads(response.read().decode(encoding)) return data
def parse_qs_bytes(qs, keep_blank_values=False, strict_parsing=False): """Parses a query string like urlparse.parse_qs, but returns the values as byte strings. Keys still become type str (interpreted as latin1 in python3!) because it's too painful to keep them as byte strings in python3 and in practice they're nearly always ascii anyway. """ # This is gross, but python3 doesn't give us another way. # Latin1 is the universal donor of character encodings. result = parse_qs(qs, keep_blank_values, strict_parsing, encoding='latin1', errors='strict') encoded = {} for k,v in result.iteritems(): encoded[k] = [i.encode('latin1') for i in v] return encoded
def _on_request_body(self, data): self._request.body = data content_type = self._request.headers.get("Content-Type", "") if self._request.method == "POST": if content_type.startswith("application/x-www-form-urlencoded"): arguments = cgi.parse_qs(self._request.body) for name, values in arguments.iteritems(): values = [v for v in values if v] if values: self._request.arguments.setdefault(name, []).extend( values) elif content_type.startswith("multipart/form-data"): if 'boundary=' in content_type: boundary = content_type.split('boundary=',1)[1] if boundary: self._parse_mime_body(boundary, data) else: logging.warning("Invalid multipart/form-data") self.request_callback(self._request) # ????
def do_GET(self): parsed_url = urlparse.urlparse(self.path) parameters = cgi.parse_qs(parsed_url.query) #print "parameters=",parameters if parsed_url.path.find("..")!=-1: self.send_response(404) self.end_headers() return if parsed_url.path=="/": return self.serve_root() # /cluster/$cluster_id$ s = parsed_url.path.split("/") if len(s)==3 and s[1]=="cluster": return self.serve_cluster(s[2]) if len(s)==3 and s[1]=="cluster_details": return self.serve_cluster_details(s[2]) self.send_response(404) self.end_headers()
def _decode_payload(cls, body): compressed_payload_str = None if body.startswith(cls.PAYLOAD_KEY_PARAM): payload_key = body[len(cls.PAYLOAD_KEY_PARAM):] payload_entity = _HugeTaskPayload.get(payload_key) compressed_payload_str = payload_entity.payload elif body.startswith(cls.PAYLOAD_PARAM): compressed_payload_str = body[len(cls.PAYLOAD_PARAM):] if compressed_payload_str: payload_str = zlib.decompress(compressed_payload_str) else: payload_str = body result = {} for (name, value) in cgi.parse_qs(payload_str).items(): if len(value) == 1: result[name] = value[0] else: result[name] = value return result
def do_POST(s): method_word = s.path[1:].split("/") length = int(s.headers.getheader('content-length')) postvars = cgi.parse_qs(s.rfile.read(length), keep_blank_values=1) print method_word ret = "" if method_word[0] == "getfreq": ret = [] print postvars["list"][0] wlist = ast.literal_eval(urllib.unquote(postvars["list"][0])) wlist = noDup2(wlist) print wlist for words in wlist: ret.append(get_frequency(words)) s.send_response(200) s.send_header("content-type","application/json") s.send_header("Access-Control-Allow-Origin","*") s.end_headers() s.wfile.write(json.dumps(ret)) s.wfile.close();
def parse_qs(qs, keep_blank_values=0, strict_parsing=0): """ Like C{cgi.parse_qs}, but with support for parsing byte strings on Python 3. @type qs: C{bytes} """ d = {} items = [s2 for s1 in qs.split(b"&") for s2 in s1.split(b";")] for item in items: try: k, v = item.split(b"=", 1) except ValueError: if strict_parsing: raise continue if v or keep_blank_values: k = unquote(k.replace(b"+", b" ")) v = unquote(v.replace(b"+", b" ")) if k in d: d[k].append(v) else: d[k] = [v] return d
def test_007_get_arg(self): # define a new handler that does a get_arg as well as a read_body def new_app(env, start_response): body = bytes_to_str(env['wsgi.input'].read()) a = cgi.parse_qs(body).get('a', [1])[0] start_response('200 OK', [('Content-type', 'text/plain')]) return [six.b('a is %s, body is %s' % (a, body))] self.site.application = new_app sock = eventlet.connect(self.server_addr) request = b'\r\n'.join(( b'POST / HTTP/1.0', b'Host: localhost', b'Content-Length: 3', b'', b'a=a')) sock.sendall(request) # send some junk after the actual request sock.sendall(b'01234567890123456789') result = read_http(sock) self.assertEqual(result.body, b'a is a, body is a=a')
def put_vtable(self, req, **kwargs): shortest_switch = self.shortest_switch_spp #Check request isn't blank. try: request_body_size = int(req.environ.get('CONTENT_LENGTH', 0)) except(ValueError): request_body_size = 0 d = parse_qs(req.body) #Interpret vlan input for key, value in d.items(): host = ('00:00:00:00:00:0'+key[4]) vlan = value[0][4] shortest_switch.set_vtable(host, vlan) body = self.html() return Response(content_type='text/html', body=body)
def put_vtable(self, req, **kwargs): simple_switch = self.simple_switch_spp #Check request is not Blank. try: request_body_size = int(req.environ.get('CONTENT_LENGTH',0)) except(ValueError): request_body_size = 0 d = parse_qs(req.body) #Interpret the vlan list for key, value in d.items(): host = ('00:00:00:00:00:0'+key[4]) vlan = value[0][4] simple_switch.set_vtable(host, vlan) body = self.html() return Response(content_type='text/html', body=body)
def do_POST(s): length = int(s.headers['content-length']) postvars = cgi.parse_qs(s.rfile.read(length), keep_blank_values=1) logging.debug(postvars) try: username = postvars['u'][0] domain = postvars['d'][0] encTimestamp = postvars['t'][0] except: s.send_response(500) s.end_headers() return cracker.enqueueJob(username, domain, encTimestamp, dcept.passwordHit) s.send_response(200) s.end_headers()
def test_form_request_from_response(): # Copied from scrapy tests (test_from_response_submit_not_first_clickable) def _buildresponse(body, **kwargs): kwargs.setdefault('body', body) kwargs.setdefault('url', 'http://example.com') kwargs.setdefault('encoding', 'utf-8') return HtmlResponse(**kwargs) response = _buildresponse( """<form action="get.php" method="GET"> <input type="submit" name="clickable1" value="clicked1"> <input type="hidden" name="one" value="1"> <input type="hidden" name="two" value="3"> <input type="submit" name="clickable2" value="clicked2"> </form>""") req = SplashFormRequest.from_response( response, formdata={'two': '2'}, clickdata={'name': 'clickable2'}) assert req.method == 'GET' assert req.meta['splash']['args']['url'] == req.url fs = cgi.parse_qs(req.url.partition('?')[2], True) assert fs['clickable2'] == ['clicked2'] assert 'clickable1' not in fs assert fs['one'] == ['1'] assert fs['two'] == ['2']
def _parse_token(cls, s): if not s: raise ValueError("Invalid parameter string.") params = parse_qs(s, keep_blank_values=False) if not params: raise ValueError("Invalid parameter string: %r" % s) try: key = params['oauth_token'][0] except Exception: raise ValueError("'oauth_token' not found in OAuth request.") try: secret = params['oauth_token_secret'][0] except Exception: raise ValueError("'oauth_token_secret' not found in " "OAuth request.") return OAuthToken(key, secret) # Don't use this class directly.
def do_POST(self): # http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables ctype, pdict = cgi.parse_header(self.headers['content-type']) if ctype == 'multipart/form-data': postvars = cgi.parse_multipart(self.rfile, pdict) elif ctype == 'application/x-www-form-urlencoded': length = int(self.headers['content-length']) postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1) else: postvars = {} # print(postvars) if 'Username' not in list(postvars.keys()) \ or 'Password' not in list(postvars.keys()): log('E', 'vali.', 'No credentials.') self.exit_on_error('No credentials.') return if not validate_id(postvars['Username'][0], postvars['Password'][0]): log('E', 'vali.', 'Wrong credentials.') self.exit_on_error('Wrong credentials.') return # print(postvars) try: dispatch(postvars) self.write_response({'Status': 'OK'}) except: log('E', 'hand.', 'Handler throws an exception.') self.exit_on_error('Handler throws and exception.')
def get_info_hash(request, multiple=False): """ Get infohashes from a QS. """ if not multiple: return b2a_hex(cgi.parse_qs(request.query_string)['info_hash'][0]) else: hashes = set() for hash in cgi.parse_qs(request.query_string)['info_hash']: hashes.add(b2a_hex(hash)) return hashes
def render_GET(self, request): def finish(data): # Write the data back to our client request.clientproto = clientproto transport.write(data) transport.loseConnection() # Get the referer and parse it referer = request.getHeader('referer') scheme, netloc, path, parameters, query, fragment = urlparse(referer) args = parse_qs(query) transport = request.transport clientproto = request.clientproto # Modify the original request request.uri = referer request.path = path request.args = args request.clientproto = 'HTTP/1.0' # we don't want chunk encoding request.transport = FakeTransport() # Reload the modified request on the server, without using HTTP deferred = request.notifyFinish() request.process() # XXX TODO handle errors deferred.addCallback(lambda _: request.transport.getData() ).addCallback(self.validate, request ).addCallback(finish) return server.NOT_DONE_YET
def from_string(s): """ Returns a token from something like: oauth_token_secret=xxx&oauth_token=xxx """ params = cgi.parse_qs(s, keep_blank_values=False) key = params['oauth_token'][0] secret = params['oauth_token_secret'][0] token = OAuthToken(key, secret) try: token.callback_confirmed = params['oauth_callback_confirmed'][0] except KeyError: pass # 1.0, no callback confirmed. return token
def _split_url_string(param_str): """Turn URL string into parameters.""" parameters = cgi.parse_qs(param_str, keep_blank_values=False) for k, v in parameters.iteritems(): parameters[k] = urllib.unquote(v[0]) return parameters
def callback(ticket, callback_class, options): # NOTE: cgi.parse_qs creates arrays for *all* values options = cgi.parse_qs(options) print "options: ", options # get the callback, if there is one callback = eval("%s()" % callback_class) callback.set_ticket(ticket) callback.set_options(options) callback._execute()
def parseParameters(f): def wrapper(self, *args, **kw): if not args: return f(self, *args, **kw) context = args[0] query = context.get("QUERY_STRING", None) if query: parameters = parse_qs(query) for key in parameters: value = escape(parameters[key][0]) value = value.replace("'", "\"") try: value = json.loads(value) except ValueError: pass context[key] = value return f(self, *args, **kw) return wrapper
def testParseqs(self): self.failUnlessEqual(cgi.parse_qs("a=b&d=c;+=f"), http.parse_qs("a=b&d=c;+=f")) self.failUnlessRaises(ValueError, http.parse_qs, "blah", strict_parsing = 1) self.failUnlessEqual(cgi.parse_qs("a=&b=c", keep_blank_values = 1), http.parse_qs("a=&b=c", keep_blank_values = 1)) self.failUnlessEqual(cgi.parse_qs("a=&b=c"), http.parse_qs("a=&b=c"))
def set_token_string(self, token_string): """Sets the token key and secret from the token string. Args: token_string: str Token string of form oauth_token=[0]&oauth_token_secret=[1]. If oauth_token is not present, self.key will be None. If oauth_token_secret is not present, self.secret will be None. """ token_params = cgi.parse_qs(token_string, keep_blank_values=False) if 'oauth_token' in token_params: self.key = token_params['oauth_token'][0] if 'oauth_token_secret' in token_params: self.secret = token_params['oauth_token_secret'][0]
def from_string(s): params = cgi.parse_qs(s, keep_blank_values=False) key = params['oauth_token'][0] secret = params['oauth_token_secret'][0] return OAuthToken(key, secret)
def _split_url_string(param_str): parameters = cgi.parse_qs(param_str, keep_blank_values=False) for k, v in parameters.iteritems(): parameters[k] = urllib.unquote(v[0]) return parameters
def from_string(data_string): """Deserializes a token from a string like one returned by `to_string()`.""" if not len(data_string): raise ValueError("Invalid parameter string.") params = parse_qs(data_string, keep_blank_values=False) if not len(params): raise ValueError("Invalid parameter string.") try: key = params['oauth_token'][0] except Exception: raise ValueError("'oauth_token' not found in OAuth request.") try: secret = params['oauth_token_secret'][0] except Exception: raise ValueError("'oauth_token_secret' not found in " "OAuth request.") token = YahooToken(key, secret) session_handle = params.get('oauth_session_handle') if session_handle: setattr(token, 'session_handle', session_handle[0]) timestamp = params.get('token_creation_timestamp') if timestamp: setattr(token, 'timestamp', timestamp[0]) try: token.callback_confirmed = params['oauth_callback_confirmed'][0] except KeyError: pass # 1.0, no callback confirmed. return token
def parse_query(query): queries = {} try: queries = cgi.parse_qs(query) except: return q = {} for key, value in queries.items(): q[key] = value[0] q['mode'] = q.get('mode', 'main') return q
def from_string(s): """Deserializes a token from a string like one returned by `to_string()`.""" if not len(s): raise ValueError("Invalid parameter string.") params = parse_qs(s, keep_blank_values=False) if not len(params): raise ValueError("Invalid parameter string.") try: key = params['oauth_token'][0] except Exception: raise ValueError("'oauth_token' not found in OAuth request.") try: secret = params['oauth_token_secret'][0] except Exception: raise ValueError("'oauth_token_secret' not found in " "OAuth request.") token = Token(key, secret) try: token.callback_confirmed = params['oauth_callback_confirmed'][0] except KeyError: pass # 1.0, no callback confirmed. return token
def to_url(self): """Serialize as a URL for a GET request.""" base_url = urlparse.urlparse(self.url) try: query = base_url.query except AttributeError: # must be python <2.5 query = base_url[4] query = parse_qs(query) for k, v in self.items(): query.setdefault(k, []).append(v) try: scheme = base_url.scheme netloc = base_url.netloc path = base_url.path params = base_url.params fragment = base_url.fragment except AttributeError: # must be python <2.5 scheme = base_url[0] netloc = base_url[1] path = base_url[2] params = base_url[3] fragment = base_url[5] url = (scheme, netloc, path, params, urllib.urlencode(query, True), fragment) return urlparse.urlunparse(url)
def _split_url_string(param_str): """Turn URL string into parameters.""" parameters = parse_qs(param_str.encode('utf-8'), keep_blank_values=True) for k, v in parameters.iteritems(): parameters[k] = urllib.unquote(v[0]) return parameters
def test_deprecated_parse_qs(self): # this func is moved to urllib.parse, this is just a sanity check with check_warnings(('cgi.parse_qs is deprecated, use urllib.parse.' 'parse_qs instead', DeprecationWarning)): self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']}, cgi.parse_qs('a=A1&b=B2&B=B3'))
def GET(self): """ The QUERY_STRING parsed into a MultiDict. Keys and values are strings. Multiple values per key are possible. See MultiDict for details. """ if 'bottle.get' not in self.environ: data = parse_qs(self.query_string, keep_blank_values=True) get = self.environ['bottle.get'] = MultiDict() for key, values in data.iteritems(): for value in values: get[key] = value return self.environ['bottle.get']
def test_deprecated_parse_qs(self): # this func is moved to urlparse, this is just a sanity check with check_warnings(('cgi.parse_qs is deprecated, use urlparse.' 'parse_qs instead', PendingDeprecationWarning)): self.assertEqual({'a': ['A1'], 'B': ['B3'], 'b': ['B2']}, cgi.parse_qs('a=A1&b=B2&B=B3'))
def qs(s): if isinstance(s, six.binary_type): s = s.decode('utf8') blob = cgi.parse_qs(s) if len(blob['data']) != 1: pytest.fail('found multi-item data: %s' % blob['data']) json_bytes = base64.b64decode(blob['data'][0]) blob['data'] = json.loads(json_bytes.decode('utf8')) return blob
def _assertRequested(self, expect_url, expect_data): mock_response = Mock() mock_response.read.return_value = six.b('{"status":1, "error": null}') with patch('six.moves.urllib.request.urlopen', return_value=mock_response) as urlopen: yield assert urlopen.call_count == 1 ((request,), _) = urlopen.call_args assert request.get_full_url() == expect_url data = urllib.parse.parse_qs(request.data.decode('utf8')) assert len(data['data']) == 1 payload_encoded = data['data'][0] payload_json = base64.b64decode(payload_encoded).decode('utf8') payload = json.loads(payload_json) assert payload == expect_data
def _parse_POST(self): import cgi ctype, pdict = cgi.parse_header(self.headers.getheader('content-type')) if ctype == 'multipart/form-data': postvars = cgi.parse_multipart(self.rfile, pdict) elif ctype == 'application/x-www-form-urlencoded': length = int(self.headers.getheader('content-length')) postvars = cgi.parse_qs( self.rfile.read(length), keep_blank_values=1) else: postvars = {} return postvars # -------------- POST handler: where the magic happens --------------
def hello_world(environ, start_response): parameters = parse_qs(environ.get('QUERY_STRING', '')) if 'subject' in parameters: subject = escape(parameters['subject'][0]) else: subject = 'World' start_response('200 OK', [('Content-Type', 'text/html')]) return ['''Hello {subject!s} Hello {subject!s}! '''.format(**{'subject': subject})]
def query(self): ''' The :attr:`query_string` parsed into a :class:`FormsDict`. These values are sometimes called "URL arguments" or "GET parameters", but not to be confused with "URL wildcards" as they are provided by the :class:`Router`. ''' data = parse_qs(self.query_string, keep_blank_values=True) get = self.environ['bottle.get'] = FormsDict() for key, values in data.iteritems(): for value in values: get[key] = value return get