我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urlparse.urlunparse()。
def can_fetch(self, useragent, url):
    """Decide from the parsed robots.txt whether *useragent* may fetch *url*."""
    # Global switches short-circuit any per-agent matching.
    if self.disallow_all:
        return False
    if self.allow_all:
        return True
    # Reduce the URL to its path-and-after portion, re-quoted, so it can be
    # compared against the rule paths stored in the entries.
    pieces = urlparse.urlparse(urllib.unquote(url))
    url = urlparse.urlunparse(
        ('', '', pieces.path, pieces.params, pieces.query, pieces.fragment))
    url = urllib.quote(url) or "/"
    # The first entry matching the agent wins.
    for rule in self.entries:
        if rule.applies_to(useragent):
            return rule.allowance(url)
    # Fall back to the default entry, then to "access granted".
    if self.default_entry:
        return self.default_entry.allowance(url)
    return True
def resolveEntity(self, publicId, systemId):
    """Return a DOMInputSource wired up for *systemId* (must not be None)."""
    assert systemId is not None
    source = DOMInputSource()
    source.publicId = publicId
    source.systemId = systemId
    source.byteStream = self._get_opener().open(systemId)
    # Use the transport-supplied encoding when one is available.
    source.encoding = self._guess_media_encoding(source)
    # Work out a base URI if we can: strip the last path segment so that
    # relative references resolve against the containing "directory".
    import posixpath, urlparse
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(systemId)
    # XXX should we check the scheme here as well?
    if path and not path.endswith("/"):
        path = posixpath.dirname(path) + "/"
    source.baseURI = urlparse.urlunparse(
        (scheme, netloc, path, params, query, fragment))
    return source
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    login_bits = urlparse(login_url)
    current_bits = urlparse(current_url)
    # Only strip scheme/host when the login URL shares them (or omits them);
    # otherwise the full URL must be preserved for the redirect to work.
    same_scheme = not login_bits.scheme or login_bits.scheme == current_bits.scheme
    same_host = not login_bits.netloc or login_bits.netloc == current_bits.netloc
    if same_scheme and same_host:
        return urlunparse(
            ('', '', current_bits.path, current_bits.params, current_bits.query, ''))
    return current_url
def getlinkinfos(self):
    """Return (absolute-link, raw-link-sans-fragment, fragment) triples.

    File reading is done in __init__(); a stored parser signals that the
    parse succeeded.  Without one, report no links at all.
    """
    if not self.parser:
        return []
    raw_links = self.parser.getlinks()
    base = urlparse.urljoin(self.url, self.parser.getbase() or "")
    results = []
    for raw in raw_links:
        pieces = urlparse.urlparse(raw)
        # DON'T DISCARD THE FRAGMENT!  It is reported separately in the
        # returned tuples -- see Checker.dopage().
        fragment = pieces[-1]
        stripped = urlparse.urlunparse(pieces[:-1] + ('',))
        results.append((urlparse.urljoin(base, stripped), stripped, fragment))
    return results
def _BuildUrl(self, url, path_elements=None, extra_params=None): # Break url into consituent parts (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url) # Add any additional path elements to the path if path_elements: # Filter out the path elements that have a value of None p = [i for i in path_elements if i] if not path.endswith('/'): path += '/' path += '/'.join(p) # Add any additional query parameters to the query string if extra_params and len(extra_params) > 0: extra_query = self._EncodeParameters(extra_params) # Add it to the existing query if query: query += '&' + extra_query else: query = extra_query # Return the rebuilt URL return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
def _add_query_parameter(url, name, value): """Adds a query parameter to a url. Replaces the current value if it already exists in the URL. Args: url: string, url to add the query parameter to. name: string, query parameter name. value: string, query parameter value. Returns: Updated query parameter. Does not update the url if value is None. """ if value is None: return url else: parsed = list(urlparse.urlparse(url)) q = dict(parse_qsl(parsed[4])) q[name] = value parsed[4] = urllib.urlencode(q) return urlparse.urlunparse(parsed)
def scrobble_show(self, show_name, season_number, episode_number, progress, scrobble_type):
    """POST a show scrobble event to trakt.tv.

    season_number/episode_number/progress arrive as strings (they are
    zfill'ed for logging and int()'ed for the payload).  Returns True on
    success, False when the POST raises.
    """
    self.logger.info(
        'Scrobbling ({scrobble_type}) {show_name} - S{season_number}E{episode_number} - {progress} to trak.tv.'
        .format(show_name=show_name, scrobble_type=scrobble_type,
                season_number=season_number.zfill(2),
                episode_number=episode_number.zfill(2),
                progress=progress))
    # Same key insertion order as before, so the serialized JSON is stable.
    data = {
        'show': {'title': show_name},
        'episode': {'season': int(season_number), 'number': int(episode_number)},
        'progress': int(progress),
        'app_version': '1.0',
        'app_date': '2014-09-22',
    }
    json_data = json.dumps(data)
    url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv',
                               '/scrobble/' + scrobble_type, '', '', ''))
    try:
        self._do_trakt_auth_post(url, json_data)
    except Exception:
        # Was a bare "except:", which also swallowed KeyboardInterrupt and
        # SystemExit; narrowed while keeping the best-effort semantics.
        return False
    return True
def scrobble_movie(self, imdb_id, progress, scrobble_type):
    """POST a movie scrobble event to trakt.tv.

    progress arrives as a string and is int()'ed for the payload.  Returns
    True on success, False when the POST raises.
    """
    self.logger.info('Scrobbling ({scrobble_type}) {imdb_id} - {progress} to trak.tv.'
                     .format(imdb_id=imdb_id, scrobble_type=scrobble_type,
                             progress=progress))
    # Same key insertion order as before, so the serialized JSON is stable.
    data = {
        'movie': {'ids': {'imdb': imdb_id}},
        'progress': int(progress),
        'app_version': '1.0',
        'app_date': '2014-09-22',
    }
    json_data = json.dumps(data)
    url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv',
                               '/scrobble/' + scrobble_type, '', '', ''))
    try:
        self._do_trakt_auth_post(url, json_data)
    except Exception:
        # Was a bare "except:", which also swallowed KeyboardInterrupt and
        # SystemExit; narrowed while keeping the best-effort semantics.
        return False
    return True
def _parse(url, defaultPort=None): url = url.strip() parsed = urlparse.urlparse(url) scheme = parsed[0] path = urlparse.urlunparse(('','')+parsed[2:]) if defaultPort is None: if scheme == 'https': defaultPort = 443 else: defaultPort = 80 host, port = parsed[1], defaultPort if ':' in host: host, port = host.split(':') port = int(port) if path == "": path = "/" return scheme, host, port, path
def process(self):
    """Relay the incoming request to the origin server named in the URI."""
    pieces = urlparse.urlparse(self.uri)
    protocol = pieces[0]
    host = pieces[1]
    port = self.ports[protocol]
    # An explicit host:port in the URI overrides the protocol default.
    if ':' in host:
        host, port = host.split(':')
        port = int(port)
    # Everything after the netloc is forwarded as the request path.
    rest = urlparse.urlunparse(('', '') + pieces[2:]) or '/'
    factory_class = self.protocols[protocol]
    headers = self.getAllHeaders().copy()
    if not headers.has_key('host'):
        headers['host'] = host
    self.content.seek(0, 0)
    body = self.content.read()
    client = factory_class(self.method, rest, self.clientproto, headers,
                           body, self)
    reactor.connectTCP(host, port, client)
def do_get(self, url, top_level=False, top_level_path=""):
    """GET *url* (optionally rewritten onto *top_level_path*) and return the body.

    Collapses duplicate slashes in the path, re-raises transport errors after
    logging them, and raises ServiceError for HTTP statuses >= 400.
    """
    parts = list(urlparse.urlparse(url))
    # 2 is the path offset
    if top_level:
        parts[2] = '/' + top_level_path
    parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
    url = urlparse.urlunparse(parts)
    try:
        if self.disable_ssl_validation:
            urllib3.disable_warnings()
            http = urllib3.PoolManager(cert_reqs='CERT_NONE')
        else:
            http = urllib3.PoolManager()
        r = http.request('GET', url, headers=self.headers)
    except Exception as e:
        # BUG FIX: the original passed one tuple for two %s placeholders,
        # which makes logging's lazy %-formatting fail ("not enough
        # arguments for format string"); pass the args separately.
        LOG.error("Request on service '%s' with url '%s' failed",
                  self.name, url)
        raise e
    if r.status >= 400:
        raise ServiceError("Request on service '%s' with url '%s' failed"
                           " with code %d" % (self.name, url, r.status))
    return r.data
def handle_redirect_to_login(request, **kwargs):
    """Redirect to the login page, carrying the "next" URL in the query string."""
    login = kwargs.get("login_url")
    field = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if login is None:
        login = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        login = urlresolvers.reverse(login)
    except urlresolvers.NoReverseMatch:
        # A literal path/URL is acceptable; anything else is a real error.
        if callable(login):
            raise
        if "/" not in login and "." not in login:
            raise
    bits = list(urlparse.urlparse(login))
    if field:
        qs = QueryDict(bits[4], mutable=True)
        qs[field] = next_url
        bits[4] = qs.urlencode(safe="/")
    return HttpResponseRedirect(urlparse.urlunparse(bits))
def url(self, value):
    """Store *value* as the raw URL and derive self.normalized_url.

    Normalization lowercases nothing but drops default ports and the
    params/query/fragment components.  Raises ValueError for schemes other
    than http/https.
    """
    self.__dict__['url'] = value
    if value is None:
        self.normalized_url = None
        self.__dict__['url'] = None
        return
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(value)
    # Exclude default port numbers.
    if scheme == 'http' and netloc.endswith(':80'):
        netloc = netloc[:-3]
    elif scheme == 'https' and netloc.endswith(':443'):
        netloc = netloc[:-4]
    if scheme not in ('http', 'https'):
        raise ValueError("Unsupported URL %s (%s)." % (value, scheme))
    # Normalized URL excludes params, query, and fragment.
    self.normalized_url = urlparse.urlunparse(
        (scheme, netloc, path, None, None, None))
# NOTE(review): Python 2 code (urllib2, print statement, "except ..., err"
# syntax); runs an HTTP PUT of *param_dict* as JSON against
# http://<self.host>/api/v<self.api_version>/<resource>.
# SECURITY: the raw HTTP response body is passed to eval() -- executing data
# received from the network is dangerous; json.loads() is the safe
# equivalent and should replace it.  On HTTPError the error is fed to the
# module's parse_errors() helper instead.
def _do_put_request(self, resource, param_dict): req_url = urlparse.urlunparse(["http", self.host, "api/v%s/%s" % (self.api_version, resource), "", "", ""]) print "req_url=%s" % (req_url) opener = urllib2.build_opener(urllib2.HTTPHandler) req = urllib2.Request(req_url, data=json.dumps(param_dict)) req.add_header('Content-Type', 'application/json') req.get_method = lambda: 'PUT' try: return eval(opener.open(req).read()) except urllib2.HTTPError, err: return parse_errors(err) #--------------------------------------------- # error parsing # --------------------------------------------
def handle_redirect_to_login(request, **kwargs):
    """Send the user to the login page, preserving the requested URL."""
    target = kwargs.get("login_url")
    field_name = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if target is None:
        target = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        target = urlresolvers.reverse(target)
    except urlresolvers.NoReverseMatch:
        # Tolerate literal paths/URLs; re-raise for anything else.
        if callable(target):
            raise
        if "/" not in target and "." not in target:
            raise
    pieces = list(urlparse(target))
    if field_name:
        query = QueryDict(pieces[4], mutable=True)
        query[field_name] = next_url
        pieces[4] = query.urlencode(safe="/")
    return HttpResponseRedirect(urlunparse(pieces))
def _build_url(self, endpoint, params={}): """Return the full URL for the desired endpoint. Args: endpoint (str): the API endpoint after base URL params (dict): any params to include in the request Returns: (str) the full URL of the request """ new_params = {'circle-token': self._token} new_params.update(params) parsed_url = urlparse(self._base_url) new_parse = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc, path='/'.join((parsed_url.path, endpoint)), params='', query=urlencode(new_params), fragment='') return urlunparse(new_parse)
# Computed attribute access for a fuzzing request object.  NOTE(review): the
# double-underscore attributes (self.__host, self.__path, ...) rely on name
# mangling, so their runtime names depend on the enclosing class, which is
# not visible here -- this method only works spliced into that class.
#   urlWithoutVariables -> scheme://host/path with no params/query/fragment
#   pathWithVariables   -> path plus URL-encoded GET variables
#   completeUrl         -> full URL including params and GET variables
#   finalUrl            -> cached override when set, else completeUrl
#   urlWithoutPath      -> scheme://<Host header>
#   postdata            -> body encoded according to ContentType
# Any other name raises AttributeError, as __getattr__ must.
def __getattr__ (self,name): if name=="urlWithoutVariables": return urlunparse((self.schema,self.__host,self.__path,'','','')) elif name=="pathWithVariables": return urlunparse(('','',self.__path,'',self.__variablesGET.urlEncoded(),'')) elif name=="completeUrl": return urlunparse((self.schema,self.__host,self.__path,self.__params,self.__variablesGET.urlEncoded(),'')) elif name=="finalUrl": if self.__finalurl: return self.__finalurl return self.completeUrl elif name=="urlWithoutPath": return "%s://%s" % (self.schema,self._headers["Host"]) elif name=="path": return self.__path elif name=="postdata": if self.ContentType=="application/x-www-form-urlencoded": return self.__variablesPOST.urlEncoded() elif self.ContentType=="multipart/form-data": return self.__variablesPOST.multipartEncoded() else: return self.__uknPostData else: raise AttributeError
def _http_request(self, verb, path, body, headers):
    """Makes the actual HTTP request. """
    url = urlparse.urlunparse((self.config.scheme, self.config.server, path,
                               None, None, None))
    LOG.debug("Request is %s:%s" % (verb, url))
    LOG.debug("Request headers are %s" % headers)
    LOG.debug("Request body is %s" % body)
    connection = self._get_connection()
    response, content = connection.request(url, method=verb, body=body,
                                           headers=headers)
    # http response code is handled else where
    status = (response.status, response.reason)
    # Header names are lower-cased so lookups are case-insensitive.
    lowered = {k.lower(): v for k, v in response.iteritems()}
    LOG.debug("Response status is %s %s" % status)
    LOG.debug("Response headers are %s" % lowered)
    LOG.debug("Response body is %s" % content)
    return (status, lowered, content)
def findTags(self):
    """Collect rel="tag" anchors from the document into self.tags."""
    match_everything = lambda x: 1
    for anchor in self.document(match_everything,
                                {'rel': re.compile(r'\btag\b')}):
        href = anchor.get('href')
        if not href:
            continue
        urlscheme, domain, path, params, query, fragment = \
            urlparse.urlparse(_urljoin(self.baseuri, href))
        segments = path.split('/')
        # The tag term is the last non-empty path segment.
        tag = segments.pop()
        if not tag:
            if not segments:
                # there are no tags
                continue
            tag = segments.pop()
        scheme_url = urlparse.urlunparse(
            (urlscheme, domain, '/'.join(segments), '', '', ''))
        if not scheme_url.endswith('/'):
            scheme_url += '/'
        self.tags.append(FeedParserDict({"term": tag,
                                         "scheme": scheme_url,
                                         "label": anchor.string or ''}))
def normalize_website(cls, w):
    """Normalise a user-supplied website string, or return None when invalid."""
    from django.core.validators import EMPTY_VALUES
    from urlparse import urlparse, urlunparse, ParseResult
    w = w.decode('utf-8')
    if w in EMPTY_VALUES:
        return None
    w = w.lower().strip()
    # Default to http:// when no scheme was supplied.
    if not (w.startswith('http://') or w.startswith('https://')):
        w = 'http://' + w.lstrip('/')
    try:
        pieces = urlparse(w)
    except ValueError:
        return None
    # Rebuild with the canonical host and no trailing slash or fragment.
    normalized = ParseResult(scheme='http',
                             netloc=cls.get_website_tld(w),
                             path=pieces.path.rstrip('/'),
                             params='',
                             query=pieces.query,
                             fragment='')
    return urlunparse(normalized)
def _add_query_parameter(url, name, value): """Adds a query parameter to a url. Replaces the current value if it already exists in the URL. Args: url: string, url to add the query parameter to. name: string, query parameter name. value: string, query parameter value. Returns: Updated query parameter. Does not update the url if value is None. """ if value is None: return url else: parsed = list(urlparse.urlparse(url)) q = dict(urlparse.parse_qsl(parsed[4])) q[name] = value parsed[4] = urllib.urlencode(q) return urlparse.urlunparse(parsed)
def test_login(self):
    """End-to-end check that the API client lists the hosts we just added."""
    self.hosts = self.add_hosts([
        config['lustre_servers'][0]['address'],
        config['lustre_servers'][1]['address']])
    # Chroma puts its FQDN in the manager certificate, but the test config may
    # be pointing to localhost: if this is the case, substitute the FQDN in the
    # URL so that the client can validate the certificate.
    manager = config['chroma_managers'][0]
    url = manager['server_http_url']
    parsed = urlparse.urlparse(url)
    if parsed.hostname == 'localhost':
        pieces = list(parsed)
        pieces[1] = pieces[1].replace("localhost", socket.getfqdn())
        url = urlparse.urlunparse(tuple(pieces))
    example_api_client.setup_ca(url)
    user = manager['users'][0]
    hosts = example_api_client.list_hosts(url, user['username'],
                                          user['password'])
    self.assertListEqual(hosts, [h['fqdn'] for h in self.hosts])
# NOTE(review): Python 2 code (urllib2, print statements, "except ..., e"
# syntax).  Rebuilds a URL from the parsed *host* 6-tuple with *params*
# urlencoded into slot 4 (the query string), sends it with browser-like
# Accept/User-Agent headers, optionally prints the request/response pair,
# and returns is_blocked(<status code>) -- a helper defined elsewhere.
# The *get* parameter is accepted but never used.
def send_request(host, params, get="GET", display=False): url_parts = list(host) url_parts[4] = urllib.urlencode(params) url = urlparse.urlunparse(url_parts) req = urllib2.Request(url) req.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8') req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:25.0) Gecko/20100101 Firefox/25.0') try: res = urllib2.urlopen(req) code = res.code except urllib2.HTTPError, e: code = e.code if display: print "Request: %s" % (url)+"\t/\t", print "Response: %i" % code return is_blocked(code) #============================== Tampering functions =========================== # TODO: handle encoding tricks
def download(self, source, dest):
    """
    Download an archive file.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # All exceptions (URLError, OSError, ...) propagate to the caller.
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            # Strip the credentials out of the URL and register them with a
            # basic-auth handler instead; realm=None matches any realm.
            source = urlunparse((proto, barehost, path, params, query,
                                 fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            passman.add_password(None, source, username, password)
            install_opener(build_opener(HTTPBasicAuthHandler(passman)))
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception as e:
        # Don't leave a partial download behind.
        if os.path.isfile(dest):
            os.unlink(dest)
        raise e
def base_url(self, url):
    """Return url without querystring or fragment"""
    pieces = list(self.parse_url(url))
    # Blank out everything from the query component (index 4) onward.
    pieces[4:] = [''] * len(pieces[4:])
    return urlunparse(pieces)
def api_replace_host(url_text, replacement):
    "Replace the host portion of a URL"
    pieces = list(urlparse.urlparse(url_text))
    if replacement is not None:
        # Netloc lives at index 1; bracket IPv6 literals per RFC 2732.
        pieces[1] = __host_per_rfc_2732(replacement)
    return urlparse.urlunparse(pieces)
def _remove_md5_fragment(location): if not location: return '' parsed = urlparse(location) if parsed[-1].startswith('md5='): return urlunparse(parsed[:-1] + ('',)) return location
def login_url(login_view, next_url=None, next_field='next'):
    '''
    Creates a URL for redirecting to a login page. If only `login_view` is
    provided, this will just return the URL for it. If `next_url` is provided,
    however, this will append a ``next=URL`` parameter to the query string
    so that the login view can redirect back to that URL.

    :param login_view: The name of the login view. (Alternately, the actual
                       URL to the login view.)
    :type login_view: str
    :param next_url: The URL to give the login view for redirection.
    :type next_url: str
    :param next_field: What field to store the next URL in. (It defaults to
                       ``next``.)
    :type next_field: str
    '''
    # A literal URL/path is used verbatim; a view name goes through url_for.
    base = (login_view
            if login_view.startswith(('https://', 'http://', '/'))
            else url_for(login_view))
    if next_url is None:
        return base
    pieces = list(urlparse(base))
    query = url_decode(pieces[4])
    query[next_field] = make_next_param(base, next_url)
    pieces[4] = url_encode(query, sort=True)
    return urlunparse(pieces)
def addroot(self, root, add_to_do = 1):
    """Register *root* as a crawl root, trimmed back to its directory.

    Fetches the root's robots.txt, and (when *add_to_do* is truthy) queues
    the root itself for crawling.
    """
    if root in self.roots:
        return
    # Trim the URL back to the enclosing "directory" so that sibling pages
    # count as being under this root.
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(root)
    cut = path.rfind("/") + 1
    if 0 < cut < len(path):
        troot = urlparse.urlunparse((scheme, netloc, path[:cut],
                                     params, query, fragment))
    else:
        troot = root
    self.roots.append(troot)
    self.addrobot(root)
    if add_to_do:
        self.newlink((root, ""), ("<root>", root))
def get_callback_url(self):
    """Return the callback URL, with oauth_verifier appended when present."""
    if not (self.callback and self.verifier):
        return self.callback
    # Append the oauth_verifier to any existing query string.
    scheme, netloc, path, params, query, fragment = \
        urlparse.urlparse(self.callback)[:6]
    if query:
        query = '%s&oauth_verifier=%s' % (query, self.verifier)
    else:
        query = 'oauth_verifier=%s' % self.verifier
    return urlparse.urlunparse((scheme, netloc, path, params, query,
                                fragment))