我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 six.moves.urllib.parse.urlencode()。
def view_autocomplete(self, request, group, **kwargs):
    """Serve autocomplete suggestions for the ``issue_id`` field.

    Reads ``autocomplete_field`` and ``autocomplete_query`` from the request's
    GET parameters, queries the ``search`` URL built by ``build_api_url`` and
    returns the matching stories as ``{'text': ..., 'id': ...}`` entries keyed
    by the field name.

    An empty ``issue_id`` list is returned when the field is not ``issue_id``
    or no query was supplied. Request failures and invalid JSON are handed to
    ``handle_api_error``.
    """
    field_name = request.GET.get('autocomplete_field')
    search_text = request.GET.get('autocomplete_query')
    if not (field_name == 'issue_id' and search_text):
        return Response({'issue_id': []})

    encoded_text = search_text.encode('utf-8')
    search_base = self.build_api_url(group, 'search')
    search_url = '%s?%s' % (search_base, urlencode({'query': encoded_text}))

    try:
        api_response = self.make_api_request(group.project, search_url)
        raw_body = safe_urlread(api_response)
    except (requests.RequestException, PluginError) as exc:
        return self.handle_api_error(exc)

    try:
        payload = json.loads(raw_body)
    except ValueError as exc:
        return self.handle_api_error(exc)

    # The story list is nested under two levels of 'stories'.
    stories = payload.get('stories', {}).get('stories', [])
    suggestions = []
    for story in stories:
        suggestions.append({
            'text': '(#%s) %s' % (story['id'], story['name']),
            'id': story['id'],
        })
    return Response({field_name: suggestions})
def __call__(self):
    """Exchange the configured credentials for an access token.

    POSTs ``client_id``, ``client_secret`` and ``jwt_token`` as a form-encoded
    body to ``self.endpoint``. On a 200 response the ``expires_in`` value is
    passed to ``set_expiry`` and the ``access_token`` value is returned.

    Raises:
        RuntimeError: if the endpoint answers with a status other than 200.
    """
    form_fields = {
        "client_id": self.api_key,
        "client_secret": self.client_secret,
        "jwt_token": self.jwt_token
    }
    request_headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        "Cache-Control": "no-cache",
    }
    response = requests.post(
        self.endpoint,
        headers=request_headers,
        data=urlparse.urlencode(form_fields),
    )

    if response.status_code == 200:
        self.set_expiry(response.json()['expires_in'])
        return response.json()['access_token']

    failure_template = ("Unable to authorize against {}:\n"
                        "Response Code: {:d}, Response Text: {}\n"
                        "Response Headers: {}]")
    raise RuntimeError(failure_template.format(
        self.endpoint, response.status_code, response.text, response.headers))
def _build_query(self, params): """Builds a query string. Args: params: dict, the query parameters Returns: The query parameters properly encoded into an HTTP URI query string. """ if self.alt_param is not None: params.update({'alt': self.alt_param}) astuples = [] for key, value in six.iteritems(params): if type(value) == type([]): for x in value: x = x.encode('utf-8') astuples.append((key, x)) else: if isinstance(value, six.text_type) and callable(value.encode): value = value.encode('utf-8') astuples.append((key, value)) return '?' + urlencode(astuples)
def _list_request(self, resource, permanent=False, **kwargs): """Get the list of objects of the specified type. :param resource: The name of the REST resource, e.g., 'audits'. "param **kw: Parameters for the request. :return: A tuple with the server response and deserialized JSON list of objects """ uri = self._get_uri(resource, permanent=permanent) if kwargs: uri += "?%s" % urlparse.urlencode(kwargs) resp, body = self.get(uri) self.expected_success(200, int(resp['status'])) return resp, self.deserialize(body)
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters): parameters = [ ("digits", hotp._length), ("secret", base64.b32encode(hotp._key)), ("algorithm", hotp._algorithm.name.upper()), ] if issuer is not None: parameters.append(("issuer", issuer)) parameters.extend(extra_parameters) uriparts = { "type": type_name, "label": ("%s:%s" % (quote(issuer), quote(account_name)) if issuer else quote(account_name)), "parameters": urlencode(parameters), } return "otpauth://{type}/{label}?{parameters}".format(**uriparts)
def build_login_url(self, redirect_uri, state=None, scope=None):
    """Return ``(login_url, state)`` for the OAuth dialog.

    A falsy ``state`` is replaced by a freshly generated token. ``scope`` is
    only added to the query when it is truthy. The encoded query is
    substituted into ``OAUTH_DIALOG_URL``.
    """
    state = state or generate_token()
    query = {
        'client_id': self.client_id,
        'redirect_uri': redirect_uri,
        'state': state,
    }
    if scope:
        query['scope'] = scope
    return OAUTH_DIALOG_URL.format(urlencode(query)), state
def build_login_url(self, redirect_uri, state=None, scope='email'):
    """Return ``(login_url, state)`` for an authorization-code login.

    A falsy ``state`` is replaced by a freshly generated token. The query
    always asks for ``response_type=code``; ``scope`` is only added when it
    is truthy. The encoded query is substituted into ``OAUTH_DIALOG_URL``.
    """
    state = state or generate_token()
    query = {
        'response_type': 'code',
        'client_id': self.client_id,
        'redirect_uri': redirect_uri,
        'state': state,
    }
    if scope:
        query['scope'] = scope
    return OAUTH_DIALOG_URL.format(urlencode(query)), state
def _redirect_with_params(url_name, *args, **kwargs): """Helper method to create a redirect response with URL params. This builds a redirect string that converts kwargs into a query string. Args: url_name: The name of the url to redirect to. kwargs: the query string param and their values to build. Returns: A properly formatted redirect string. """ url = urlresolvers.reverse(url_name, args=args) params = parse.urlencode(kwargs, True) return "{0}?{1}".format(url, params)
def get_authorize_url(self, state=None):
    """Gets the URL to use to authorize this app.

    ``scope`` is included when ``self.scope`` is truthy. ``state`` falls back
    to ``self.state`` when not given and is included unless it is None.
    """
    query = {
        'client_id': self.client_id,
        'response_type': 'code',
        'redirect_uri': self.redirect_uri,
    }
    if self.scope:
        query['scope'] = self.scope

    effective_state = self.state if state is None else state
    if effective_state is not None:
        query['state'] = effective_state

    return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urllibparse.urlencode(query))
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

    For example, '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' becomes
    '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. Blank values are kept.
    A deterministic order avoids spurious mismatches in unit tests.

    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    pieces = urlparse.urlparse(url)
    pairs = urlparse.parse_qsl(pieces.query, True)
    # Sort on the key only; the sort is stable, so repeated keys keep
    # their relative order.
    pairs.sort(key=lambda pair: pair[0])
    reordered = pieces._replace(query=urlparse.urlencode(pairs, True))
    return urlparse.urlunparse(reordered)
def find(self, base_url=None, **kwargs):
    """Find a single item with attributes matching ``**kwargs``.

    The filtered keyword arguments are used both to build the URL and as
    its query string.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)
    resource_url = self.build_url(base_url=base_url, **kwargs)
    query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
    matches = self._list('%s%s' % (resource_url, query), self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(msg)
def get_session_key(self):
    """Requests session key

    Issues a POST request to the `get-session-key` endpoint, sending the
    private key (with surrounding newline characters stripped) as
    form-encoded data and the token in the ``authorization`` header.

    :Returns: String containing session key.
    :Raises: RequestError when the response is not OK.
    """
    endpoint = '{}/secrets/get-session-key/?preserve_key=True'.format(self.base)
    request_headers = {
        'accept': 'application/json',
        'authorization': 'Token {}'.format(self.token),
        'Content-Type': 'application/x-www-form-urlencoded',
    }
    form_body = urlencode({'private_key': self.private_key.strip('\n')})

    response = requests.post(endpoint, headers=request_headers, data=form_body)
    if not response.ok:
        raise RequestError(response)
    return json.loads(response.text)['session_key']
def _list_request(self, resource, permanent=False, headers=None, extra_headers=False, **kwargs): """Get the list of objects of the specified type. :param resource: The name of the REST resource, e.g., 'nodes'. :param headers: List of headers to use in request. :param extra_headers: Specify whether to use headers. :param **kwargs: Parameters for the request. :returns: A tuple with the server response and deserialized JSON list of objects """ uri = self._get_uri(resource, permanent=permanent) if kwargs: uri += "?%s" % urllib.urlencode(kwargs) resp, body = self.get(uri, headers=headers, extra_headers=extra_headers) self.expected_success(http_client.OK, resp.status) return resp, self.deserialize(body)
def find(self, base_url=None, **kwargs):
    """Find a single item with attributes matching ``**kwargs``.

    The filtered keyword arguments are used both to build the URL and as
    its query string.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: (with code 404) when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)
    resource_url = self.build_url(base_url=base_url, **kwargs)
    query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
    matches = self._list('%s%s' % (resource_url, query), self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = _("No %(name)s matching %(args)s.") % {
        'name': self.resource_class.__name__,
        'args': kwargs
    }
    raise exceptions.NotFound(404, msg)
def syncStories(sync, priority, **kw):
    """Fetch stories from the remote and submit a sync task for each.

    Keyword arguments whose value is ``None`` are dropped; the remaining
    ones are sent as the query string of the ``v1/stories`` request.

    :param sync: object providing ``get(path)`` and ``submitTask(task)``.
    :param priority: priority passed to every created ``SyncStoryTask``.
    :param kw: optional query filters for the stories request.
    :returns: the submitted tasks, in the order the stories were returned.
    """
    query = urlparse.urlencode(
        {key: value for key, value in kw.items() if value is not None})
    remote = sync.get('v1/stories?%s' % (query,))

    tasks = []
    for remote_story in remote:
        task = SyncStoryTask(remote_story['id'], remote_story, priority=priority)
        sync.submitTask(task)
        tasks.append(task)
    return tasks
def _format_metadata_url(self, api_key, page_number): """Build the query RL for the quandl WIKI metadata. """ query_params = [ ('per_page', '100'), ('sort_by', 'id'), ('page', str(page_number)), ('database_code', 'WIKI'), ] if api_key is not None: query_params = [('api_key', api_key)] + query_params return ( 'https://www.quandl.com/api/v3/datasets.csv?' + urlencode(query_params) )
def _format_wiki_url(self, api_key, symbol, start_date, end_date, data_frequency): """ Build a query URL for a quandl WIKI dataset. """ query_params = [ ('start_date', start_date.strftime('%Y-%m-%d')), ('end_date', end_date.strftime('%Y-%m-%d')), ('order', 'asc'), ] if api_key is not None: query_params = [('api_key', api_key)] + query_params return ( "https://www.quandl.com/api/v3/datasets/WIKI/" "{symbol}.csv?{query}".format( symbol=symbol, query=urlencode(query_params), ) )
def live_request(channel):
    """Build the request description for a channel's live HLS playlist.

    Returns the token unchanged when it contains ``keys.ERROR``; otherwise
    returns a dict with the full ``url`` (query string included) and the
    ``headers`` taken from the ``UsherQuery``.
    """
    token = channel_token(channel)
    if keys.ERROR in token:
        return token

    query = UsherQuery('api/channel/hls/{channel}.m3u8')
    query.add_urlkw(keys.CHANNEL, channel)
    for name, value in (
        (keys.SIG, token[keys.SIG]),
        (keys.TOKEN, token[keys.TOKEN]),
        (keys.ALLOW_SOURCE, Boolean.TRUE),
        (keys.ALLOW_SPECTRE, Boolean.TRUE),
        (keys.ALLOW_AUDIO_ONLY, Boolean.TRUE),
    ):
        query.add_param(name, value)

    full_url = '?'.join((query.url, urlencode(query.params)))
    request_dict = {'url': full_url, 'headers': query.headers}
    log.debug('live_request: |{0}|'.format(str(request_dict)))
    return request_dict
def video_request(video_id):
    """Build the request description for a VOD playlist.

    The id is first normalised by ``valid_video_id``; a falsy result raises
    ``NotImplementedError``. Returns the token unchanged when it contains
    ``keys.ERROR``; otherwise returns a dict with the full ``url`` (query
    string included) and the ``headers`` taken from the ``UsherQuery``.
    """
    video_id = valid_video_id(video_id)
    if not video_id:
        raise NotImplementedError('Unknown Video Type')

    token = vod_token(video_id)
    if keys.ERROR in token:
        return token

    query = UsherQuery('vod/{id}')
    query.add_urlkw(keys.ID, video_id)
    for name, value in (
        (keys.NAUTHSIG, token[keys.SIG]),
        (keys.NAUTH, token[keys.TOKEN]),
        (keys.ALLOW_SOURCE, Boolean.TRUE),
        (keys.ALLOW_AUDIO_ONLY, Boolean.TRUE),
    ):
        query.add_param(name, value)

    full_url = '?'.join((query.url, urlencode(query.params)))
    request_dict = {'url': full_url, 'headers': query.headers}
    log.debug('video_request: |{0}|'.format(str(request_dict)))
    return request_dict
def test_get_transfer_ticket(self):
    """The ticket id returned by the session is rendered as a cookie string."""
    query = urlparse.urlencode({'dcPath': 'datacenter-1',
                                'dsName': 'datastore-1'})
    url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query

    class Ticket(object):
        id = 'fake_id'

    # The session's invoke_api call is stubbed to hand back the fake ticket.
    session = mock.Mock()
    session.invoke_api = mock.Mock(return_value=Ticket())

    ds_url = datastore.DatastoreURL.urlparse(url)
    ticket = ds_url.get_transfer_ticket(session, 'PUT')

    expected = '%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id')
    self.assertEqual(expected, ticket)
def _data_for_exchange(self, code=None, username=None, password=None, scope=None, user_id=None): client_params = { "client_id": self.api.client_id, "client_secret": self.api.client_secret, "redirect_uri": self.api.redirect_uri, "grant_type": "authorization_code" } if code: client_params.update(code=code) elif username and password: client_params.update(username=username, password=password, grant_type="password") if scope: client_params.update(scope=' '.join(scope)) elif user_id: client_params.update(user_id=user_id) return urlencode(client_params)
def get_instance(self, type, id, search_opts=None, session_id=None):
    """Fetch one protectable instance.

    Falsy search options are dropped and the rest are sent as a query string
    sorted by key. When ``session_id`` is truthy it is sent in the
    ``X-Configuration-Session`` header.
    """
    headers = {'X-Configuration-Session': session_id} if session_id else {}

    active_opts = {
        key: val for key, val in (search_opts or {}).items() if val
    }
    query_string = ""
    if active_opts:
        ordered = [(key, active_opts[key]) for key in sorted(active_opts)]
        query_string = "?%s" % parse.urlencode(ordered)

    url = ("/protectables/{protectable_type}/instances/"
           "{protectable_id}{query_string}").format(
        protectable_type=type,
        protectable_id=id,
        query_string=query_string)
    return self._get(url, response_key="instance", headers=headers)
def get_histories(self, usage, search_opts=None):
    """Fetch the histories of a usage.

    Falsy search options are dropped; the remaining ones are sent as a query
    string on ``/usages/<usage>/histories``.
    """
    if search_opts is None:
        search_opts = {}
    qparams = {opt: val for opt, val in six.iteritems(search_opts) if val}

    query_string = ""
    if qparams:
        # Encode the options as key-sorted two-element tuples so the
        # resulting string is consistent in Python 2&3.
        ordered = sorted(qparams.items(), key=lambda item: item[0])
        query_string = "?%s" % parse.urlencode(ordered)

    return self._get_histories(
        "/usages/%s/histories%s" % (usage, query_string), "histories")
def list(self, search_opts=None):
    """Get a list of usages.

    Falsy search options are dropped; the remaining ones are sent as a query
    string on ``/usages``.
    """
    if search_opts is None:
        search_opts = {}
    qparams = {opt: val for opt, val in six.iteritems(search_opts) if val}

    query_string = ""
    if qparams:
        # Encode the options as key-sorted two-element tuples so the
        # resulting string is consistent in Python 2&3.
        ordered = sorted(qparams.items(), key=lambda item: item[0])
        query_string = "?%s" % parse.urlencode(ordered)

    return self._list("/usages%s" % (query_string), "usages")
def list(self, **kwargs):
    """Get a list of software configs.

    Falsy keyword arguments are dropped; the remaining ones are sent as a
    query string on ``/software_configs``.

    :rtype: list of :class:`SoftwareConfig`
    """
    qparams = {opt: val for opt, val in six.iteritems(kwargs) if val}

    query_string = ""
    if qparams:
        # Encode the options as key-sorted two-element tuples so the
        # resulting string is consistent in Python 2&3.
        ordered = sorted(qparams.items(), key=lambda item: item[0])
        query_string = "?%s" % parse.urlencode(ordered)

    return self._list('/software_configs%s' % query_string, "software_configs")
def get(self, stack_id, resource_name, with_attr=None):
    """Get the details for a specific resource.

    Both path segments are URL-quoted with no safe characters, so slashes
    in them are escaped too.

    :param stack_id: ID of stack containing the resource
    :param resource_name: ID of resource to get the details for
    :param with_attr: Attributes to show; sent as the ``with_attr`` query
                      parameter when truthy
    """
    stack_id = self._resolve_stack_id(stack_id)
    quoted_stack = parse.quote(stack_id, '')
    quoted_name = parse.quote(encodeutils.safe_encode(resource_name), '')
    url_str = '/stacks/%s/resources/%s' % (quoted_stack, quoted_name)
    if with_attr:
        query = parse.urlencode({'with_attr': with_attr}, True)
        url_str = url_str + '?%s' % query

    response = self.client.get(url_str)
    body = utils.get_response_body(response)
    return Resource(self, body.get('resource'))
def list(self, detailed=True, is_public=True):
    """Get a list of all flavors.

    :param detailed: when truthy, query the ``/detail`` variant of the URL
    :param is_public: sent as the ``is_public`` query parameter only when
                      it is falsy
    :rtype: list of :class:`Flavor`.
    """
    # is_public is ternary - None means give all flavors.
    # By default Nova assumes True and gives admins public flavors
    # and flavors from their own projects only.
    qparams = {} if is_public else {'is_public': is_public}
    query_string = "?%s" % parse.urlencode(qparams) if qparams else ""
    detail = "/detail" if detailed else ""
    return self._list("/flavors%s%s" % (detail, query_string), "flavors")
def do_request(self, method, action, body=None, headers=None, params=None):
    """Send a request and return the deserialized reply.

    :param method: HTTP method handed to the underlying HTTP client.
    :param action: path appended to ``self.action_prefix``.
    :param body: optional payload; serialized before sending when truthy.
    :param headers: accepted for interface compatibility; not used here.
    :param params: optional non-empty dict of query parameters; sequence
                   values produce repeated keys.
    :returns: the deserialized reply for OK / created / accepted /
              no-content responses. Any other status is passed to
              ``_handle_fault_response``.
    """
    action = self.action_prefix + action
    # isinstance rather than an exact type check, so dict subclasses such as
    # OrderedDict are encoded too instead of being silently dropped.
    if isinstance(params, dict) and params:
        params = utils.safe_encode_dict(params)
        action += '?' + urlparse.urlencode(params, doseq=1)

    if body:
        body = self.serialize(body)

    resp, replybody = self.httpclient.do_request(
        action, method, body=body, content_type=self.content_type())

    status_code = resp.status_code
    success_codes = (
        requests.codes.ok,
        requests.codes.created,
        requests.codes.accepted,
        requests.codes.no_content,
    )
    if status_code in success_codes:
        return self.deserialize(replybody, status_code)

    # Fall back to the HTTP reason phrase when the reply body is empty.
    if not replybody:
        replybody = resp.reason
    self._handle_fault_response(status_code, replybody)
def find(self, base_url=None, **kwargs):
    """Find a single item with attributes matching ``**kwargs``.

    The filtered keyword arguments are used both to build the URL and as
    its query string.

    :param base_url: if provided, the generated URL will be appended to it
    :raises exceptions.NotFound: (with code 404) when nothing matches
    :raises exceptions.NoUniqueMatch: when more than one item matches
    """
    kwargs = self._filter_kwargs(kwargs)
    resource_url = self.build_url(base_url=base_url, **kwargs)
    query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
    matches = self._list('%s%s' % (resource_url, query), self.collection_key)

    count = len(matches)
    if count == 1:
        return matches[0]
    if count > 1:
        raise exceptions.NoUniqueMatch

    msg = "No %s matching %s." % (self.resource_class.__name__, kwargs)
    raise exceptions.NotFound(404, msg)