The following 50 code examples, extracted from open-source Python projects, illustrate how to use urlparse.urlparse().
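Before the project examples, here is a minimal, self-contained sketch (the URL is a made-up example) of what urlparse.urlparse() returns on Python 2: a six-field ParseResult (scheme, netloc, path, params, query, fragment) that most of the snippets below either read field by field or reassemble with urlparse.urlunparse().

# Minimal usage sketch; the URL below is illustrative only.
import urlparse

parts = urlparse.urlparse('http://example.com/a/b;p?x=1&y=2#frag')
print parts.scheme    # 'http'
print parts.netloc    # 'example.com'
print parts.path      # '/a/b'
print parts.params    # 'p'
print parts.query     # 'x=1&y=2'
print parts.fragment  # 'frag'

# The six fields round-trip through urlunparse():
assert urlparse.urlunparse(parts) == 'http://example.com/a/b;p?x=1&y=2#frag'
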
def can_fetch(self, useragent, url):
    """using the parsed robots.txt decide if useragent can fetch url"""
    if self.disallow_all:
        return False
    if self.allow_all:
        return True
    # search for given user agent matches
    # the first match counts
    parsed_url = urlparse.urlparse(urllib.unquote(url))
    url = urlparse.urlunparse(('', '', parsed_url.path,
                               parsed_url.params, parsed_url.query, parsed_url.fragment))
    url = urllib.quote(url)
    if not url:
        url = "/"
    for entry in self.entries:
        if entry.applies_to(useragent):
            return entry.allowance(url)
    # try the default entry last
    if self.default_entry:
        return self.default_entry.allowance(url)
    # agent not found ==> access granted
    return True

def GetSCAFileContents(url):
    fileContents = None
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme == "sca":
        queryAsDict = dict([x.split("=") for x in query.split("&")])
        try:
            orb = CORBA.ORB_init()
            fileSys = orb.string_to_object(queryAsDict["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        except:
            logging.warning("Unable to get ORB reference")
        else:
            if fileSys == None:
                logging.warning("Failed to lookup file system")
            else:
                try:
                    scaFile = fileSys.open(path, True)
                    fileSize = scaFile.sizeOf()
                    fileContents = scaFile.read(fileSize)
                    scaFile.close()
                finally:
                    pass
    return fileContents

def reduce_uri(self, uri, default_port=True):
    """Accept authority or URI and extract only the authority and path."""
    # note HTTP URLs do not have a userinfo component
    parts = urlparse.urlsplit(uri)
    if parts[1]:
        # URI
        scheme = parts[0]
        authority = parts[1]
        path = parts[2] or '/'
    else:
        # host or host:port
        scheme = None
        authority = uri
        path = '/'
    host, port = splitport(authority)
    if default_port and port is None and scheme is not None:
        dport = {"http": 80,
                 "https": 443,
                 }.get(scheme)
        if dport is not None:
            authority = "%s:%d" % (host, dport)
    return authority, path

def resolveEntity(self, publicId, systemId):
    assert systemId is not None
    source = DOMInputSource()
    source.publicId = publicId
    source.systemId = systemId
    source.byteStream = self._get_opener().open(systemId)

    # determine the encoding if the transport provided it
    source.encoding = self._guess_media_encoding(source)

    # determine the base URI if we can
    import posixpath, urlparse
    parts = urlparse.urlparse(systemId)
    scheme, netloc, path, params, query, fragment = parts
    # XXX should we check the scheme here as well?
    if path and not path.endswith("/"):
        path = posixpath.dirname(path) + "/"
        parts = scheme, netloc, path, params, query, fragment
        source.baseURI = urlparse.urlunparse(parts)

    return source

def _remove_ignored_parameters(self, request):

    def filter_ignored_parameters(data):
        return [(k, v) for k, v in data if k not in self._ignored_parameters]

    url = urlparse(request.url)
    query = parse_qsl(url.query)
    query = filter_ignored_parameters(query)
    query = urlencode(query)
    url = urlunparse((url.scheme, url.netloc, url.path, url.params, query, url.fragment))

    body = request.body
    content_type = request.headers.get('content-type')
    if body and content_type:
        if content_type == 'application/x-www-form-urlencoded':
            body = parse_qsl(body)
            body = filter_ignored_parameters(body)
            body = urlencode(body)
        elif content_type == 'application/json':
            import json
            body = json.loads(body)
            body = filter_ignored_parameters(sorted(body.items()))
            body = json.dumps(body)
    return url, body

def get(self, netloc, ua, timeout):
    try:
        headers = {'User-Agent': ua, 'Referer': netloc}
        result = _basic_request(netloc, headers=headers, timeout=timeout)

        match = re.findall('xhr\.open\("GET","([^,]+),', result)
        if not match:
            return False

        url_Parts = match[0].split('"')
        url_Parts[1] = '1680'
        url = urlparse.urljoin(netloc, ''.join(url_Parts))

        match = re.findall('rid=([0-9a-zA-Z]+)', url_Parts[0])
        if not match:
            return False

        headers['Cookie'] = 'rcksid=%s' % match[0]
        result = _basic_request(url, headers=headers, timeout=timeout)

        return self.getCookieString(result, headers['Cookie'])
    except:
        return  # not very robust but laziness...

def googlepass(url):
    try:
        try:
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except:
            headers = None
        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')
        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')
        if headers:
            url += '|%s' % urllib.urlencode(headers)
        return url
    except:
        return

def geturl(url):
    try:
        r = client.request(url, output='geturl')
        if r == None:
            return r

        host1 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(url.strip().lower()).netloc)[0]
        host2 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(r.strip().lower()).netloc)[0]
        if host1 == host2:
            return r

        proxies = sorted(get(), key=lambda x: random.random())
        proxies = sorted(proxies, key=lambda x: random.random())
        proxies = proxies[:3]

        for p in proxies:
            p += urllib.quote_plus(url)
            r = client.request(p, output='geturl')
            if not r == None:
                return parse(r)
    except:
        pass

def movie(self, imdb, title, localtitle, aliases, year):
    try:
        t = cleantitle.get(title)

        p = self.post_link % urllib.quote_plus(cleantitle.query(title))
        q = urlparse.urljoin(self.base_link, self.search_link)

        r = proxy.request(q, 'playing top', post=p, XHR=True)

        r = client.parseDOM(r, 'li')
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a')) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if i[0] and i[1]]
        r = [(i[0], re.findall('(.+?)\((\d{4})', i[1])) for i in r]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
        r = [i for i in r if t == cleantitle.get(i[1]) and str(year) == i[2]]

        url = proxy.parse(r[0][0])
        url = re.findall('(?://.+?|)(/.+)', url)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        pass

def searchMovie(self, title, year, aliases):
    try:
        url = '%s/%s-%s/' % (self.base_link, cleantitle.geturl(title), year)
        url = client.request(url, output='geturl')
        if url == None:
            t = cleantitle.get(title)
            q = '%s %s' % (title, year)
            q = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(q))
            r = client.request(q)
            r = client.parseDOM(r, 'div', attrs={'class': 'inner'})
            r = client.parseDOM(r, 'div', attrs={'class': 'info'})
            r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
            r = [(i[0], re.findall('(?:^Watch Movie |^Watch movies |^Watch |)(.+?)\((\d{4})', i[1])) for i in r]
            r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and year == i[2]][0]
        if url == None:
            raise Exception()
        return url
    except:
        return

def searchMovie(self, title, year, aliases, headers):
    try:
        title = cleantitle.normalize(title)
        url = urlparse.urljoin(self.base_link, self.search_link % cleantitle.geturl(title))
        r = client.request(url, headers=headers, timeout='15')
        r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
        r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
        results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
        try:
            r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
            url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
        except:
            url = None
            pass
        if (url == None):
            url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]
        url = urlparse.urljoin(self.base_link, '%s/watching.html' % url)
        return url
    except:
        return

def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    try:
        tvshowtitle = cleantitle.getsearch(tvshowtitle)
        p = urllib.urlencode({'action': 'ajaxy_sf', 'sf_value': tvshowtitle, 'search': 'false'})
        r = urlparse.urljoin(self.base_link, self.search_link)
        result = client.request(r, post=p, XHR=True)
        diziler = json.loads(result)['diziler'][0]['all']
        for i in diziler:
            t = cleantitle.get(i['post_title'])
            if tvshowtitle == t:
                url = i['post_link']
                url = url.split('/')[4]
                url = url.encode('utf-8')
                return url
    except:
        return

def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if url == None:
            return

        tv_maze = tvmaze.tvMaze()
        num = tv_maze.episodeAbsoluteNumber(tvdb, int(season), int(episode))
        num = str(num)

        url = urlparse.urljoin(self.base_link, url)
        r = client.request(url)

        r = client.parseDOM(r, 'tr', attrs={'class': ''})
        r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'td', attrs={'class': 'epnum'})) for i in r]
        r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
        r = [i[0] for i in r if num == i[1]][0]

        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return

def resolve(self, url):
    try:
        b = urlparse.urlparse(url).netloc
        b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]
        if not b in base64.b64decode(self.b_link):
            return url

        u, p, h = url.split('|')
        r = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'

        c = self.request(r, output='cookie', close=False)
        result = self.request(u, post=p, referer=r, cookie=c)

        url = result.split('url=')
        url = [urllib.unquote_plus(i.strip()) for i in url]
        url = [i for i in url if i.startswith('http')]
        url = url[-1]
        return url
    except:
        return

def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
    try:
        query = self.search_link % (urllib.quote_plus(tvshowtitle))
        query = urlparse.urljoin(self.base_link, query)

        result = client.request(query)
        result = client.parseDOM(result, 'div', attrs={'class': 'movie clearfix'})
        result = [(client.parseDOM(i, 'a', ret='href'),
                   client.parseDOM(i, 'span', attrs={'class': 'title-pl'}),
                   client.parseDOM(i, 'span', attrs={'class': 'title-en'}),
                   client.parseDOM(i, 'img', ret='src'),
                   client.parseDOM(i, 'p'),
                   client.parseDOM(i, 'p', attrs={'class': 'plot'})) for i in result]
        result = [(i[0][0], u" ".join(i[1] + i[2]), re.findall('(\d{4})', i[4][0])) for i in result]
        result = [i for i in result if 'serial' in i[0]]
        result = [i for i in result if cleantitle.get(tvshowtitle) in cleantitle.get(i[1])]

        years = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1)]
        result = [i[0] for i in result if any(x in i[2] for x in years)][0]

        url = result
        return url
    except:
        return

def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if not url:
            return

        query = urlparse.urljoin(self.base_link, url)
        r = client.request(query)

        r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Season %02d' % int(season))})
        r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']
        r = client.request(urlparse.urljoin(self.base_link, r))
        r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Episode %02d' % int(episode))})
        r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']

        return source_utils.strip_domain(r)
    except:
        return

def __search(self, search_link, imdb, titles):
    try:
        query = search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
        query = urlparse.urljoin(self.base_link, query)

        t = [cleantitle.get(i) for i in set(titles) if i]

        r = client.request(query)

        r = dom_parser.parse_dom(r, 'div', attrs={'class': 'big-list'})
        r = dom_parser.parse_dom(r, 'table', attrs={'class': 'row'})
        r = dom_parser.parse_dom(r, 'td', attrs={'class': 'list-name'})
        r = dom_parser.parse_dom(r, 'a', req='href')
        r = [i.attrs['href'] for i in r if i and cleantitle.get(i.content) in t][0]

        url = source_utils.strip_domain(r)

        r = client.request(urlparse.urljoin(self.base_link, url))
        r = dom_parser.parse_dom(r, 'a', attrs={'href': re.compile('.*/tt\d+.*')}, req='href')
        r = [re.findall('.+?(tt\d+).*?', i.attrs['href']) for i in r]
        r = [i[0] for i in r if i]

        return url if imdb in r else None
    except:
        return

def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    try:
        if not url:
            return

        data = urlparse.parse_qs(url)
        data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])

        tvshowtitle = data['tvshowtitle']
        localtvshowtitle = data['localtvshowtitle']
        aliases = source_utils.aliases_to_array(eval(data['aliases']))

        year = re.findall('(\d{4})', premiered)
        year = year[0] if year else data['year']

        url = self.__search([localtvshowtitle] + aliases, year, season, episode)
        if not url and tvshowtitle != localtvshowtitle:
            url = self.__search([tvshowtitle] + aliases, year, season, episode)

        return url
    except:
        return

def __search_movie(self, imdb, year):
    try:
        query = urlparse.urljoin(self.base_link, self.search_link % imdb)

        y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']

        r = client.request(query)

        r = dom_parser.parse_dom(r, 'div', attrs={'class': 'container'})
        r = dom_parser.parse_dom(r, 'div', attrs={'class': 'ml-item-content'})
        r = [(dom_parser.parse_dom(i, 'a', attrs={'class': 'ml-image'}, req='href'),
              dom_parser.parse_dom(i, 'ul', attrs={'class': 'item-params'})) for i in r]
        r = [(i[0][0].attrs['href'], re.findall('calendar.+?>.+?(\d{4})', ''.join([x.content for x in i[1]]))) for i in r if i[0] and i[1]]
        r = [(i[0], i[1][0] if len(i[1]) > 0 else '0') for i in r]
        r = sorted(r, key=lambda i: int(i[1]), reverse=True)  # with year > no year
        r = [i[0] for i in r if i[1] in y][0]

        url = urlparse.urlparse(r).path
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        return

def moonwalk(link, ref, season, episode):
    try:
        if season and episode:
            q = dict(urlparse.parse_qsl(urlparse.urlsplit(link).query))
            q.update({'season': season, 'episode': episode})
            q = (urllib.urlencode(q)).replace('%2C', ',')
            link = link.replace('?' + urlparse.urlparse(link).query, '') + '?' + q

        trans = __get_moonwalk_translators(link, ref)
        trans = trans if trans else [(link, '')]

        urls = []
        for i in trans:
            urls += __get_moonwalk(i[0], ref, info=i[1])

        return urls
    except:
        return []

def need_update(self):
    if "HTTP_PROXY" in os.environ or "HTTPS_PROXY" in os.environ:
        if "HTTP_PROXY" in os.environ:
            if sys.version_info >= (3, 0):
                proxy = urllib.parse.urlparse(os.environ["HTTP_PROXY"])
            else:
                proxy = urlparse.urlparse(os.environ["HTTP_PROXY"])
        else:
            if sys.version_info >= (3, 0):
                proxy = urllib.parse.urlparse(os.environ["HTTPS_PROXY"])
            else:
                proxy = urlparse.urlparse(os.environ["HTTPS_PROXY"])
        if sys.version_info >= (3, 0):
            conn = http.client.HTTPSConnection(proxy.hostname, proxy.port)
        else:
            conn = httplib.HTTPSConnection(proxy.hostname, proxy.port)
        conn.set_tunnel(self.version_host, 443)
    else:
        if sys.version_info >= (3, 0):
            conn = http.client.HTTPSConnection("raw.githubusercontent.com")
        else:
            conn = httplib.HTTPSConnection("raw.githubusercontent.com")
    conn.request("GET", self.version_url)
    version = conn.getresponse().read()
    try:
        if StrictVersion(version) > StrictVersion(PYJFUZZ_VERSION):
            self.new_version = version
            return True
    except:
        pass
    return False

def write(self):
    if os.path.isfile(self.lib):
        with open(self.lib) as f:
            lib_repo = Repo.fromurl(f.read().strip())
            if (formaturl(lib_repo.url, 'https') == formaturl(self.url, 'https')  # match URLs in common format (https)
                    and (lib_repo.rev == self.rev  # match revs, even if rev is None (valid for repos with no revisions)
                         or (lib_repo.rev and self.rev
                             and lib_repo.rev == self.rev[0:len(lib_repo.rev)]))):  # match long and short rev formats
                # print self.name, 'unmodified'
                return

    ref = (formaturl(self.url, 'https').rstrip('/') + '/' +
           (('' if self.is_build else '#') + self.rev if self.rev else ''))
    action("Updating reference \"%s\" -> \"%s\"" % (relpath(cwd_root, self.path) if cwd_root != self.path else self.name, ref))

    with open(self.lib, 'wb') as f:
        with_auth = urlparse(ref)
        f.write(with_auth._replace(netloc=with_auth.hostname).geturl())
        f.write("\n")

def api_is_run(url):
    """Determine if a URL looks like a valid run URL"""
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse.urlparse(url).path.split('/')
    if len(url_parts) != 6 \
       or (url_parts[:3] != ['', 'pscheduler', 'tasks']) \
       or (url_parts[4] != 'runs'):
        return False
    try:
        uuid.UUID(url_parts[3])
        uuid.UUID(url_parts[5])
    except ValueError:
        return False
    return True

def __init__(self, iso_url, **kwargs):
    Version.__init__(self, **kwargs)
    if re.match(r'/', iso_url):
        self.m_iso_url = "file://" + iso_url
        self.m_iso_path = iso_url
    else:
        self.m_iso_url = iso_url
        self.m_iso_path = None
    # We can't determine the final ISO file name yet because the work
    # directory is not known at this point, but we can precalculate the
    # basename of it.
    self.m_iso_basename = os.path.basename(
        urllib.url2pathname(urlparse.urlparse(iso_url)[2]))
    m = re.match(r"(.*)cd.*iso|NetBSD-[0-9\._A-Z]+-(.*).iso", self.m_iso_basename)
    if m is None:
        raise RuntimeError("cannot guess architecture from ISO name '%s'"
                           % self.m_iso_basename)
    if m.group(1) is not None:
        self.m_arch = m.group(1)
    if m.group(2) is not None:
        self.m_arch = m.group(2)
    check_arch_supported(self.m_arch, 'iso')

def test_validate_image_status_before_upload_unexpected_resp_v1(self):
    mock_conn = mock.Mock()
    fake_url = 'http://fake_host/fake_path/fake_image_id'
    parts = urlparse(fake_url)
    path = parts[2]
    fake_image_id = path.split('/')[-1]
    mock_head_resp = mock.Mock()
    mock_head_resp.status = httplib.BAD_REQUEST
    mock_head_resp.read.return_value = 'fakeData'
    mock_head_resp.getheader.return_value = 'queued'
    mock_conn.getresponse.return_value = mock_head_resp
    self.mock_patch_object(self.glance, 'check_resp_status_and_retry')

    self.glance.validate_image_status_before_upload_v1(
        mock_conn, fake_url, extra_headers=mock.Mock())

    self.assertEqual(mock_head_resp.read.call_count, 2)
    self.glance.check_resp_status_and_retry.assert_called_with(
        mock_head_resp, fake_image_id, fake_url)
    mock_conn.request.assert_called_once()

def check_headers(self, headers):
    etag = headers.get('etag')
    if etag is not None:
        if etag.startswith(('W/', 'w/')):
            if etag.startswith('w/'):
                warn(HTTPWarning('weak etag indicator should be upcase.'),
                     stacklevel=4)
            etag = etag[2:]
        if not (etag[:1] == etag[-1:] == '"'):
            warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

    location = headers.get('location')
    if location is not None:
        if not urlparse(location).netloc:
            warn(HTTPWarning('absolute URLs required for location header'),
                 stacklevel=4)

def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url

def _BuildUrl(self, url, path_elements=None, extra_params=None):
    # Break url into constituent parts
    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)

    # Add any additional path elements to the path
    if path_elements:
        # Filter out the path elements that have a value of None
        p = [i for i in path_elements if i]
        if not path.endswith('/'):
            path += '/'
        path += '/'.join(p)

    # Add any additional query parameters to the query string
    if extra_params and len(extra_params) > 0:
        extra_query = self._EncodeParameters(extra_params)
        # Add it to the existing query
        if query:
            query += '&' + extra_query
        else:
            query = extra_query

    # Return the rebuilt URL
    return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))

def get_netloc(url):
    """Return the netloc from a URL.

    If the input value is not a valid URL the method will raise an
    Ansible filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        netloc = urlparse(url).netloc
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netloc of: "%s"' % str(exp)
        )
    else:
        return netloc

def get_netorigin(url):
    """Return the network origin (scheme and netloc) from a URL.

    If the input value is not a valid URL the method will raise an
    Ansible filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        parsed_url = urlparse(url)
        netloc = parsed_url.netloc
        scheme = parsed_url.scheme
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netorigin of: "%s"' % str(exp)
        )
    else:
        return '%s://%s' % (scheme, netloc)

def check_remote(url):
    # TODO need a better solution
    o = urlparse.urlparse(url)
    host = o.netloc
    while "@" in host:
        host = host[host.find("@") + 1:]
    while ":" in host:
        host = host[:host.find(":")]
    cmd = list()
    cmd.append("ping")
    if platform.system().lower().startswith("win"):
        cmd.append("-n")
        cmd.append("1")
        cmd.append("-w")
        cmd.append("1000")
    else:
        cmd.append("-c1")
        cmd.append("-t1")
    cmd.append(host)
    p = Popen(" ".join(cmd), stdout=PIPE, stderr=PIPE, shell=True)
    out, err = p.communicate()
    return len(err) == 0

def configure(self):
    opts = self.options
    use_cfg = opts.use_config
    if use_cfg is None:
        return
    url = urlparse(opts.use_config_dir)
    kwargs = {}
    if url.scheme:
        kwargs['download'] = True
        kwargs['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        kwargs['remote_locs'] = ['', DEFAULT_DIR]
    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for cfg in use_cfg.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % cfg)
        self.load(cfg, tooldir=tooldir, **kwargs)
    self.start_msg('Checking for configuration')
    self.end_msg(use_cfg)

def downloadVideo(self):
    url = unicode(self.tabWidget.currentWidget().url().toString())
    # For youtube videos
    if validYoutubeUrl(url):
        vid_id = parse_qs(urlparse(url).query)['v'][0]
        url = 'https://m.youtube.com/watch?v=' + vid_id
        yt = YouTube(url)  # Use PyTube module for restricted videos
        videos = yt.get_videos()
        dialog = youtube_dialog.YoutubeDialog(videos, self)
        if dialog.exec_() == 1:
            index = abs(dialog.buttonGroup.checkedId()) - 2
            vid = videos[index]
            reply = networkmanager.get(QNetworkRequest(QUrl.fromUserInput(vid.url)))
            self.handleUnsupportedContent(reply, vid.filename + '.' + vid.extension)
        return
    # For embedded HTML5 videos
    request = QNetworkRequest(self.video_URL)
    request.setRawHeader('Referer', self.video_page_url)
    reply = networkmanager.get(request)
    self.handleUnsupportedContent(reply)

def _add_query_parameter(url, name, value):
    """Adds a query parameter to a url.

    Replaces the current value if it already exists in the URL.

    Args:
        url: string, url to add the query parameter to.
        name: string, query parameter name.
        value: string, query parameter value.

    Returns:
        Updated query parameter. Does not update the url if value is None.
    """
    if value is None:
        return url
    else:
        parsed = list(urlparse.urlparse(url))
        q = dict(parse_qsl(parsed[4]))
        q[name] = value
        parsed[4] = urllib.urlencode(q)
        return urlparse.urlunparse(parsed)

def neutron_settings():
    neutron_settings = {}
    if is_relation_made('neutron-api', 'neutron-plugin'):
        neutron_api_info = NeutronAPIContext()()
        neutron_settings.update({
            # XXX: Rename these relations settings?
            'quantum_plugin': neutron_api_info['neutron_plugin'],
            'region': config('region'),
            'quantum_security_groups':
                neutron_api_info['neutron_security_groups'],
            'quantum_url': neutron_api_info['neutron_url'],
        })
        neutron_url = urlparse(neutron_settings['quantum_url'])
        neutron_settings['quantum_host'] = neutron_url.hostname
        neutron_settings['quantum_port'] = neutron_url.port
    return neutron_settings

def _should_use_proxy(url, no_proxy=None):
    """Determines whether a proxy should be used to open a connection to the
    specified URL, based on the value of the no_proxy environment variable.

    @param url: URL
    @type url: basestring or urllib2.Request
    """
    if no_proxy is None:
        no_proxy_effective = os.environ.get('no_proxy', '')
    else:
        no_proxy_effective = no_proxy

    urlObj = urlparse_.urlparse(_url_as_string(url))
    for np in [h.strip() for h in no_proxy_effective.split(',')]:
        if urlObj.hostname == np:
            return False

    return True

def get_query_tag_value(path, query_tag):
    """This is a utility method to query for specific HTTP parameters in
    the URI.  Returns the value of the parameter, or None if not found."""
    data = {}
    parsed_path = urlparse(path)
    query_tokens = parsed_path.query.split('&')
    # find the 'ids' query, there can only be one
    for tok in query_tokens:
        query_tok = tok.split('=')
        query_key = query_tok[0]
        if query_key is not None and query_key == query_tag:
            # ids tag contains a comma delimited list of ids
            data[query_tag] = query_tok[1]
            break
    return data.get(query_tag, None)

def get_query_tag_value(self, path, query_tag):
    """This is a utility method to query for specific HTTP parameters in
    the URI.  Returns the value of the parameter, or None if not found."""
    data = {}
    parsed_path = urlparse(self.path)
    query_tokens = parsed_path.query.split('&')
    # find the 'ids' query, there can only be one
    for tok in query_tokens:
        query_tok = tok.split('=')
        query_key = query_tok[0]
        if query_key is not None and query_key == query_tag:
            # ids tag contains a comma delimited list of ids
            data[query_tag] = query_tok[1]
            break
    return data.get(query_tag, None)

def get_restful_params(urlstring):
    """Returns a dictionary of paired RESTful URI parameters"""
    parsed_path = urlparse(urlstring.strip("/"))
    tokens = parsed_path.path.split('/')
    # Be sure we at least have /v#/opt
    if len(tokens) < 2:
        return None
    # Be sure first token is API version
    if len(tokens[0]) == 2 and tokens[0][0] == 'v':
        params = list_to_dict(tokens[1:])
        params["api_version"] = tokens[0][1]
        return params
    else:
        return None

def file_to_url(self, file_rel_path):
    """Convert a relative file path to a file URL."""
    _abs_path = os.path.abspath(file_rel_path)
    return urlparse.urlparse(_abs_path, scheme='file').geturl()

def download(self, source, dest):
    """
    Download an archive file.

    :param str source: URL pointing to an archive file.
    :param str dest: Local path location to download archive file to.
    """
    # propagate all exceptions
    # URLError, OSError, etc
    proto, netloc, path, params, query, fragment = urlparse(source)
    if proto in ('http', 'https'):
        auth, barehost = splituser(netloc)
        if auth is not None:
            source = urlunparse((proto, barehost, path, params, query, fragment))
            username, password = splitpasswd(auth)
            passman = HTTPPasswordMgrWithDefaultRealm()
            # Realm is set to None in add_password to force the username and password
            # to be used whatever the realm
            passman.add_password(None, source, username, password)
            authhandler = HTTPBasicAuthHandler(passman)
            opener = build_opener(authhandler)
            install_opener(opener)
    response = urlopen(source)
    try:
        with open(dest, 'wb') as dest_file:
            dest_file.write(response.read())
    except Exception as e:
        if os.path.isfile(dest):
            os.unlink(dest)
        raise e

def parse_url(self, url):
    return urlparse(url)

def load_logging_config_uri(orb, uri, binding=None):
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        q = dict([x.split("=") for x in query.split("&")])
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys == None:
                logging.warning("Failed to lookup file system")
            else:
                try:
                    t = tempfile.mktemp()
                    tf = open(t, "w+")
                    scaFile = fileSys.open(path, True)
                    fileSize = scaFile.sizeOf()
                    buf = scaFile.read(fileSize)
                    tf.write(buf)
                    tf.close()
                    scaFile.close()
                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")

def GetConfigFileContents(url):
    fc = None
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme == "file":
        try:
            f = open(path, 'r')
            fc = ""
            for line in f:
                fc += line
            f.close()
        except:
            fc = None
    elif scheme == "sca":
        fc = GetSCAFileContents(url)
    elif scheme == "http":
        fc = GetHTTPFileContents(url)
    elif scheme == "str":
        ## RESOLVE
        if path.startswith("/"):
            fc = path[1:]
        else:
            fc = path
        pass
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")
    return fc

def authenticate_keystone_user(self, keystone, user, password, tenant):
    """Authenticates a regular user with the keystone public endpoint."""
    self.log.debug('Authenticating keystone user ({})...'.format(user))
    ep = keystone.service_catalog.url_for(service_type='identity',
                                          interface='publicURL')
    keystone_ip = urlparse.urlparse(ep).hostname
    return self.authenticate_keystone(keystone_ip, user, password,
                                      project_name=tenant)