我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用urlparse.unquote()。
def do_request(self, url, params=None, timeout=None):
    """POST ``params`` to ``url`` through the OAuth-signed session.

    :param url: address the request is posted to.
    :param params: optional form data, sent as the request body.
    :param timeout: optional request timeout, in seconds; when falsy,
        ``self.default_timeout`` is used instead.
    :type timeout: float

    @return: the response content
    :raises exceptions.FlickrError: when the status code is not 200.
    """
    response = self.session.post(
        url,
        data=params,
        auth=self.oauth,
        timeout=timeout or self.default_timeout,
    )

    # Happy path first: only a 200 yields the body.
    if response.status_code == 200:
        return response.content

    status = response.status_code
    self.log.error('do_request: Status code %i received, content:', status)
    # Log the body one '&'-separated piece at a time, percent-decoded.
    for fragment in response.text.split('&'):
        self.log.error(' %s', urllib_parse.unquote(fragment))

    raise exceptions.FlickrError('do_request: Status code %s received' % status)
def api_index(request):
    """Serialize users to JSON for the caller named by the auth token.

    The request must pass ``check_if_valid_token``; otherwise a 401
    'Unauthorized' response is returned. For a valid token the user id is
    derived from the percent-decoded ``HTTP_AUTHORIZATION`` header: an
    admin user gets every ``User`` row, anyone else only the rows with a
    matching ``user_id``. When no such user exists the JSON body is
    "null".
    """
    if not check_if_valid_token(request):
        return HttpResponse('Unauthorized', status=401)

    try:
        token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
        user_id = extrapolate_user(token)
        user = User.objects.get(user_id=user_id)

        # Admins see everybody, everyone else only their own record.
        if user.is_admin:
            visible_users = User.objects.all()
        else:
            visible_users = User.objects.filter(user_id=user_id)

        data = serializers.serialize("json", visible_users)
        return HttpResponse(data, content_type='application/json')
    except User.DoesNotExist:
        return HttpResponse("null", content_type='application/json')
def check_if_valid_token(request):
    """Check the ``HTTP_AUTHORIZATION`` token carried by ``request``.

    The header is percent-decoded and the text between its first and
    second '=' is matched against ``(.*?)-(.*)``: group 1 is taken as the
    user id and group 2 as the supplied hash. The token is valid when the
    supplied hash equals the SHA hex digest of
    ``ACCESS_TOKEN_SALT + ":" + <id>``.

    Returns False (instead of raising) when the header is missing, has no
    '=' separator, does not match the pattern, or has an empty id or hash.
    """
    if 'HTTP_AUTHORIZATION' not in request.META:
        return False

    token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])

    # A header without '=' has no payload to inspect; previously this
    # raised IndexError.
    pieces = token.split('=')
    if len(pieces) < 2:
        return False

    # NOTE(review): the pattern is intentionally left exactly as it was;
    # only the crash on non-matching input is handled here.
    regex = re.compile("(.*?)-(.*)")
    match = regex.search(pieces[1])
    if match is None:
        # No '-' in the payload; previously this raised AttributeError.
        return False

    user_id = match.group(1)
    supplied_hash = match.group(2)
    if not user_id or not supplied_hash:
        return False

    sha = SHA.new()
    sha.update(ACCESS_TOKEN_SALT + ":" + str(user_id))
    return supplied_hash == sha.hexdigest()
def mkfn(text):
    """Percent-decode ``text`` and replace every ``NAFN`` match with "_"."""
    return NAFN.sub("_", unquote(text))
def identify_and_tag_DOI(line):
    """Replace each DOI reference in a citation line with a tag.

    Every match of the ``re_doi`` pattern is cut out of the line and
    replaced by ``<cds.DOI />``; the text captured by the pattern's
    ``doi`` group is collected separately.

    @param line: (string) the reference line in which to search for DOI's.
    @return: the tagged line and a list of DOI strings (if any), in the
        order they occur in the original line.
    """
    found = []

    # All matches are collected up front, then processed right-to-left so
    # that splicing in a tag never shifts the offsets of matches that are
    # still to be handled.
    for hit in list(re_doi.finditer(line))[::-1]:
        doi = hit.group('doi')

        # A percent-encoded slash means the DOI came in URL-escaped form.
        if '%2f' in doi.lower():
            doi = unquote(doi)

        line = "".join((line[:hit.start()], "<cds.DOI />", line[hit.end():]))

        # Prepending undoes the reversed traversal order.
        found.insert(0, doi)

    return line, found
def CgiDictFromParsedUrl(url):
    """Build a dict of CGI variables from a parsed url.

    The result holds QUERY_STRING, SERVER_NAME and PATH_INFO, plus
    SERVER_PORT when the url carries an explicit port or its scheme is
    'http' or 'https'.

    Args:
      url: An instance of urlparse.SplitResult.

    Returns:
      A dict containing the CGI variables derived from url.
    """
    default_ports = {'https': '443', 'http': '80'}

    environ = {}
    if url.port is not None:
        environ['SERVER_PORT'] = str(url.port)
    elif url.scheme in default_ports:
        # No explicit port: fall back to the scheme's well-known one.
        environ['SERVER_PORT'] = default_ports[url.scheme]

    environ.update(QUERY_STRING=url.query, SERVER_NAME=url.hostname)
    # An empty path is reported as the root.
    environ['PATH_INFO'] = urlparse.unquote(url.path) if url.path else '/'
    return environ
def unquote_utf8(qs):
    """Percent-decode ``qs``, treating the escaped bytes as UTF-8.

    A ``text_type`` argument is encoded to UTF-8 before ``unquote`` is
    applied. If ``unquote`` hands back a ``byte_type`` value it is decoded
    from UTF-8; any other result is returned unchanged.
    """
    raw = qs.encode('utf-8') if isinstance(qs, text_type) else qs
    decoded = unquote(raw)
    if not isinstance(decoded, byte_type):
        return decoded
    return decoded.decode("utf-8")
def parse_url(url):
    """Split a broker URL into a dict of connection settings.

    The returned dict has the keys ``transport`` (the URL scheme),
    ``host``, ``port``, ``username``, ``password`` and ``virtual_host``,
    plus one entry per query-string parameter. Host, username, password
    and virtual host are percent-decoded; empty host/username/password
    become None, an empty virtual host becomes "/", and a missing port
    falls back to ``amqppy.DEFAULT_PORT``.
    """
    def _decoded_or_none(component):
        # Percent-decode a URL component, mapping missing/empty to None.
        return urlparse.unquote(component or '') or None

    scheme = urlparse.urlparse(url).scheme

    # Drop "<scheme>://" and re-parse the remainder with HTTP URL
    # semantics.
    remainder = url[len(scheme) + 3:]
    parts = urlparse.urlparse('http://' + remainder)

    # The virtual host is the path minus its single leading slash.
    vhost = parts.path or ''
    if vhost and vhost[0] == '/':
        vhost = vhost[1:]

    return dict(
        transport=scheme,
        host=_decoded_or_none(parts.hostname),
        port=parts.port or amqppy.DEFAULT_PORT,
        username=_decoded_or_none(parts.username),
        password=_decoded_or_none(parts.password),
        virtual_host=urlparse.unquote(vhost or '') or "/",
        **dict(urlparse.parse_qsl(parts.query)))
def PLAY_SOURCE(payload):
    """Percent-decode ``payload`` and pass it to ``control.play_source``.

    Returns whatever ``control.play_source`` returns.
    """
    source = urlparse.unquote(payload)
    return control.play_source(source)
def api(request, id_number):
    """Return the token owner's ``User`` rows as JSON.

    The request must pass ``check_if_valid_token``; otherwise a 401
    'Unauthorized' response is returned. The user id is derived from the
    percent-decoded ``HTTP_AUTHORIZATION`` header. ``id_number`` is
    accepted but not used by this function.
    """
    if not check_if_valid_token(request):
        return HttpResponse('Unauthorized', status=401)

    raw_header = request.META['HTTP_AUTHORIZATION']
    user_id = extrapolate_user(urlparse.unquote(raw_header))
    matching_users = User.objects.filter(user_id=user_id)
    payload = serializers.serialize("json", matching_users)
    return HttpResponse(payload, content_type='application/json')


# This is purposely vulnerable see - https://github.com/OWASP/railsgoat/wiki/Extras:-Broken-Regular-Expression
def db_url_parse(url, engine=None, conn_max_age=0):
    """
    Parses a database URL into a settings dict.

    The result holds NAME, USER, PASSWORD, HOST, PORT and CONN_MAX_AGE,
    plus OPTIONS when the URL has query parameters and ENGINE when one is
    known. The literal URL "sqlite://:memory:" short-circuits to a dict
    with only ENGINE and NAME.

    :param url: the database URL to parse.
    :param engine: optional engine name; when None it is looked up in
        ``DATABASE_ENGINE_SCHEMES`` by URL scheme.
    :param conn_max_age: value stored under CONN_MAX_AGE.
    :return: the settings dict.
    :raises KeyError: when ``engine`` is None and the URL scheme is not in
        ``DATABASE_ENGINE_SCHEMES``.
    """
    if url == "sqlite://:memory:":
        # urlparse will choke on :memory:
        return {
            "ENGINE": DATABASE_ENGINE_SCHEMES["sqlite"],
            "NAME": ":memory:",
        }

    config = {}

    url = urlparse.urlparse(url)

    # split query strings from path
    path = url.path[1:]
    if "?" in path and not url.query:
        # Split on the first "?" only. The former maxsplit of 2 could
        # return three pieces and break the two-name unpacking with a
        # ValueError when the text held more than one "?".
        path, query = path.split("?", 1)
    else:
        query = url.query
    query = urlparse.parse_qs(query)

    # sqlite with no path should assume :memory: (sqlalchemy behavior)
    if url.scheme == "sqlite" and path == "":
        path = ":memory:"

    # handle postgresql percent-encoded paths
    hostname = url.hostname or ""
    if "%2f" in hostname.lower():
        hostname = hostname.replace("%2f", "/").replace("%2F", "/")

    config.update({
        "NAME": urlparse.unquote(path or ""),
        "USER": urlparse.unquote(url.username or ""),
        "PASSWORD": urlparse.unquote(url.password or ""),
        "HOST": hostname,
        "PORT": url.port or "",
        "CONN_MAX_AGE": conn_max_age,
    })

    engine = DATABASE_ENGINE_SCHEMES[url.scheme] if engine is None else engine

    # pass the query string into OPTIONS; for a repeated key the last
    # value wins
    options = {}
    for key, values in query.items():
        if url.scheme == "mysql" and key == "ssl-ca":
            options["ssl"] = {"ca": values[-1]}
            continue

        options[key] = values[-1]

    # postgresql schema URLs
    if "currentSchema" in options and engine == "django.db.backends.postgresql_psycopg2":
        options["options"] = "-c search_path={0}".format(options["currentSchema"])

    if options:
        config["OPTIONS"] = options

    if engine:
        config["ENGINE"] = engine

    return config
def get(self, path="/", *args, **kwargs):
    """Dispatch a GET request to one of the whitelisted handler methods.

    Named request parameters are merged into ``kwargs``; a name that
    occurs more than once (in the request and/or in ``kwargs``) ends up
    as a list of all its values. The second segment of ``path`` selects
    the handler, the remaining segments (percent-decoded) are passed to
    it positionally, and the handler's return value is written to the
    response.

    :param path: request path; its second segment names the handler.
    :param args: accepted but not used.
    :param kwargs: pre-supplied keyword arguments for the handler.
    :raises AssertionError: when the request carries 50 or more
        parameters.
    :raises NotImplementedError: when a parameter is called "self", the
        path has fewer than two segments, or the handler is not known.
    """
    self.response.headers['Content-Type'] = 'text/plain'

    # Prevent Hash-collision attacks. Raised explicitly (same exception
    # type as before) so the guard is not stripped when Python runs
    # with -O.
    if len(self.request.arguments()) >= 50:
        raise AssertionError("too many request parameters")

    # Fill the (surprisingly empty) kwargs dict with named request params
    request_params = dict((k, self.request.get_all(k)) for k in self.request.arguments())
    for key, values in request_params.items():
        if not values:
            continue
        if key not in kwargs:
            # A lone value is passed as a scalar, several as a list.
            kwargs[key] = values[0] if len(values) == 1 else values
        elif isinstance(kwargs[key], list):
            kwargs[key] = kwargs[key] + values
        else:
            kwargs[key] = [kwargs[key]] + values
    del request_params

    if "self" in kwargs:  # self is reserved for bound methods
        raise NotImplementedError()

    path = urlparse.urlparse(path).path
    pathlist = [urlparse.unquote(x) for x in path.strip("/").split("/")]
    if len(pathlist) < 2:
        raise NotImplementedError()

    tfunc = pathlist[1]
    pathlist = pathlist[2:]

    # Only these method names may be reached through the URL.
    handlers = ("exportDb", "exportBlob", "download", "info", "listCursors", "listKinds")
    if tfunc in handlers:
        self.response.write(getattr(self, tfunc)(*pathlist, **kwargs))
    elif tfunc != "_ah":
        # "_ah" is accepted and deliberately answered with an empty body.
        raise NotImplementedError()
def do_upload(self, filename, url, params=None, fileobj=None, timeout=None):
    """Performs a file upload to the given URL with the given parameters,
    signed with OAuth.

    :param filename: path of the file to upload; also the source of the
        default 'title' parameter.
    :param url: address the upload is posted to.
    :param params: optional dict of request parameters. It is copied, so
        the caller's dict is left untouched.
    :param fileobj: optional open binary file object to upload instead of
        opening ``filename``. A file object passed in is not closed here.
    :param timeout: optional request timeout, in seconds; when falsy,
        ``self.default_timeout`` is used instead.
    :type timeout: float

    @return: the response content
    :raises exceptions.FlickrError: when the status code is not 200.
    """
    # Work on a private copy: this also makes the params=None default
    # usable and keeps 'title'/'photo' out of the caller's dict.
    params = dict(params) if params else {}

    # work-around to allow non-ascii characters in file name
    # Flickr doesn't store the name but does use it as a default title
    if 'title' not in params:
        params['title'] = os.path.basename(filename).encode('utf8')

    # work-around for Flickr expecting 'photo' to be excluded
    # from the oauth signature:
    #   1. create a dummy request without 'photo'
    #   2. create real request and use auth headers from the dummy one
    dummy_req = requests.Request('POST', url, data=params, auth=self.oauth)

    prepared = dummy_req.prepare()
    headers = prepared.headers
    self.log.debug('do_upload: prepared headers = %s', headers)

    # Remember whether the file is ours, so only a file opened here gets
    # closed.
    opened_here = not fileobj
    if opened_here:
        fileobj = open(filename, 'rb')

    try:
        params['photo'] = ('dummy name', fileobj)
        m = MultipartEncoder(fields=params)
        auth = {'Authorization': headers.get('Authorization'),
                'Content-Type': m.content_type}
        self.log.debug('POST %s', auth)
        req = self.session.post(url, data=m, headers=auth,
                                timeout=timeout or self.default_timeout)
    finally:
        if opened_here:
            fileobj.close()

    # check the response headers / status code.
    if req.status_code != 200:
        self.log.error('do_upload: Status code %i received, content:',
                       req.status_code)

        for part in req.text.split('&'):
            self.log.error(' %s', urllib_parse.unquote(part))

        raise exceptions.FlickrError('do_upload: Status code %s received' % req.status_code)

    return req.content
def selectLanguage(self, path):
    """
        Tries to select the best language for the current request.

        Depending on conf["viur.languageMethod"] the language is taken from
        the session ("session"), from the request's host name ("domain") or
        from the first path segment ("url"). When a language is chosen it is
        stored in ``self.language``.

        :param path: the request path.
        :returns: ``path`` unchanged, except in "url" mode when its first
            segment is a known language; then the path with that segment
            cut off is returned.
    """
    if translations is None:
        # This project doesn't use the multi-language feature, nothing to do here
        return path

    method = conf["viur.languageMethod"]

    if method == "session":
        # We store the language inside the session, try to load it from there
        if session.current.getLanguage():
            self.language = session.current.getLanguage()
        elif "X-Appengine-Country" in self.request.headers.keys():
            lng = self.request.headers["X-Appengine-Country"].lower()
            if lng in conf["viur.availableLanguages"] + list(conf["viur.languageAliasMap"].keys()):
                session.current.setLanguage(lng)
                self.language = lng
            else:
                session.current.setLanguage(conf["viur.defaultLanguage"])

    elif method == "domain":
        host = self.request.host_url.lower()
        host = host[host.find("://") + 3:].strip(" /")  # strip http(s)://
        if host.startswith("www."):
            host = host[4:]
        if host in conf["viur.domainLanguageMapping"].keys():
            self.language = conf["viur.domainLanguageMapping"][host]
        elif session.current.getLanguage():
            # We have no language configured for this domain, try to read it from session
            self.language = session.current.getLanguage()

    elif method == "url":
        raw_path = urlparse.urlparse(path).path
        segments = [urlparse.unquote(seg) for seg in raw_path.lower().strip("/").split("/")]
        if segments and segments[0] in conf["viur.availableLanguages"] + list(conf["viur.languageAliasMap"].keys()):
            self.language = segments[0]
            # Return the path stripped by its language segment
            return path[len(segments[0]) + 1:]

        # This URL doesn't contain a language prefix, try to read it from session
        if session.current.getLanguage():
            self.language = session.current.getLanguage()
        elif "X-Appengine-Country" in self.request.headers.keys():
            lng = self.request.headers["X-Appengine-Country"].lower()
            if lng in conf["viur.availableLanguages"] or lng in conf["viur.languageAliasMap"]:
                self.language = lng

    return path