我们从Python开源项目中,提取了以下44个代码示例,用于说明如何使用six.moves.urllib.parse.parse_qs()。
def do_GET(self):
    """Answer any GET with 200, stash the parsed query on the server.

    Multi-valued parameters stay lists, single values are unwrapped, and
    empty ones become ``None``; the result lands in
    ``self.server.callback_result`` for the test/caller to inspect.
    """
    self.send_response(200)
    self.send_header('Content-Type', 'text/plain')
    self.end_headers()
    parsed = urlparse(self.path)
    query = parse_qs(parsed.query)
    collapsed = {}
    for key, values in query.items():
        if len(values) == 1:
            collapsed[key] = values[0]
        elif len(values) == 0:
            collapsed[key] = None
        else:
            collapsed[key] = values
    self.server.callback_result = collapsed
    self.wfile.write(b'You can close this window now')
def _parse_connection_url(url):
    """Parse connection parameters from a database url.

    .. note::

      HBase Thrift does not support authentication and there is no
      database name, so we are not looking for these in the url.
    """
    opts = {}
    result = netutils.urlsplit(url)
    # Parse the query string once instead of once per option.
    query = urlparse.parse_qs(result.query)
    opts['table_prefix'] = query.get('table_prefix', [None])[0]
    opts['table_prefix_separator'] = query.get(
        'table_prefix_separator', ['_'])[0]
    opts['dbtype'] = result.scheme
    if ':' in result.netloc:
        opts['host'], port = result.netloc.split(':')
    else:
        opts['host'] = result.netloc
        port = 9090
    # An empty port string falls back to the Thrift default 9090.
    opts['port'] = port and int(port) or 9090
    return opts
def get_body(self, request):
    """Return ``(content_type, body)`` for *request*.

    An empty body yields ``(None, <query params>)``; a missing or
    unrecognized Content-Type yields ``(None, '')``.
    """
    params = urlparse.parse_qs(request.query_string)

    if len(request.body) == 0:
        LOG.debug("Empty body provided in request")
        return None, params

    try:
        content_type = request.get_content_type()
    except exception.InvalidContentType:
        LOG.debug("Unrecognized Content-Type provided in request")
        return None, ''

    if not content_type:
        LOG.debug("No Content-Type provided in request")
        return None, ''

    return content_type, request.body
def facebook_validation_function(url):
    """Extract a Facebook username/identifier from *url*, or None if invalid.

    Accepts facebook.com (and subdomain) URLs, handling old-style numeric
    ``profile.php?id=...`` profiles and ``/people/Name/<id>`` paths.
    """
    try:
        url_parts = _get_url_parts(url)
        # check if acceptable domain
        domain = url_parts[1]
        if not (domain == 'facebook.com' or domain.endswith('.facebook.com')):
            return None
        path = _get_initial_path(url_parts)

        # old style numeric profiles
        if path == "profile.php":
            # ex. https://www.facebook.com/profile.php?id=100010279981469
            # Guard against a missing ``id`` parameter, which previously
            # raised an uncaught TypeError (None[0]).
            ids = parse_qs(url_parts[3]).get('id')
            if not ids:
                return None
            path = ids[0]

        if path == 'people':
            # ex. https://www.facebook.com/people/John-Doe/100013326345115
            segments = url_parts[2].strip('/').split('/')
            # Guard against too-short paths, which previously raised an
            # uncaught IndexError.
            if len(segments) < 3:
                return None
            path = segments[2].lower()

        # TODO: validate against allowed username characteristics
        # https://github.com/project-callisto/callisto-core/issues/181
        if not path or path == "" or path.endswith(
                '.php') or path in generic_fb_urls:
            return None
        else:
            return path
    except ValidationError:
        return None
def test_authorize_view(self):
    """Authorize view redirects to Google and threads CSRF state through."""
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize')
        location = response.headers['Location']
        query = urlparse.parse_qs(location.split('?', 1)[1])
        state = json.loads(query['state'][0])
        self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
        self.assertNotIn(self.oauth2.client_secret, location)
        self.assertIn(self.oauth2.client_id, query['client_id'])
        self.assertEqual(
            flask.session['google_oauth2_csrf_token'], state['csrf_token'])
        self.assertEqual(state['return_url'], '/')

    # An explicit return_url is carried through the state blob.
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize?return_url=/test')
        location = response.headers['Location']
        query = urlparse.parse_qs(location.split('?', 1)[1])
        state = json.loads(query['state'][0])
        self.assertEqual(state['return_url'], '/test')

    # Unknown query parameters are forwarded to the auth URI.
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize?extra_param=test')
        location = response.headers['Location']
        self.assertIn('extra_param=test', location)
def _pagination(self, collection, path, **params):
    """Yield successive result pages for *collection* from *path*.

    Follows the ``<collection>_links`` entries in each response, using the
    ``previous`` link when ``page_reverse`` is set and ``next`` otherwise.
    Stops when no matching link is present.
    """
    if params.get('page_reverse', False):
        linkrel = 'previous'
    else:
        linkrel = 'next'
    # Renamed from ``next``: avoid shadowing the builtin.
    has_more = True
    while has_more:
        res = self.get(path, params=params)
        yield res
        has_more = False
        try:
            for link in res['%s_links' % collection]:
                if link['rel'] == linkrel:
                    # Re-issue the request with the link's query parameters
                    # (marker, limit, ...) to fetch the following page.
                    query_str = urlparse.urlparse(link['href']).query
                    params = urlparse.parse_qs(query_str)
                    has_more = True
                    break
        except KeyError:
            break
def test_authorize_view(self):
    """Authorize view redirects to Google and threads CSRF state through."""
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize')
        location = response.headers['Location']
        query = urlparse.parse_qs(location.split('?', 1)[1])
        state = json.loads(query['state'][0])
        self.assertIn(GOOGLE_AUTH_URI, location)
        self.assertNotIn(self.oauth2.client_secret, location)
        self.assertIn(self.oauth2.client_id, query['client_id'])
        self.assertEqual(
            flask.session['google_oauth2_csrf_token'], state['csrf_token'])
        self.assertEqual(state['return_url'], '/')

    # An explicit return_url is carried through the state blob.
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize?return_url=/test')
        location = response.headers['Location']
        query = urlparse.parse_qs(location.split('?', 1)[1])
        state = json.loads(query['state'][0])
        self.assertEqual(state['return_url'], '/test')

    # Unknown query parameters are forwarded to the auth URI.
    with self.app.test_client() as client:
        response = client.get('/oauth2authorize?extra_param=test')
        location = response.headers['Location']
        self.assertIn('extra_param=test', location)
def __init__(self, raw, errors='replace'):
    """Parse a string of type application/x-www-form-urlencoded."""
    # urllib wants bytestrings under py2 and unicode strings under py3.
    raw_str = raw.encode('ascii') if PY2 else raw
    self.decoded = _decode(unquote_plus(raw_str), errors=errors)
    self.raw = raw

    parse_kwargs = dict(keep_blank_values=True, strict_parsing=False)
    if PY2:
        # py2's parse_qs does its own unquote_plus'ing but leaves
        # bytestrings behind, so decode keys and values ourselves.
        parsed = parse_qs(raw_str, **parse_kwargs)
        for key, values in list(parsed.items()):
            parsed[_decode(key, errors=errors)] = [
                _decode(v, errors=errors) for v in values
            ]
    else:
        # py3's parse_qs decodes for us.
        parsed = parse_qs(raw_str, errors=errors, **parse_kwargs)
    Mapping.__init__(self, parsed)
def update_query_parameters(url, query_parameters):
    """Return *url* with *query_parameters* merged into its query string.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): Maps parameter names to lists of values;
            entries replace any same-named parameters already in the url.

    Returns:
        str: The rebuilt url.
    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    params = parse_qs(query_string)
    params.update(query_parameters)
    updated_query = urlencode(params, doseq=True)
    return urlunsplit((scheme, netloc, path, updated_query, fragment))
def traverse_pagination(response, endpoint):
    """Walk a paginated DRF-style API response and gather all results.

    Arguments:
        response (dict): First page, with ``results`` and ``next`` keys.
        endpoint: slumber Resource-like object whose ``get`` accepts the
            query parameters of the ``next`` url.

    Returns:
        list of dict: concatenated ``results`` from every page.
    """
    results = response.get('results', [])
    next_page = response.get('next')
    while next_page:
        # Re-issue the request using the query parameters of the next link.
        page_query = parse_qs(urlparse(next_page).query,
                              keep_blank_values=True)
        response = endpoint.get(**page_query)
        results += response.get('results', [])
        next_page = response.get('next')
    return results
def get_msg(hinfo, binding):
    """Extract the raw SAML request string from *hinfo* for *binding*."""
    if binding == BINDING_SOAP:
        return hinfo["data"]
    if binding == BINDING_HTTP_POST:
        # The message sits in a quoted form-field value after TAG1.
        body = hinfo["data"][3]
        start = body.find(TAG1) + len(TAG1) + 1
        end = body.find('"', start)
        return body[start:end]
    # BINDING_HTTP_REDIRECT: message travels in the Location header's query.
    location = urlparse(hinfo["headers"][0][1])
    return parse_qs(location.query)["SAMLRequest"][0]


# ------------------------------------------------------------------------
def get_msg(hinfo, binding, response=False):
    """Extract the raw SAML message from *hinfo* for *binding*."""
    if binding == BINDING_SOAP:
        return hinfo["data"]
    if binding == BINDING_HTTP_POST:
        # The message sits in a quoted form-field value after TAG1.
        body = hinfo["data"][3]
        start = body.find(TAG1) + len(TAG1) + 1
        return body[start:body.find('"', start)]
    if binding == BINDING_HTTP_ARTIFACT:
        # Artifact arrives either by POST (form body) or by redirect (URL).
        if hinfo["data"]:
            body = hinfo["data"][3]
            start = body.find(TAG1) + len(TAG1) + 1
            return body[start:body.find('"', start)]
        parts = urlparse(hinfo["url"])
        return parse_qs(parts.query)["SAMLart"][0]
    # BINDING_HTTP_REDIRECT: message travels in the Location header's query.
    parts = urlparse(hinfo["headers"][0][1])
    return parse_qs(parts.query)["SAMLRequest"][0]
def get_msg(hinfo, binding, response=False):
    """Extract the raw SAML message from *hinfo* for *binding*."""
    if binding == BINDING_SOAP:
        msg = hinfo["data"]
    elif binding == BINDING_HTTP_POST:
        # The message sits in a quoted form-field value after TAG1.
        body = hinfo["data"][3]
        start = body.find(TAG1) + len(TAG1) + 1
        end = body.find('"', start)
        msg = body[start:end]
    elif binding == BINDING_URI:
        if response:
            msg = hinfo["data"]
        else:
            # NOTE(review): dead assignment kept for parity with the
            # original; the request path returns the ID directly.
            msg = ""
            return parse_qs(hinfo["url"].split("?")[1])["ID"][0]
    else:  # BINDING_HTTP_REDIRECT
        parts = urlparse(hinfo["headers"][0][1])
        msg = parse_qs(parts.query)["SAMLRequest"][0]
    return msg
def test_parse_faulty_request_to_err_status(self):
    """A request destined for another entity maps to a responder error."""
    req_id, authn_request = self.client.create_authn_request(
        destination="http://www.example.com")
    binding = BINDING_HTTP_REDIRECT
    htargs = self.client.apply_binding(
        binding, "%s" % authn_request, "http://www.example.com", "abcd")
    request_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
    print(request_dict)

    status = None
    try:
        self.server.parse_authn_request(
            request_dict["SAMLRequest"][0], binding)
    except OtherError as oe:
        print(oe.args)
        status = s_utils.error_status_factory(oe)

    assert status
    print(status)
    assert _eq(status.keyswv(), ["status_code", "status_message"])
    assert status.status_message.text == 'Not destined for me!'
    status_code = status.status_code
    assert _eq(status_code.keyswv(), ["status_code", "value"])
    assert status_code.value == samlp.STATUS_RESPONDER
    assert status_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
def test_parse_ok_request(self):
    """A well-formed redirect-bound request parses and yields response args."""
    req_id, authn_request = self.client.create_authn_request(
        message_id="id1", destination="http://localhost:8088/sso")
    print(authn_request)
    binding = BINDING_HTTP_REDIRECT
    htargs = self.client.apply_binding(
        binding, "%s" % authn_request, "http://www.example.com", "abcd")
    request_dict = parse_qs(htargs["headers"][0][1].split('?')[1])
    print(request_dict)

    req = self.server.parse_authn_request(
        request_dict["SAMLRequest"][0], binding)
    # returns a dictionary
    print(req)
    resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
    assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
    assert resp_args["in_response_to"] == "id1"
    name_id_policy = resp_args["name_id_policy"]
    assert _eq(name_id_policy.keyswv(), ["format", "allow_create"])
    assert name_id_policy.format == saml.NAMEID_FORMAT_TRANSIENT
    assert resp_args[
        "sp_entity_id"] == "urn:mace:example.com:saml:roland:sp"
def parse_discovery_service_response(url="", query="",
                                     returnIDParam="entityID"):
    """Deal with the response url from a Discovery Service.

    :param url: the url the user was redirected back to or
    :param query: just the query part of the URL.
    :param returnIDParam: query parameter under which the IdP identifier
        is carried; default is 'entityID'
    :return: The IdP identifier or "" if none was given
    """
    if url:
        qsd = parse_qs(urlparse(url)[4])
    elif query:
        qsd = parse_qs(query)
    else:
        qsd = {}

    try:
        return qsd[returnIDParam][0]
    except KeyError:
        return ""
def disco(environ, start_response, _sp):
    """Handle the return from a SAML discovery service and resume SSO."""
    query = parse_qs(environ["QUERY_STRING"])
    entity_id = query["entityID"][0]
    session_id = query["sid"][0]
    came_from = CACHE.outstanding_queries[session_id]
    sso = SSO(_sp, environ, start_response, cache=CACHE, **ARGS)
    resp = sso.redirect_to_auth(sso.sp, entity_id, came_from)

    # Remember the chosen IdP in a cookie.
    kaka = make_cookie("ve_disco", entity_id, "SEED_SAW")
    resp.headers.append(kaka)
    return resp(environ, start_response)


# ----------------------------------------------------------------------------


# noinspection PyUnusedLocal
def _post_servers(self, request):
    # type: (requests.PreparedRequest) -> requests.Response
    """Mock server-creation endpoint: record posted fields, reply 201."""
    form = parse_qs(force_text(request.body))
    self.server_id = self._public_id('srv')
    self.auth_token = ''.join(
        random.choice(mixed_alphabet) for _ in xrange(20))
    self.api_version = force_text(request.headers['X-Cloak-API-Version'])
    self.name = form['name'][0]
    self.target_id = form['target'][0]

    # Required fields: fail loudly if the client omitted them.
    form['email'][0]
    form['password'][0]

    payload = {
        'server_id': self.server_id,
        'auth_token': self.auth_token,
        'server': self._server_result(),
    }
    return self._response(request, 201, payload)
def _post_server_csr(self, request):
    # type: (requests.PreparedRequest) -> requests.Response
    """Mock CSR-submission endpoint: 202 when authenticated, else 401."""
    form = parse_qs(force_text(request.body))
    if not self._authenticate(request):
        return self._response(request, 401)
    self.csr = form['csr'][0]
    self.pki_tag = ''.join(
        random.choice(mixed_alphabet) for _ in xrange(16))
    return self._response(request, 202)


#
# Utils
#
def forward_request(self, method, path, data, headers):
    """Intercept CloudFormation actions POSTed to the API root."""
    req_data = None
    if method == 'POST' and path == '/':
        req_data = urlparse.parse_qs(to_str(data))
        action = req_data.get('Action')[0]

    if req_data:
        if action == 'CreateChangeSet':
            return create_change_set(req_data)
        if action == 'DescribeChangeSet':
            return describe_change_set(req_data)
        if action == 'ExecuteChangeSet':
            return execute_change_set(req_data)
        if action == 'UpdateStack' and req_data.get('TemplateURL'):
            # Temporary fix until the moto CF backend can handle TemplateURL
            # (currently fails): inline the template body ourselves.
            url = re.sub(r'https?://s3\.amazonaws\.com',
                         aws_stack.get_local_service_url('s3'),
                         req_data.get('TemplateURL')[0])
            req_data['TemplateBody'] = requests.get(url).content
            modified_data = urlparse.urlencode(req_data, doseq=True)
            return Request(data=modified_data, headers=headers, method=method)
        if action == 'ValidateTemplate':
            return validate_template(req_data)

    return True
def oauth_authorization_request(self, token):
    """Generates the URL for the authorization link"""
    if not isinstance(token, dict):
        token = parse_qs(token)

    oauth_token = token.get(self.OAUTH_TOKEN_PARAMETER_NAME)[0]
    state = self.get_or_create_state()
    base_url = self.setting('MEDIAWIKI_URL')
    auth_params = {
        'title': 'Special:Oauth/authenticate',
        self.OAUTH_TOKEN_PARAMETER_NAME: oauth_token,
        self.REDIRECT_URI_PARAMETER_NAME: self.get_redirect_uri(state),
    }
    return '{0}?{1}'.format(base_url, urlencode(auth_params))
def access_token(self, token):
    """Fetches the Mediawiki access token."""
    auth_token = self.oauth_auth(token)
    response = requests.post(
        url=self.setting('MEDIAWIKI_URL'),
        params={'title': 'Special:Oauth/token'},
        auth=auth_token,
    )
    # The token endpoint replies with a urlencoded byte string.
    credentials = parse_qs(response.content)
    key = credentials.get(b('oauth_token'))[0].decode()
    secret = credentials.get(b('oauth_token_secret'))[0].decode()
    return {
        'oauth_token': key,
        'oauth_token_secret': secret,
    }
def _process_url(page_from, page_to, url): # Get url page query = urlparse(url).query query = parse_qs(query) page = query.get('page') # Preserve if match if page: page_from = int(page_from) page_to = int(page_to) page = int(page[0]) if page >= page_from and page <= page_to: return url return None
def test_get_servers_with_limit(self):
    """Index honors ?limit and emits a next link with limit + marker."""
    req = self.req('/fake/servers?limit=3')
    res_dict = self.controller.index(req)

    servers = res_dict['servers']
    self.assertEqual([s['id'] for s in servers],
                     [fakes.get_fake_uuid(i) for i in range(len(servers))])

    servers_links = res_dict['servers_links']
    self.assertEqual(servers_links[0]['rel'], 'next')

    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected_params = {'limit': ['3'],
                       'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected_params))
def test_get_server_details_with_limit(self):
    """Detail honors ?limit and emits a next link with limit + marker."""
    req = self.req('/fake/servers/detail?limit=3')
    res = self.controller.detail(req)

    servers = res['servers']
    self.assertEqual([s['id'] for s in servers],
                     [fakes.get_fake_uuid(i) for i in range(len(servers))])

    servers_links = res['servers_links']
    self.assertEqual(servers_links[0]['rel'], 'next')

    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers/detail', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected = {'limit': ['3'],
                'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected))
def test_get_server_details_with_limit_and_other_params(self):
    """The next link preserves filters and sort params alongside the marker."""
    req = self.req('/fake/servers/detail'
                   '?limit=3&blah=2:t'
                   '&sort_key=id1&sort_dir=asc')
    res = self.controller.detail(req)

    servers = res['servers']
    self.assertEqual([s['id'] for s in servers],
                     [fakes.get_fake_uuid(i) for i in range(len(servers))])

    servers_links = res['servers_links']
    self.assertEqual(servers_links[0]['rel'], 'next')

    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers/detail', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected = {'limit': ['3'],
                'blah': ['2:t'],
                'sort_key': ['id1'],
                'sort_dir': ['asc'],
                'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected))
def test_get_servers_with_limit(self):
    """Index honors ?limit and emits a next link with limit + marker."""
    req = fakes.HTTPRequest.blank('/fake/servers?limit=3')
    res_dict = self.controller.index(req)

    servers = res_dict['servers']
    self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
                     [s['id'] for s in servers])

    servers_links = res_dict['servers_links']
    self.assertEqual('next', servers_links[0]['rel'])

    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected_params = {'limit': ['3'],
                       'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected_params))
def test_get_server_details_with_limit(self):
    """Detail honors ?limit and emits a next link with limit + marker."""
    req = fakes.HTTPRequest.blank('/fake/servers/detail?limit=3')
    res = self.controller.detail(req)

    servers = res['servers']
    self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
                     [s['id'] for s in servers])

    servers_links = res['servers_links']
    self.assertEqual('next', servers_links[0]['rel'])

    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers/detail', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected = {'limit': ['3'],
                'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected))
def test_get_server_details_with_limit_and_other_params(self):
    """The next link preserves filters and sort params alongside the marker."""
    req = fakes.HTTPRequest.blank('/fake/servers/detail'
                                  '?limit=3&blah=2:t'
                                  '&sort_key=id1&sort_dir=asc')
    res = self.controller.detail(req)

    servers = res['servers']
    self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
                     [s['id'] for s in servers])

    servers_links = res['servers_links']
    self.assertEqual('next', servers_links[0]['rel'])

    # Retrieve the parameters from the next link, they should contain the
    # same limit, filter, and sort information as the original request as
    # well as a marker; this ensures that the caller can simply use the
    # "next" link and that they do not need to manually insert the limit
    # and sort information.
    link_parts = urlparse.urlparse(servers_links[0]['href'])
    self.assertEqual('/v2/fake/servers/detail', link_parts.path)
    params = urlparse.parse_qs(link_parts.query)
    expected = {'limit': ['3'],
                'blah': ['2:t'],
                'sort_key': ['id1'],
                'sort_dir': ['asc'],
                'marker': [fakes.get_fake_uuid(2)]}
    self.assertThat(params, matchers.DictMatches(expected))
def delete(self, req, id):
    """Delete quotas for a project, optionally scoped to a single user.

    404s when the required extensions are not loaded; 403 when the
    caller lacks project/admin authorization.
    """
    if self.ext_mgr.is_loaded('os-extended-quotas'):
        context = req.environ['nova.context']
        authorize_delete(context)
        query_params = urlparse.parse_qs(
            req.environ.get('QUERY_STRING', ''))
        user_id = query_params.get('user_id', [None])[0]
        if user_id and not self.ext_mgr.is_loaded('os-user-quotas'):
            raise webob.exc.HTTPNotFound()
        try:
            nova.context.authorize_project_context(context, id)
            # NOTE(alex_xu): back-compatible with db layer hard-code admin
            # permission checks. This has to be left only for API v2.0
            # because this version has to be stable even if it means that
            # only admins can call this method while the policy could be
            # changed.
            nova.context.require_admin_context(context)
            if user_id:
                QUOTAS.destroy_all_by_project_and_user(context, id, user_id)
            else:
                QUOTAS.destroy_all_by_project(context, id)
            return webob.Response(status_int=202)
        except exception.Forbidden:
            raise webob.exc.HTTPForbidden()
    raise webob.exc.HTTPNotFound()
def test_simple_notification(self):
    """A pushover notification carries the expected form payload."""
    responses.add('POST', 'https://api.pushover.net/1/messages.json',
                  body=SUCCESS)
    self.plugin.set_option('userkey', 'abcdef', self.project)
    self.plugin.set_option('apikey', 'ghijkl', self.project)

    group = self.create_group(message='Hello world', culprit='foo.bar')
    event = self.create_event(
        group=group,
        message='Hello world',
        tags={'level': 'warning'},
    )

    rule = Rule.objects.create(project=self.project, label='my rule')
    notification = Notification(event=event, rule=rule)

    with self.options({'system.url-prefix': 'http://example.com'}):
        self.plugin.notify(notification)

    request = responses.calls[0].request
    payload = parse_qs(request.body)
    assert payload == {
        'message': ['{}\n\nTags: level=warning'.format(
            event.get_legacy_message())],
        'title': ['Bar: Hello world'],
        'url': ['http://example.com/baz/bar/issues/{}/'.format(group.id)],
        'url_title': ['Issue Details'],
        'priority': ['0'],
        'user': ['abcdef'],
        'token': ['ghijkl'],
    }
def test_notification_without_culprit(self):
    """With exclude_culprit set, the slack payload omits the culprit field."""
    responses.add('POST', 'http://example.com/slack')
    self.plugin.set_option('webhook', 'http://example.com/slack',
                           self.project)
    self.plugin.set_option('exclude_culprit', True, self.project)

    group = self.create_group(message='Hello world', culprit='foo.bar')
    event = self.create_event(group=group, message='Hello world',
                              tags={'level': 'warning'})

    rule = Rule.objects.create(project=self.project, label='my rule')
    notification = Notification(event=event, rule=rule)

    with self.options({'system.url-prefix': 'http://example.com'}):
        self.plugin.notify(notification)

    request = responses.calls[0].request
    payload = json.loads(parse_qs(request.body)['payload'][0])
    assert payload == {
        'username': 'Sentry',
        'attachments': [
            {
                'color': '#f18500',
                'fields': [
                    {
                        'short': True,
                        'value': 'foo Bar',
                        'title': 'Project'
                    },
                ],
                'fallback': '[foo Bar] Hello world',
                'title': 'Hello world',
                'title_link': 'http://example.com/baz/bar/issues/1/?referrer=slack',
            },
        ],
    }
def test_notification_without_project(self):
    """With exclude_project set, the slack payload omits the project field."""
    responses.add('POST', 'http://example.com/slack')
    self.plugin.set_option('webhook', 'http://example.com/slack',
                           self.project)
    self.plugin.set_option('exclude_project', True, self.project)

    group = self.create_group(message='Hello world', culprit='foo.bar')
    event = self.create_event(group=group, message='Hello world',
                              tags={'level': 'warning'})

    rule = Rule.objects.create(project=self.project, label='my rule')
    notification = Notification(event=event, rule=rule)

    with self.options({'system.url-prefix': 'http://example.com'}):
        self.plugin.notify(notification)

    request = responses.calls[0].request
    payload = json.loads(parse_qs(request.body)['payload'][0])
    assert payload == {
        'username': 'Sentry',
        'attachments': [
            {
                'color': '#f18500',
                'fields': [
                    {
                        'short': False,
                        'value': 'foo.bar',
                        'title': 'Culprit',
                    },
                ],
                'fallback': '[foo Bar] Hello world',
                'title': 'Hello world',
                'title_link': 'http://example.com/baz/bar/issues/1/?referrer=slack',
            },
        ],
    }
def query(self):
    """Parsed query parameters of the URL; blank values are preserved."""
    raw_query = self.parsed_url.query
    return parse.parse_qs(raw_query, keep_blank_values=True)
def test_reference_name(self):
    """referenceName/start/end appear in the query, and nothing else."""
    full_url = protocol.ticket_request_url(
        "http://example.co.uk/path/to/resource",
        reference_name="1", start=2, end=100)
    url_parts = urlparse(full_url)
    self.assertEqual(url_parts.scheme, "http")
    self.assertEqual(url_parts.netloc, "example.co.uk")
    self.assertEqual(url_parts.path, "/path/to/resource")
    params = parse_qs(url_parts.query)
    self.assertEqual(params["referenceName"], ["1"])
    self.assertEqual(params["start"], ["2"])
    self.assertEqual(params["end"], ["100"])
    self.assertEqual(len(params), 3)
def test_reference_md5(self):
    """referenceMD5 is the sole query parameter when only it is given."""
    md5 = "b9185d4fade27aa27e17f25fafec695f"
    full_url = protocol.ticket_request_url(
        "https://example.com/resource", reference_md5=md5)
    url_parts = urlparse(full_url)
    self.assertEqual(url_parts.scheme, "https")
    self.assertEqual(url_parts.netloc, "example.com")
    self.assertEqual(url_parts.path, "/resource")
    params = parse_qs(url_parts.query)
    self.assertEqual(params["referenceMD5"], [md5])
    self.assertEqual(len(params), 1)
def test_format(self):
    """The format parameter is upper-cased whatever the caller's casing."""
    for data_format in ["cram", "CRAM", "BAM"]:
        full_url = protocol.ticket_request_url(
            "http://example.co.uk/path/to/resource",
            data_format=data_format)
        url_parts = urlparse(full_url)
        self.assertEqual(url_parts.scheme, "http")
        self.assertEqual(url_parts.netloc, "example.co.uk")
        self.assertEqual(url_parts.path, "/path/to/resource")
        params = parse_qs(url_parts.query)
        self.assertEqual(params["format"], [data_format.upper()])
def test_embedded_query_strings(self):
    """Pre-existing query parameters in the url survive untouched."""
    full_url = protocol.ticket_request_url("http://a.com/stuff?a=a&b=b")
    params = parse_qs(urlparse(full_url).query)
    self.assertEqual(params["a"], ["a"])
    self.assertEqual(params["b"], ["b"])

    full_url = protocol.ticket_request_url(
        "http://a.com/stuff?a=a&b=b", reference_name="123")
    params = parse_qs(urlparse(full_url).query)
    self.assertEqual(params["a"], ["a"])
    self.assertEqual(params["b"], ["b"])
    self.assertEqual(params["referenceName"], ["123"])
def ticket_request_url(
        url, fmt=None, reference_name=None, reference_md5=None,
        start=None, end=None, fields=None, tags=None, notags=None,
        data_format=None):
    """Append htsget ticket-request parameters to *url*'s query string.

    ``fmt``, ``fields``, ``tags`` and ``notags`` are accepted for interface
    compatibility but are currently ignored.
    """
    parsed_url = urlparse(url)
    params = parse_qs(parsed_url.query)
    # TODO error checking
    if reference_name is not None:
        params["referenceName"] = reference_name
    if reference_md5 is not None:
        params["referenceMD5"] = reference_md5
    if start is not None:
        params["start"] = int(start)
    if end is not None:
        params["end"] = int(end)
    if data_format is not None:
        params["format"] = data_format.upper()
    rebuilt = list(parsed_url)
    rebuilt[4] = urlencode(params, doseq=True)
    return urlunparse(rebuilt)