我们从Python开源项目中,提取了以下41个代码示例,用于说明如何使用six.moves.urllib.parse.unquote()。
def _header_to_id(self, header): """Convert a Content-ID header value to an id. Presumes the Content-ID header conforms to the format that _id_to_header() returns. Args: header: string, Content-ID header value. Returns: The extracted id value. Raises: BatchError if the header is not in the expected format. """ if header[0] != '<' or header[-1] != '>': raise BatchError("Invalid value for Content-ID: %s" % header) if '+' not in header: raise BatchError("Invalid value for Content-ID: %s" % header) base, id_ = header[1:-1].rsplit('+', 1) return unquote(id_)
def match(self, request): """Matches this route against the current request. :raises: ``exc.HTTPMethodNotAllowed`` if the route defines :attr:`methods` and the request method isn't allowed. .. seealso:: :meth:`BaseRoute.match`. """ match = self.regex.match(unquote(request.path)) if not match or self.schemes and request.scheme not in self.schemes: return None if self.methods and request.method not in self.methods: # This will be caught by the router, so routes with different # methods can be tried. raise exc.HTTPMethodNotAllowed() args, kwargs = _get_route_variables(match, self.defaults.copy()) return self, args, kwargs
def decode(data_url): """ Decode DataURL data """ metadata, data = data_url.rsplit(',', 1) _, metadata = metadata.split('data:', 1) parts = metadata.split(';') if parts[-1] == 'base64': data = b64decode(data) else: data = unquote(data) for part in parts: if part.startswith("charset="): data = data.decode(part[8:]) return data
def verify_merge_update(self, updates, result): g = self.gitlab for (key, value) in six.iteritems(updates): if key == 'private_token': continue if key == 'state_event': key = 'state' value = self.STATE_EVENT2MERGE_STATE[updates['state_event']] result_value = result.get(key) or '' if value != result_value: url = (g['host'] + "/" + parse.unquote(g['repo']) + "/" + "merge_requests/" + str(result['iid'])) raise ValueError("{url}: {key} value expected to be {value}" " but is {result}".format( url=url, key=key, value=value, result=result_value)) # Local Variables: # compile-command: "cd .. ; virtualenv/bin/tox -e flake8" # End:
def post_account(self, headers, query_string=None, data=None, response_dict=None): if query_string == 'bulk-delete': resp = {'Response Status': '200 OK', 'Response Body': '', 'Number Deleted': 0, 'Number Not Found': 0} if response_dict is not None: response_dict['status'] = 200 if data: for path in data.splitlines(): try: __, container, obj = (unquote(path.decode('utf8')) .split('/', 2)) del self.kvs[container][obj] resp['Number Deleted'] += 1 except KeyError: resp['Number Not Found'] += 1 return {}, json.dumps(resp).encode('utf-8') if response_dict is not None: response_dict['status'] = 204 return {}, None
def reduce_url(url): """ Get path to object on bucket from presigned url :param url: presigned url :type url: str :return: path to object on bucket :rtype str """ if (url is None) or (parse.urlparse(url).path == url): return url upr = parse.urlparse(url) if not upr.scheme: path = url else: path = parse.unquote(upr.path) return path
def choose_name(fp, mfst): """Find the package name for this manifest. If it's defined in a set action in the manifest, use that. Otherwise use the basename of the path to the manifest as the name. If a proper package fmri is found, then also return a PkgFmri object so that we can track which packages are being processed. 'fp' is the path to the file for the manifest. 'mfst' is the Manifest object.""" if mfst is None: return unquote(os.path.basename(fp)), None name = mfst.get("pkg.fmri", mfst.get("fmri", None)) if name is not None: try: pfmri = fmri.PkgFmri(name) except fmri.IllegalFmri: pfmri = None return name, pfmri return unquote(os.path.basename(fp)), None
def _parse_root_device_hints(node): """Convert string with hints to dict. """ root_device = node.properties.get('root_device') if not root_device: return {} try: parsed_hints = irlib_utils.parse_root_device_hints(root_device) except ValueError as e: raise exception.InvalidParameterValue( _('Failed to validate the root device hints for node %(node)s. ' 'Error: %(error)s') % {'node': node.uuid, 'error': e}) root_device_hints = {} advanced = {} for hint, value in parsed_hints.items(): if isinstance(value, six.string_types): if value.startswith('== '): root_device_hints[hint] = int(value[3:]) elif value.startswith('s== '): root_device_hints[hint] = urlparse.unquote(value[4:]) else: advanced[hint] = value else: root_device_hints[hint] = value if advanced: raise exception.InvalidParameterValue( _('Ansible-deploy does not support advanced root device hints ' 'based on oslo.utils operators. ' 'Present advanced hints for node %(node)s are %(hints)s.') % { 'node': node.uuid, 'hints': advanced}) return root_device_hints
def build_path(operation, ns): """ Build a path URI for an operation. """ try: return ns.url_for(operation, _external=False) except BuildError as error: # we are missing some URI path parameters uri_templates = { argument: "{{{}}}".format(argument) for argument in error.suggested.arguments } # flask will sometimes try to quote '{' and '}' characters return unquote(ns.url_for(operation, _external=False, **uri_templates))
def unquote_keys(data): """Restores initial view of 'quoted' keys in dictionary data :param data: is a dictionary :return: data with restored keys if they were 'quoted'. """ if isinstance(data, dict): for key, value in data.items(): if isinstance(value, dict): unquote_keys(value) if key.startswith('%24'): k = parse.unquote(key) data[k] = data.pop(key) return data
def match(self, request): """Matches this route against the current request. .. seealso:: :meth:`BaseRoute.match`. """ match = self.regex.match(unquote(request.path)) if match: return self, match.groups(), {}
def unquote(value, *args, **kwargs): """Decodes a value using urllib.unquote or urllib.parse.unquote(PY3) and deserializes it from JSON. Parameters and return value are the same from :func:`decode`. """ return decode(parse.unquote(value), *args, **kwargs)
def match(self, request): if not self.regex.match(parse.unquote(request.path)): return None return _match_routes(self.get_match_children, request)
def reconnect(self): """Reconnect to rabbitmq server""" parsed = urlparse.urlparse(self.amqp_url) port = parsed.port or 5672 self.connection = amqp.Connection(host="%s:%s" % (parsed.hostname, port), userid=parsed.username or 'guest', password=parsed.password or 'guest', virtual_host=unquote( parsed.path.lstrip('/') or '%2F')) self.channel = self.connection.channel() try: self.channel.queue_declare(self.name) except amqp.exceptions.PreconditionFailed: pass #self.channel.queue_purge(self.name)
def parse_arguments_from_fields(self, for_fields, relative_path): if not for_fields: return {} assert isinstance(relative_path, six.text_type) # Scaffolding for Py3 port matched_arguments = self.match(relative_path).match.groupdict() fields = self.get_temp_url_argument_field_index(for_fields) raw_input_values = dict( [(self.convert_str_to_identifier(key), urllib_parse.unquote(value or '')) for key, value in matched_arguments.items()]) fields.accept_input(raw_input_values) return fields.as_kwargs()
def get_session_key(cls): context = ExecutionContext.get_context() try: raw_cookie = context.request.cookies[context.config.web.session_key_name] return urllib_parse.unquote(raw_cookie) except KeyError: return None
def path_decode(bs): return _decode(unquote(bs.encode('ascii') if PY2 else bs))
def parse_data(self, data): headers = {} data = unquote(data) data = data.strip().splitlines() last_key = None value = '' for line in data: if ': ' in line: key, value = line.split(': ', 1) last_key = key else: key = last_key value += '\n' + line headers[key.strip()] = value.strip() self.headers = headers
def decode(txt): """Turns a coded string by code() into a NameID class instance. :param txt: The coded string """ _nid = NameID() for part in txt.split(","): if part.find("=") != -1: i, val = part.split("=") try: setattr(_nid, ATTR[int(i)], unquote(val)) except: pass return _nid
def qs_as_dict(url): qs_str = urlparse(url).query return dict((x[0], unquote(x[1])) for x in [x.split("=") for x in qs_str.split("&")])
def _process_response(self, response): # Removed due to IronPython Bug # https://github.com/IronLanguages/ironpython2/issues/242 # if response.status_code == 422: # raise HTTPError('Unprocessable Entity for url( # decoded): {}'.format(unquote(response.url))) response.raise_for_status() return response.json()
def extract(test=False): text = requests.get(URL).text text = text.split('g_img={url:')[1] text = text.split(',')[0].replace("'", "").replace('"', '').replace("\\", "") img_url = urljoin(URL, text).replace(" ", "") fname = img_url.split('/')[-1] fname = unquote(fname).split('/')[-1] if not test: print('# img_url:', img_url) print('# fname:', fname) save_name = '{date}-{fname}'.format(date=get_date_from_year_to_day(), fname=fname) return (img_url, save_name)
def close(self, add_to_catalog=True): """Closes an open transaction, returning the published FMRI for the corresponding package, and its current state in the catalog. """ def split_trans_id(tid): m = re.match("(\d+)_(.*)", tid) return m.group(1), unquote(m.group(2)) trans_id = self.get_basename() pkg_fmri = split_trans_id(trans_id)[1] # set package state to SUBMITTED pkg_state = "SUBMITTED" # set state to PUBLISHED if self.append_trans: pkg_fmri, pkg_state = self.accept_append(add_to_catalog) else: pkg_fmri, pkg_state = self.accept_publish( add_to_catalog) # Discard the in-flight transaction data. try: shutil.rmtree(self.dir) except EnvironmentError as e: # Ensure that the error goes to stderr, and then drive # on as the actual package was published. misc.emsg(e) return (pkg_fmri, pkg_state)
def __fmri_from_path(pkgpath, ver): """Helper method that takes the full path to the package directory and the name of the manifest file, and returns an FMRI constructed from the information in those components.""" v = pkg.version.Version(unquote(ver), None) f = fmri.PkgFmri(unquote(os.path.basename(pkgpath))) f.version = v return f
def __parse_v_1(line, pub, v): """This function parses the string returned by a version 1 search server and puts it into the expected format of (query_number, publisher, (version, return_type, (results))) If it receives a line it can't parse, it raises a ServerReturnError.""" fields = line.split(None, 2) if len(fields) != 3: raise apx.ServerReturnError(line) try: return_type = int(fields[1]) query_num = int(fields[0]) except ValueError: raise apx.ServerReturnError(line) if return_type == Query.RETURN_ACTIONS: subfields = fields[2].split(None, 2) pfmri = fmri.PkgFmri(subfields[0]) return pfmri, (query_num, pub, (v, return_type, (pfmri, unquote(subfields[1]), subfields[2]))) elif return_type == Query.RETURN_PACKAGES: pfmri = fmri.PkgFmri(fields[2]) return pfmri, (query_num, pub, (v, return_type, pfmri)) else: raise apx.ServerReturnError(line)
def read_dict_file(self): """Reads in a dictionary stored in with an entity and its number on each line. """ self._dict.clear() for line in self._file_handle: token, offset = line.split(" ") if token[0] == "1": token = unquote(token[1:]) else: token = token[1:] offset = int(offset) self._dict[token] = offset IndexStoreBase.read_dict_file(self)
def __init__(self, origin_url, create_repo=False, pkg_name=None, repo_props=EmptyDict, trans_id=None, xport=None, pub=None, progtrack=None): scheme, netloc, path, params, query, fragment = \ urlparse(origin_url, "http", allow_fragments=0) self.pkg_name = pkg_name self.trans_id = trans_id self.scheme = scheme if scheme == "file": path = unquote(path) self.path = path self.progtrack = progtrack self.transport = xport self.publisher = pub self.__local = False self.__uploaded = 0 self.__uploads = {} self.__transactions = {} self._tmpdir = None self._append_mode = False self._upload_mode = None if scheme == "file": self.__local = True self.create_file_repo(repo_props=repo_props, create_repo=create_repo) elif scheme != "file" and create_repo: raise UnsupportedRepoTypeOperationError("create_repo", type=scheme)
def _build_version(vers): """ Private method for building versions from a string. """ return pkg.version.Version(unquote(vers), None)
def count_manifest(mg, d): try: manifest_by_date[d.date().isoformat()] += 1 except KeyError: manifest_by_date[d.date().isoformat()] = 1 try: manifest_by_ip[mg["ip"]] += 1 except KeyError: manifest_by_ip[mg["ip"]] = 1 pm = pkg_pat.search(mg["uri"]) if pm != None and mg["response"] == "200": pg = pm.groupdict() try: manifest_by_pkg[unquote(pg["stem"])] += 1 except KeyError: manifest_by_pkg[unquote(pg["stem"])] = 1 try: manifest_by_ver_pkg[unquote(pg["stem"] + "@" + pg["version"])] += 1 except KeyError: manifest_by_ver_pkg[unquote(pg["stem"] + "@" + pg["version"])] = 1 agent = pkg_agent_pat.search(mg["agent"]) if agent == None: return ag = agent.groupdict() try: manifest_by_arch[ag["arch"]] += 1 except KeyError: manifest_by_arch[ag["arch"]] = 1
def parse_fs_url(fs_url): """Parse a Filesystem URL and return a `ParseResult`. Arguments: fs_url (str): A filesystem URL. Returns: ~fs.opener.parse.ParseResult: a parse result instance. Raises: ~fs.errors.ParseError: if the FS URL is not valid. """ match = _RE_FS_URL.match(fs_url) if match is None: raise ParseError('{!r} is not a fs2 url'.format(fs_url)) fs_name, credentials, url1, url2, path = match.groups() if credentials: username, _, password = credentials.partition(':') username = unquote(username) password = unquote(password) url = url1 else: username = None password = None url = url2 url, has_qs, _params = url.partition('?') resource = unquote(url) if has_qs: params = parse_qs(_params, keep_blank_values=True) params = {k:v[0] for k, v in params.items()} else: params = {} return ParseResult( fs_name, username, password, resource, params, path )
def trusted_params(uri): """Walk through an URI, lookup the set of DNSKEYs for the origin third party provider domain, validate URI signature against the found keys. If valid, returns the trustable URI - otherwise raise an exception. /!\ User MUST use the returned URI, as signature validation is only done on everything *before* the URI. """ # Truncate the signature= part try: uri, sig = uri.split('&signature=') sig = parse.unquote(sig) except ValueError: raise exceptions.IncompleteURI pr = parse.urlparse(uri) if not pr.query: raise exceptions.IncompleteURI expires = _qsl_get_one(pr.query, 'expires') if (datetime.datetime.utcnow() > datetime.datetime.strptime(expires, '%Y%m%d%H%M%S')): raise exceptions.Expired source = _qsl_get_one(pr.query, 'source') txtl = Checker(source, dnssec=True).txt('_tpda') if not txtl: raise exceptions.NoTPDA keys = [RSA.importKey(base64.b64decode(txt.encode('ascii'))) for txt in txtl.split('\n')] digest = SHA256.new() digest.update(uri.encode('ascii')) for key in keys: signer = PKCS1_v1_5.new(key) if signer.verify(digest, base64.b64decode(sig)): return params(uri) raise exceptions.NoSignatureMatch
def sync(self): pull_f2merge_f = { 'state': 'state', 'body': 'description', 'title': 'title', } for number in sorted(self.pull_requests.keys()): pull = self.pull_requests[number] merge = None if number in self.pull2merge: merge = self.pull2merge[number] else: source_branch = 'pull/' + number + '/head' target_branch = pull['base']['ref'] if (self.rev_parse(pull, source_branch) and self.rev_parse(pull, target_branch)): data = {'title': pull['title'], 'source_branch': source_branch, 'target_branch': target_branch} if pull['body']: data['description'] = pull['body'][:DESCRIPTION_MAX] merge = self.create_merge_request(data) if merge: updates = {} for (pull_field, merge_field) in six.iteritems(pull_f2merge_f): if not self.field_equal(pull, pull_field, pull[pull_field], merge, merge_field, merge[merge_field]): (key, value) = self.field_update(pull, pull_field, pull[pull_field], merge, merge_field, merge[merge_field]) updates[key] = value if updates: self.update_merge_request(merge, updates) else: log.debug("https://github.com/" + self.github['repo'] + "/" + "pull/" + number + " == " + self.gitlab['host'] + "/" + parse.unquote(self.gitlab['repo']) + "/" + "merge_requests/" + str(merge['iid']))
def reopen(self, rstore, trans_dir): """The reopen() method is invoked by the repository as needed to load Transaction data.""" self.rstore = rstore try: open_time_str, self.esc_pkg_name = \ os.path.basename(trans_dir).split("_", 1) except ValueError: raise TransactionUnknownIDError(os.path.basename( trans_dir)) self.open_time = \ datetime.datetime.utcfromtimestamp(int(open_time_str)) self.pkg_name = unquote(self.esc_pkg_name) # This conversion should always work, because we encoded the # client release on the initial open of the transaction. self.fmri = fmri.PkgFmri(self.pkg_name, None) self.dir = os.path.join(rstore.trans_root, self.get_basename()) if not os.path.exists(self.dir): raise TransactionUnknownIDError(self.get_basename()) tmode = "rb" if not rstore.read_only: # The mode is important especially when dealing with # NFS because of problems with opening a file as # read/write or readonly multiple times. tmode += "+" # Find out if the package is renamed or obsolete. try: tfpath = os.path.join(self.dir, "manifest") tfile = open(tfpath, tmode) except IOError as e: if e.errno == errno.ENOENT: return raise m = pkg.manifest.Manifest() # If tfile is a StreamingFileObj obj, its read() # methods will return bytes. We need str for # manifest and here's an earlisest point that # we can convert it to str. m.set_content(content=misc.force_str(tfile.read())) tfile.close() if os.path.exists(os.path.join(self.dir, "append")): self.append_trans = True self.obsolete = m.getbool("pkg.obsolete", "false") self.renamed = m.getbool("pkg.renamed", "false") self.types_found = set(( action.name for action in m.gen_actions() )) self.has_reqdeps = any( a.attrs["type"] == "require" for a in m.gen_actions_by_type("depend") )
def parse_main_dict_line(line): """Parses one line of a main dictionary file. Changes to this function must be paired with changes to write_main_dict_line below. This should produce the same data structure that _write_main_dict_line in indexer.py creates to write out each line. """ split_chars = IndexStoreMainDict.sep_chars line = line.rstrip('\n') tmp = line.split(split_chars[0]) tok = unquote(tmp[0]) atl = tmp[1:] res = [] for ati in atl: tmp = ati.split(split_chars[1]) action_type = tmp[0] stl = tmp[1:] at_res = [] for sti in stl: tmp = sti.split(split_chars[2]) subtype = tmp[0] fvl = tmp[1:] st_res = [] for fvi in fvl: tmp = fvi.split(split_chars[3]) full_value = unquote(tmp[0]) pfl = tmp[1:] fv_res = [] for pfi in pfl: tmp = pfi.split(split_chars[4]) pfmri_index = int(tmp[0]) offsets = [ int(t) for t in tmp[1:] ] fv_res.append( (pfmri_index, offsets)) st_res.append((full_value, fv_res)) at_res.append((subtype, st_res)) res.append((action_type, at_res)) return tok, res