我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用six.moves.urllib.parse.quote()。
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Build an ``otpauth://`` URI describing a one-time-password object.

    :param hotp: object exposing ``_length``, ``_key`` and ``_algorithm``
    :param type_name: value placed in the type position of the URI
    :param account_name: account name used in the label
    :param issuer: optional issuer; when truthy it prefixes the label, and
        when not ``None`` it is also sent as the ``issuer`` query parameter
    :param extra_parameters: iterable of ``(name, value)`` pairs appended
        after the standard query parameters
    :return: the formatted URI string
    """
    query = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query.append(("issuer", issuer))
    query.extend(extra_parameters)

    # The label is "issuer:account" only when the issuer is non-empty.
    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name,
        label=label,
        parameters=urlencode(query),
    )
def encode(data, mime_type='', charset='utf-8', base64=True):
    """
    Encode data to DataURL

    Text input is encoded to bytes with ``charset`` first; for any other
    input the charset is dropped from the output. The payload is either
    base64-encoded or percent-quoted depending on ``base64``.
    """
    if isinstance(data, six.text_type):
        data = data.encode(charset)
    else:
        # Non-text input: no charset marker is emitted.
        charset = None

    payload = utils.text(b64encode(data) if base64 else quote(data))

    pieces = ['data:']
    if mime_type:
        pieces.append(mime_type)
    if charset:
        pieces.extend((';charset=', charset))
    if base64:
        pieces.append(';base64')
    pieces.extend((',', payload))
    return ''.join(pieces)
def test_encoding_rdf():
    """Convert an ISO-8859-1 CSV to RDF and check the expected unit IRI."""
    # With encoding specified
    csvw = CSVW(
        csv_path="./tests/iso_encoding.csv",
        metadata_path="./tests/iso_encoding.csv-metadata.json",
        csv_encoding="ISO-8859-1",
    )
    turtle_text = csvw.to_rdf()

    graph = ConjunctiveGraph()
    graph.parse(data=turtle_text, format="turtle")

    units = Namespace('http://example.org/units/')
    cars = Namespace('http://example.org/cars/')
    meta = Namespace("http://example.org/properties/")

    # The unit name is percent-quoted from its UTF-8 bytes.
    expected_unit = units[quote(u"\xb5100".encode('utf-8'))]
    expected_triple = (cars['1'], meta['UnitOfMeasurement'], expected_unit)

    assert expected_triple in graph
    assert expected_unit in list(graph.objects())
def encode_string(value, double=False):
    """
    Percent-encode a string, treating no character as safe.

    With ``double=True`` the encoding is applied twice, so the ``%`` signs
    produced by the first pass are themselves escaped; some REST API
    endpoints require this.

    :param value: Value to encode
    :param double: Double encode string
    :return: the encoded string
    """
    encoded_str = value
    for _ in range(2 if double else 1):
        # An empty ``safe`` argument escapes '/' as well.
        encoded_str = quote(encoded_str, '')
    return encoded_str
def get(self, stack_id, resource_name, with_attr=None):
    """Get the details for a specific resource.

    :param stack_id: ID of stack containing the resource
    :param resource_name: ID of resource to get the details for
    :param with_attr: Attributes to show
    :return: a ``Resource`` built from the ``resource`` entry of the
        response body
    """
    resolved_id = self._resolve_stack_id(stack_id)
    # Both path segments are quoted with no safe characters.
    quoted_stack = parse.quote(resolved_id, '')
    quoted_resource = parse.quote(encodeutils.safe_encode(resource_name), '')
    url_str = '/stacks/%s/resources/%s' % (quoted_stack, quoted_resource)
    if with_attr:
        url_str += '?%s' % parse.urlencode({'with_attr': with_attr}, True)

    response = self.client.get(url_str)
    payload = utils.get_response_body(response)
    return Resource(self, payload.get('resource'))
def mark_unhealthy(self, stack_id, resource_name, mark_unhealthy,
                   resource_status_reason):
    """Mark a resource as healthy or unhealthy.

    :param stack_id: ID of stack containing the resource
    :param resource_name: ID of resource
    :param mark_unhealthy: Mark resource unhealthy if set to True
    :param resource_status_reason: Reason for resource status change.
    :return: the response body of the PATCH request
    """
    resolved_id = self._resolve_stack_id(stack_id)
    quoted_stack = parse.quote(resolved_id, '')
    quoted_resource = parse.quote(encodeutils.safe_encode(resource_name), '')
    url_str = '/stacks/%s/resources/%s' % (quoted_stack, quoted_resource)

    patch_body = {
        "mark_unhealthy": mark_unhealthy,
        "resource_status_reason": resource_status_reason,
    }
    response = self.client.patch(url_str, data=patch_body)
    return utils.get_response_body(response)
def _send_update(self, url, attribute_map, attribute_calculations, record):
    """Send one record as a GET request and check the response.

    :param url: base URL; query fragments are appended to it with ``&``
    :param attribute_map: iterable of ``(parameter, field)`` pairs; a pair
        is sent when ``field`` is truthy and ``record[field]`` is not None
    :param attribute_calculations: iterable of ``(parameter, function)``
        pairs; ``function(self, record)`` is sent when it is not None
    :param record: record providing ``date`` and the mapped fields
    :raises AssertionError: if the status is not 2xx or the response text
        is not ``success``
    """
    # Interpret the record date in the station time zone, then express it
    # as a naive UTC datetime.
    utc_date = (
        record.date.replace(tzinfo=self.station_time_zone)
        .astimezone(pytz.UTC)
        .replace(tzinfo=None)
    )

    fragments = [
        '%s&ID=%s&PASSWORD=%s&dateutc=%s' % (
            url,
            self.station_id,
            url_quote(self.password, safe=''),
            url_quote(utc_date.strftime('%Y-%m-%d %H:%M:%S'), safe=''),
        )
    ]

    for parameter, field in attribute_map or []:
        if field and record[field] is not None:
            fragments.append(
                '&%s=%s' % (parameter, url_quote(record[field], safe='')))

    for parameter, function in attribute_calculations or []:
        if not function:
            continue
        value = function(self, record)
        if value is not None:
            fragments.append(
                '&%s=%s' % (parameter, url_quote(value, safe='')))

    response = self._session.get(''.join(fragments))

    assert 200 <= response.status_code < 300, 'Status code %s unexpected' % response.status_code
    assert response.text == 'success', 'Response "%s" unexpected' % response.text
def code(item):
    """
    Serialize an instance as comma separated ``index=value`` pairs.

    Each attribute named in ``ATTR`` is represented by its position in
    ``ATTR`` rather than by name, so the result can only be interpreted
    with knowledge of that order. Attributes with falsy values are
    skipped but still consume their index.

    :param item: The class instance
    :return: A quoted string
    """
    pairs = []
    for index, attr in enumerate(ATTR):
        value = getattr(item, attr)
        if value:
            pairs.append("%d=%s" % (index, quote(value)))
    return ",".join(pairs)
def check_connection(default='http://google.com', timeout=1):
    """Test the internet connection.

    Parameters
    ----------
    default : str
        URL to test; defaults to ``http://google.com``.
    timeout : number
        Time in seconds to wait before giving up.

    Returns
    -------
    success : bool
        True if the URL could be opened, False if opening it raised a
        ``URLError``.
    """
    try:
        # Keep ':', '.' and '/' unescaped so the URL stays usable.
        urlrequest.urlopen(urlparse.quote(default, safe=':./'),
                           timeout=timeout)
    except urlerror.URLError as derp:
        logger.debug("Network unreachable: {}".format(derp))
        return False
    return True
def get_local_path(self, img, pfmri):
    """Return the on-disk path of the license text for this action.

    The file lives in ``img.get_license_dir(pfmri)`` and is named
    ``license.<name>``, where ``<name>`` comes from ``self.attrs``."""

    license_name = self.attrs["license"]
    if img.version > 3:
        # Newer images ensure licenses are stored with encoded
        # name so that '/', spaces, etc. are properly handled.
        license_name = quote(license_name, "")
    # Older images (version <= 3) stored licenses without accounting
    # for '/', spaces, etc. properly, so the raw name is used there.
    return os.path.join(img.get_license_dir(pfmri),
                        "license." + license_name)
def abandon_0(self, *tokens):
    """Aborts an in-flight transaction for the Transaction ID
    specified in the request path.  Returns no output."""

    # cherrypy decoded it, but we actually need it encoded.
    # With no path tokens there is no transaction ID to pass on.
    trans_id = quote(tokens[0], "") if tokens else None

    try:
        self.repo.abandon(trans_id)
    except srepo.RepositoryError as e:
        # Assume a bad request was made.  A 404 can't be
        # returned here as misc.versioned_urlopen will interpret
        # that to mean that the server doesn't support this
        # operation.
        raise cherrypy.HTTPError(http_client.BAD_REQUEST, str(e))
def manifest(self, *tokens):
    """Manifest requests coming from the BUI need to be redirected
    back through the RewriteRules defined in the Apache
    configuration in order to be served directly.
    pkg(1) will never hit this code, as those requests don't get
    handled by this webapp.
    """

    self.setup(cherrypy.request)

    # we need to recover the escaped portions of the URI
    segments = cherrypy.request.path_info.lstrip("/").split("/")
    pub_mf = "/".join(segments[:4])
    pkg_name = "/".join(segments[4:])

    # encode the URI so our RewriteRules can process them:
    # slashes become %2F and the first encoded '@' is restored.
    pkg_name = quote(pkg_name).replace("/", "%2F").replace("%40", "@", 1)

    # build a URI that we can redirect to
    target = "{0}/{1}".format(pub_mf, pkg_name)
    target = "/{0}".format(target.lstrip("/"))
    raise cherrypy.HTTPRedirect(target)
def get_project(self, repo):
    """Issue ``GET /projects/<repo>`` and return the request result.

    ``repo`` is percent-quoted with no safe characters, so a '/' in it
    is escaped as well.
    """
    path = '/projects/{}'.format(quote(repo, safe=''))
    return self.request('GET', path)
def get_issue(self, repo, issue_id):
    """Return the first issue of ``repo`` matching ``issue_id``.

    Sends ``GET /projects/<repo>/issues`` with ``iid`` as a query
    parameter and returns element 0 of the result.

    :param repo: project identifier; quoted with no safe characters
    :param issue_id: value sent as the ``iid`` parameter
    :raises ApiError: (status 404) when the result has no first element
    """
    path = '/projects/{}/issues'.format(quote(repo, safe=''))
    matches = self.request(
        'GET',
        path,
        params={
            # XXX(dcramer): this is an undocumented API
            'iid': issue_id,
        },
    )
    # Only the lookup is guarded: an IndexError raised inside
    # ``self.request`` must not be reported as a missing issue.
    try:
        return matches[0]
    except IndexError:
        raise ApiError('Issue not found with ID', 404)
def create_issue(self, repo, data):
    """Issue ``POST /projects/<repo>/issues`` with ``data`` as the payload.

    ``repo`` is percent-quoted with no safe characters. Returns the
    request result.
    """
    path = '/projects/{}/issues'.format(quote(repo, safe=''))
    return self.request('POST', path, data=data)
def create_note(self, repo, global_issue_id, data):
    """Issue ``POST /projects/<repo>/issues/<global_issue_id>/notes``.

    ``repo`` is percent-quoted with no safe characters; the issue id is
    formatted into the path as given. ``data`` is sent as the payload.
    Returns the request result.
    """
    encoded_repo = quote(repo, safe='')
    path = '/projects/{}/issues/{}/notes'.format(encoded_repo, global_issue_id)
    return self.request('POST', path, data=data)
def list_project_members(self, repo):
    """Issue ``GET /projects/<repo>/members`` and return the request result.

    ``repo`` is percent-quoted with no safe characters.
    """
    path = '/projects/{}/members'.format(quote(repo, safe=''))
    return self.request('GET', path)
def query_single(self, object_type, url_params, query_params=None):
    # type: (str, list, dict) -> dict
    """
    Query for a single object.
    :param object_type: string query type (e.g., "users" or "groups")
    :param url_params: required list of strings to provide as additional URL components
    :param query_params: optional dictionary of query options
    :return: the found object (a dictionary), which is empty if the call failed with a 404
    """
    # Server API convention (v2) is that the pluralized object type goes into the endpoint
    # but the object type is the key in the response dictionary for the returned object.
    self.local_status["single-query-count"] += 1
    query_type = object_type + "s"  # poor man's plural

    # '/' and '@' are left unescaped in the extra path components.
    path_parts = ["/organizations/{}/{}".format(self.org_id, query_type)]
    path_parts.extend(urlparse.quote(component, safe='/@') for component in url_params or [])
    query_path = "/".join(path_parts)
    if query_params:
        query_path += "?" + urlparse.urlencode(query_params)

    try:
        result = self.make_call(query_path)
        body = result.json()
    except RequestError as err:
        if err.result.status_code != 404:
            raise err
        # A 404 means "nothing found", not a failure.
        if self.logger:
            self.logger.debug("Ran %s query: %s %s (0 found)",
                              object_type, url_params, query_params)
        return {}

    if body.get("result") != "success":
        raise ClientError("OK status but no 'success' result", result)

    value = body.get(object_type, {})
    if self.logger:
        self.logger.debug("Ran %s query: %s %s (1 found)", object_type, url_params, query_params)
    return value
def query_multiple(self, object_type, page=0, url_params=None, query_params=None):
    # type: (str, int, list, dict) -> tuple
    """
    Query for a page of objects.  Defaults to the (0-based) first page.
    Sadly, the sort order is undetermined.
    :param object_type: string query type (e.g., "users" or "groups")
    :param page: numeric page (0-based) of results to get (up to 200 in a page)
    :param url_params: optional list of strings to provide as additional URL components
    :param query_params: optional dictionary of query options
    :return: tuple (list of returned dictionaries (one for each query result), bool for whether this is last page)
    """
    # Server API convention (v2) is that the pluralized object type goes into the endpoint
    # and is also the key in the response dictionary for the returned objects.
    self.local_status["multiple-query-count"] += 1
    query_type = object_type + "s"  # poor man's plural

    path_parts = ["/{}/{}/{:d}".format(query_type, self.org_id, page)]
    path_parts.extend(urlparse.quote(component) for component in url_params or [])
    query_path = "/".join(path_parts)
    if query_params:
        query_path += "?" + urlparse.urlencode(query_params)

    try:
        result = self.make_call(query_path)
        body = result.json()
    except RequestError as err:
        if err.result.status_code != 404:
            raise err
        # A 404 is reported as an empty, final page.
        if self.logger:
            self.logger.debug("Ran %s query: %s %s (0 found)",
                              object_type, url_params, query_params)
        return [], True

    if body.get("result") != "success":
        raise ClientError("OK status but no 'success' result", result)

    values = body.get(query_type, [])
    last_page = body.get("lastPage", False)
    if self.logger:
        self.logger.debug("Ran multi-%s query: %s %s (page %d: %d found)",
                          object_type, url_params, query_params, page, len(values))
    return values, last_page
def __extract_metadata(self, doc, payload):
    """PUT ``payload`` to the ``/meta`` endpoint and return the parsed JSON.

    :param doc: document providing ``path`` and a ``meta`` mapping with a
        ``Content-Type`` entry
    :param payload: seekable, readable object; read from the start and
        rewound to the start again after the request is sent
    :return: the decoded JSON response, or ``{}`` when the response
        status is >= 400 or the body is not valid UTF-8 JSON
    """
    headers = {
        'Accept': 'application/json',
        'Content-Disposition': 'attachment; filename=%s' % url_quote(os.path.basename(doc.path)),
    }
    content_type = doc.meta['Content-Type']
    if content_type:
        headers['Content-Type'] = content_type

    tika_url = self.config.get(helper.TIKA_META)
    connection = self.config[helper.INJECTOR].get_http_connection(tika_url)

    payload.seek(0)
    connection.request('PUT', '/meta', payload.read(), headers)
    payload.seek(0)

    response = connection.getresponse()
    try:
        if response.status >= 400:
            logging.error('tika error %d (%s): %s', response.status, response.reason, doc.path)
            return {}
        raw_body = response.read()
    finally:
        # The response is closed on both the error and the success path.
        response.close()

    try:
        return json.loads(raw_body.decode('utf-8'))
    except (ValueError, UnicodeDecodeError):
        logging.error('invalid response from tika for %s', doc.path)
        return {}
def _id_to_header(self, id_):
    """Convert an id to a Content-ID header value.

    Args:
      id_: string, identifier of individual request.

    Returns:
      A Content-ID header with the id_ encoded into it. A UUID is prepended
      to the value because Content-ID headers are supposed to be universally
      unique. The UUID is created on first use and stored on the instance.
    """
    base_id = self._base_id
    if base_id is None:
        base_id = self._base_id = uuid.uuid4()
    return '<%s+%s>' % (base_id, quote(id_))
def encode_params(obj, keys=()):
    """Flatten a nested structure into a bracketed query string.

    Dictionaries contribute their key to the key path, lists and tuples
    contribute an empty key (rendered as ``[]``), and every leaf value
    becomes one ``key=value`` pair. Keys and values are percent-quoted.
    Leaf values and keys that are not already strings are converted with
    ``str()`` before quoting, and empty nested containers contribute
    nothing to the output.

    :param obj: dict, list, tuple or leaf value to encode
    :param keys: tuple of keys leading to ``obj``; used by the recursion
    :return: the encoded string, pairs joined with ``&``
    """
    # bytes plus the native text type; works on Python 2 and 3.
    string_types = (bytes, type(u""))

    def _as_string(item):
        return item if isinstance(item, string_types) else str(item)

    if isinstance(obj, dict):
        # key[bar]=1&key[baz]=2
        fragments = (encode_params(val, keys + (key,))
                     for key, val in obj.items())
        # Empty containers yield "", which would otherwise leave "&&".
        return "&".join(fragment for fragment in fragments if fragment)

    if isinstance(obj, (list, tuple)):
        # key[foo][]=1&key[foo][]=2
        fragments = (encode_params(val, keys + ("",)) for val in obj)
        return "&".join(fragment for fragment in fragments if fragment)

    # `obj` is just a value, key=1
    # All keys but top-level keys should be in brackets, i.e.
    # `key[foo]=1`, not `[key][foo]=1`
    encoded_keys = "".join(
        _as_string(key) if depth == 0 else "[" + _as_string(key) + "]"
        for depth, key in enumerate(keys)
    )
    return quote(encoded_keys) + "=" + quote(_as_string(obj))
def swift_get_container(request, container_name, with_data=True):
    """Return a ``Container`` describing ``container_name``.

    With ``with_data`` true, headers and data come from ``get_object``
    called with an empty object name; otherwise only the headers are
    fetched with ``head_container`` and ``data`` is ``None``.

    The public flag, public URL and timestamp are derived from the
    headers on a best-effort basis: any exception while computing them
    is ignored and the values computed so far are kept.
    """
    api = swift_api(request)
    if with_data:
        headers, data = api.get_object(container_name, "")
    else:
        data = None
        headers = api.head_container(container_name)

    timestamp = None
    is_public = False
    public_url = None
    try:
        is_public = GLOBAL_READ_ACL in headers.get('x-container-read', '')
        if is_public:
            swift_endpoint = base.url_for(request,
                                          'object-store',
                                          endpoint_type='publicURL')
            quoted_name = urlparse.quote(container_name.encode('utf8'))
            public_url = swift_endpoint + '/' + quoted_name
        ts_float = float(headers.get('x-timestamp'))
        timestamp = datetime.utcfromtimestamp(ts_float).isoformat()
    except Exception:
        # Deliberate best effort: leave the remaining fields at defaults.
        pass

    return Container({
        'name': container_name,
        'container_object_count': headers.get('x-container-object-count'),
        'container_bytes_used': headers.get('x-container-bytes-used'),
        'timestamp': timestamp,
        'data': data,
        'is_public': is_public,
        'public_url': public_url,
    })
def get_pagination_options(limit=None, marker=None, sorts=None):
    """Build the pagination part of a query string.

    :param limit: emitted as ``limit=<int>`` when truthy
    :param marker: emitted percent-quoted as ``marker=...`` when truthy
    :param sorts: iterable of sort values, each emitted percent-quoted
        as its own ``sort=...`` pair
    :return: the pairs joined with ``&`` (empty string when none apply)
    """
    options = []
    if limit:
        options.append("limit=%d" % limit)
    if marker:
        options.append("marker=%s" % urllib_parse.quote(marker))
    options.extend("sort=%s" % urllib_parse.quote(sort) for sort in sorts or [])
    return "&".join(options)
def oauth2_authorize(request):
    """ View to start the OAuth2 Authorization flow.

    This view starts the OAuth2 authorization flow. If scopes is passed in
    as a GET URL parameter, it will authorize those scopes, otherwise the
    default scopes specified in settings. The return_url can also be
    specified as a GET parameter, otherwise the referer header will be
    checked, and if that isn't found it will return to the root path.

    Args:
       request: The Django request object.

    Returns:
         A redirect to Google OAuth2 Authorization, to the login page when
         model storage is configured and the user is not authenticated, or
         to the return URL when credentials already exist.
    """
    return_url = (request.GET.get('return_url', None) or
                  request.META.get('HTTP_REFERER', '/'))
    scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)

    # Model storage (but not session storage) requires a logged in user
    if django_util.oauth2_settings.storage_model:
        if not request.user.is_authenticated():
            login_target = '{0}?next={1}'.format(
                settings.LOGIN_URL, parse.quote(request.get_full_path()))
            return redirect(login_target)
        # This checks for the case where we ended up here because of a logged
        # out user but we had credentials for it in the first place
        user_oauth = django_util.UserOAuth2(request, scopes, return_url)
        if user_oauth.has_credentials():
            return redirect(return_url)

    flow = _make_flow(request=request, scopes=scopes, return_url=return_url)
    return shortcuts.redirect(flow.step1_get_authorize_url())
def quote_key(key, reverse=False):
    """Prepare key for storage data in MongoDB.

    The key is split on '.', and every part that starts with '$' is
    percent-quoted; other parts are yielded unchanged.

    :param key: key that should be quoted
    :param reverse: boolean, True --- if we need a reverse order of the keys
                    parts
    :return: iter of quoted part of the key
    """
    step = -1 if reverse else 1
    for part in key.split('.')[::step]:
        if part.startswith('$'):
            part = parse.quote(part)
        yield part


def improve_keys(data, metaquery=False):
    """Improves keys in dict if they contained '.' or started with '$'.

    The dictionary is modified in place and also returned. Non-dict input
    is returned unchanged.

    :param data: is a dictionary where keys need to be checked and improved
    :param metaquery: boolean, if True dots are not escaped from the keys;
                      only keys containing '.$' are rewritten
    :return: improved dictionary if keys contained dots or started with '$':
            {'a.b': 'v'} -> {'a': {'b': 'v'}}
            {'$ab': 'v'} -> {'%24ab': 'v'}
    """
    if not isinstance(data, dict):
        return data

    if metaquery:
        # Iterate over a snapshot of the keys: entries are popped and
        # re-inserted below, which is not allowed while iterating the
        # live dictionary on Python 3.
        for key in list(data):
            if '.$' in key:
                new_key = '.'.join(quote_key(key))
                data[new_key] = data.pop(key)
    else:
        # Same reason for the snapshot of the items here.
        for key, value in list(data.items()):
            if isinstance(value, dict):
                improve_keys(value)
            if '.' in key:
                # Build the nested dict from the innermost part outwards;
                # the original value is moved into the innermost level.
                new_dict = {}
                for part in quote_key(key, reverse=True):
                    new_dict = {part: new_dict if new_dict else data.pop(key)}
                data.update(new_dict)
            elif key.startswith('$'):
                new_key = parse.quote(key)
                data[new_key] = data.pop(key)
    return data
def get(self, user_id, return_request_id=None):
    """Get the details for a specific user.

    :param user_id: ID of the user
    :param return_request_id: If an empty list is provided, populate this
                          list with the request ID value from the header
                          x-openstack-request-id
    :return: a resource built from the ``user`` entry of the response body
    """
    url = '/users/%s' % parse.quote(str(user_id))
    resp, body = self.client.get(url)
    if return_request_id is not None:
        request_id = resp.headers.get(OS_REQ_ID_HDR, None)
        return_request_id.append(request_id)
    return self.resource_class(self, body.get('user'), loaded=True)
def action(self, user_id, **kwargs):
    """Perform specified action on user.

    The ``return_req_id`` keyword, when given, is removed from the posted
    data and receives the request ID taken from the response headers.

    :param user_id: ID of the user
    :return: a resource built from the ``user`` entry of the response body
    """
    url = '/users/%s/action' % parse.quote(str(user_id))
    # Must be popped before posting so it is not sent as action data.
    request_ids = kwargs.pop('return_req_id', None)
    resp, body = self.client.post(url, data=kwargs)
    if request_ids is not None:
        request_ids.append(resp.headers.get(OS_REQ_ID_HDR, None))
    return self.resource_class(self, body.get('user'), loaded=True)
def get(self, policy_id):
    """Get a specific policy.

    :param policy_id: ID of the policy; stringified and percent-quoted
    :return: a resource built from the ``policy`` entry of the response body
    """
    quoted_id = parse.quote(str(policy_id))
    _, body = self.client.get('/policies/%s' % quoted_id)
    return self.resource_class(self, body.get('policy'), loaded=True)
def action(self, policy_id, **kwargs):
    """Perform specified action on a policy.

    :param policy_id: ID of the policy; stringified and percent-quoted
    :param kwargs: posted as the request data
    :return: a resource built from the ``policy`` entry of the response body
    """
    quoted_id = parse.quote(str(policy_id))
    _, body = self.client.post('/policies/%s/action' % quoted_id, data=kwargs)
    return self.resource_class(self, body.get('policy'), loaded=True)
def delete(self, policy_id):
    """Delete a specific policy.

    :param policy_id: ID of the policy; stringified and percent-quoted
    :return: whatever ``self._delete`` returns for the policy URL
    """
    url = '/policies/%s' % parse.quote(str(policy_id))
    return self._delete(url)
def delete(self, rule_id):
    """Delete a specific rule.

    :param rule_id: Id of the rule to delete
    :return: whatever ``self._delete`` returns for the rule URL
    """
    url = '/rules/%s' % parse.quote(str(rule_id))
    return self._delete(url)