Python urlparse module: urlparse() example source code

We have extracted the following 50 code examples from open-source Python projects to illustrate how to use urlparse.urlparse().
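
Before the project excerpts, here is a minimal sketch of what urlparse.urlparse() returns under Python 2 (the URL is invented for illustration):

import urlparse

parts = urlparse.urlparse('http://user@example.com:8080/path/page;type=a?q=1#frag')
# ParseResult(scheme='http', netloc='user@example.com:8080', path='/path/page',
#             params='type=a', query='q=1', fragment='frag')
print parts.scheme, parts.netloc, parts.path

# urlunparse() reassembles the six components; a parse/unparse round trip
# preserves the result for a well-formed URL like this one.
assert urlparse.urlparse(urlparse.urlunparse(parts)) == parts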

Project: kinect-2-libras    Author: inessadl
def can_fetch(self, useragent, url):
        """using the parsed robots.txt decide if useragent can fetch url"""
        if self.disallow_all:
            return False
        if self.allow_all:
            return True
        # search for given user agent matches
        # the first match counts
        parsed_url = urlparse.urlparse(urllib.unquote(url))
        url = urlparse.urlunparse(('', '', parsed_url.path,
            parsed_url.params, parsed_url.query, parsed_url.fragment))
        url = urllib.quote(url)
        if not url:
            url = "/"
        for entry in self.entries:
            if entry.applies_to(useragent):
                return entry.allowance(url)
        # try the default entry last
        if self.default_entry:
            return self.default_entry.allowance(url)
        # agent not found ==> access granted
        return True
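
The method above comes from Python 2's standard-library robotparser module (vendored into this project); a typical call path looks like this, with an illustrative example.com URL:

import robotparser

rp = robotparser.RobotFileParser()
rp.set_url('http://example.com/robots.txt')
rp.read()   # fetch and parse robots.txt; can_fetch() then answers per URL
print rp.can_fetch('MyCrawler/1.0', 'http://example.com/private/page')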
Project: core-framework    Author: RedhawkSDR
def GetSCAFileContents(url):
    fileContents = None
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme == "sca":
        queryAsDict = dict([x.split("=") for x in query.split("&")])
        try:
            orb = CORBA.ORB_init()
            fileSys = orb.string_to_object(queryAsDict["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        except:
            logging.warning("Unable to get ORB reference")
        else:
            if fileSys is None:
                logging.warning("Failed to lookup file system")
            else:
                scaFile = fileSys.open(path, True)
                fileSize = scaFile.sizeOf()
                fileContents = scaFile.read(fileSize)
                scaFile.close()
    return fileContents
Project: kinect-2-libras    Author: inessadl
def reduce_uri(self, uri, default_port=True):
        """Accept authority or URI and extract only the authority and path."""
        # note HTTP URLs do not have a userinfo component
        parts = urlparse.urlsplit(uri)
        if parts[1]:
            # URI
            scheme = parts[0]
            authority = parts[1]
            path = parts[2] or '/'
        else:
            # host or host:port
            scheme = None
            authority = uri
            path = '/'
        host, port = splitport(authority)
        if default_port and port is None and scheme is not None:
            dport = {"http": 80,
                     "https": 443,
                     }.get(scheme)
            if dport is not None:
                authority = "%s:%d" % (host, dport)
        return authority, path
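
Two hypothetical calls make the reduction concrete (hosts invented; self omitted for brevity, and splitport() comes from urllib in Python 2):

reduce_uri('http://example.com/docs/index.html')   # -> ('example.com:80', '/docs/index.html')
reduce_uri('example.com:8080')                     # -> ('example.com:8080', '/')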
Project: kinect-2-libras    Author: inessadl
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI if we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
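
Worked through on an invented systemId, the base-URI computation above does this:

import posixpath
# systemId = 'http://example.com/dtds/spec.dtd' -> path = '/dtds/spec.dtd'
posixpath.dirname('/dtds/spec.dtd') + '/'   # -> '/dtds/', so the baseURI
                                            # becomes 'http://example.com/dtds/'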
Project: cbapi-python    Author: carbonblack
def _remove_ignored_parameters(self, request):

        def filter_ignored_parameters(data):
            return [(k, v) for k, v in data if k not in self._ignored_parameters]

        url = urlparse(request.url)
        query = parse_qsl(url.query)
        query = filter_ignored_parameters(query)
        query = urlencode(query)
        url = urlunparse((url.scheme, url.netloc, url.path, url.params, query, url.fragment))
        body = request.body
        content_type = request.headers.get('content-type')
        if body and content_type:
            if content_type == 'application/x-www-form-urlencoded':
                body = parse_qsl(body)
                body = filter_ignored_parameters(body)
                body = urlencode(body)
            elif content_type == 'application/json':
                import json
                body = json.loads(body)
                body = filter_ignored_parameters(sorted(body.items()))
                body = json.dumps(body)
        return url, body
Project: plugin.video.exodus    Author: lastship
def get(self, netloc, ua, timeout):
        try:
            headers = {'User-Agent': ua, 'Referer': netloc}
            result = _basic_request(netloc, headers=headers, timeout=timeout)

            match = re.findall('xhr\.open\("GET","([^,]+),', result)
            if not match:
                return False

            url_Parts = match[0].split('"')
            url_Parts[1] = '1680'
            url = urlparse.urljoin(netloc, ''.join(url_Parts))

            match = re.findall('rid=([0-9a-zA-Z]+)', url_Parts[0])
            if not match:
                return False

            headers['Cookie'] = 'rcksid=%s' % match[0]
            result = _basic_request(url, headers=headers, timeout=timeout)
            return self.getCookieString(result, headers['Cookie'])
        except:
            return

    # not very robust but laziness...
Project: plugin.video.exodus    Author: lastship
def googlepass(url):
    try:
        try:
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except:
            headers = None
        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')
        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')
        if headers: url += '|%s' % urllib.urlencode(headers)
        return url
    except:
        return
Project: plugin.video.exodus    Author: lastship
def geturl(url):
    try:
        r = client.request(url, output='geturl')
        if r == None: return r

        host1 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(url.strip().lower()).netloc)[0]
        host2 = re.findall('([\w]+)[.][\w]+$', urlparse.urlparse(r.strip().lower()).netloc)[0]
        if host1 == host2: return r

        proxies = sorted(get(), key=lambda x: random.random())
        proxies = sorted(proxies, key=lambda x: random.random())
        proxies = proxies[:3]

        for p in proxies:
            p += urllib.quote_plus(url)
            r = client.request(p, output='geturl')
            if not r == None: return parse(r)

    except:
        pass
Project: plugin.video.exodus    Author: lastship
def movie(self, imdb, title, localtitle, aliases, year):
        try:
            t = cleantitle.get(title)

            p = self.post_link % urllib.quote_plus(cleantitle.query(title))
            q = urlparse.urljoin(self.base_link, self.search_link)

            r = proxy.request(q, 'playing top', post=p, XHR=True)

            r = client.parseDOM(r, 'li')
            r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'a')) for i in r]
            r = [(i[0][0], i[1][0]) for i in r if i[0] and i[1]]
            r = [(i[0], re.findall('(.+?)\((\d{4})', i[1])) for i in r]
            r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
            r = [i for i in r if t == cleantitle.get(i[1]) and str(year) == i[2]]

            url = proxy.parse(r[0][0])

            url = re.findall('(?://.+?|)(/.+)', url)[0]
            url = client.replaceHTMLCodes(url)
            url = url.encode('utf-8')
            return url
        except:
            pass
Project: plugin.video.exodus    Author: lastship
def searchMovie(self, title, year, aliases):
        try:
            url = '%s/%s-%s/' % (self.base_link, cleantitle.geturl(title), year)
            url = client.request(url, output='geturl')

            if url == None:
                t = cleantitle.get(title)
                q = '%s %s' % (title, year)
                q = urlparse.urljoin(self.base_link, self.search_link % urllib.quote_plus(q))
                r = client.request(q)
                r = client.parseDOM(r, 'div', attrs={'class': 'inner'})
                r = client.parseDOM(r, 'div', attrs={'class': 'info'})
                r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
                r = [(i[0], re.findall('(?:^Watch Movie |^Watch movies |^Watch |)(.+?)\((\d{4})', i[1])) for i in r]
                r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if i[1]]
                url = [i[0] for i in r if self.matchAlias(i[1], aliases) and year == i[2]][0]

            if url == None: raise Exception()
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def searchMovie(self, title, year, aliases, headers):
        try:
            title = cleantitle.normalize(title)
            url = urlparse.urljoin(self.base_link, self.search_link % cleantitle.geturl(title))
            r = client.request(url, headers=headers, timeout='15')
            r = client.parseDOM(r, 'div', attrs={'class': 'ml-item'})
            r = zip(client.parseDOM(r, 'a', ret='href'), client.parseDOM(r, 'a', ret='title'))
            results = [(i[0], i[1], re.findall('\((\d{4})', i[1])) for i in r]
            try:
                r = [(i[0], i[1], i[2][0]) for i in results if len(i[2]) > 0]
                url = [i[0] for i in r if self.matchAlias(i[1], aliases) and (year == i[2])][0]
            except:
                url = None
                pass

            if (url == None):
                url = [i[0] for i in results if self.matchAlias(i[1], aliases)][0]

            url = urlparse.urljoin(self.base_link, '%s/watching.html' % url)
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
        try:
            tvshowtitle = cleantitle.getsearch(tvshowtitle)
            p = urllib.urlencode({'action': 'ajaxy_sf', 'sf_value': tvshowtitle, 'search': 'false'})
            r = urlparse.urljoin(self.base_link, self.search_link)
            result = client.request(r, post=p, XHR=True)
            diziler = json.loads(result)['diziler'][0]['all']
            for i in diziler:
                t = cleantitle.get(i['post_title'])
                if tvshowtitle == t:
                    url = i['post_link']
                    url = url.split('/')[4]
                    url = url.encode('utf-8')
                    return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if url == None: return

            tv_maze = tvmaze.tvMaze()
            num = tv_maze.episodeAbsoluteNumber(tvdb, int(season), int(episode))
            num = str(num)

            url = urlparse.urljoin(self.base_link, url)

            r = client.request(url)

            r = client.parseDOM(r, 'tr', attrs = {'class': ''})
            r = [(client.parseDOM(i, 'a', ret='href'), client.parseDOM(i, 'td', attrs = {'class': 'epnum'})) for i in r]
            r = [(i[0][0], i[1][0]) for i in r if len(i[0]) > 0 and len(i[1]) > 0]
            r = [i[0] for i in r if num == i[1]][0]

            url = re.findall('(?://.+?|)(/.+)', r)[0]
            url = client.replaceHTMLCodes(url)
            url = url.encode('utf-8')
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def resolve(self, url):
        try:
            b = urlparse.urlparse(url).netloc
            b = re.compile('([\w]+[.][\w]+)$').findall(b)[0]

            if not b in base64.b64decode(self.b_link): return url

            u, p, h = url.split('|')
            r = urlparse.parse_qs(h)['Referer'][0]
            #u += '&app_id=Exodus'

            c = self.request(r, output='cookie', close=False)
            result = self.request(u, post=p, referer=r, cookie=c)

            url = result.split('url=')
            url = [urllib.unquote_plus(i.strip()) for i in url]
            url = [i for i in url if i.startswith('http')]
            url = url[-1]

            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def tvshow(self, imdb, tvdb, tvshowtitle, localtvshowtitle, aliases, year):
        try:
            query = self.search_link % (urllib.quote_plus(tvshowtitle))
            query = urlparse.urljoin(self.base_link, query)
            result = client.request(query)
            result = client.parseDOM(result, 'div', attrs={'class': 'movie clearfix'})
            result = [(client.parseDOM(i, 'a', ret='href'),
                  client.parseDOM(i, 'span', attrs={'class': 'title-pl'}),
                  client.parseDOM(i, 'span', attrs={'class': 'title-en'}),
                  client.parseDOM(i, 'img', ret='src'),
                  client.parseDOM(i, 'p'),
                  client.parseDOM(i, 'p', attrs={'class': 'plot'})) for i in result ]

            result = [(i[0][0], u" ".join(i[1] + i[2]), re.findall('(\d{4})', i[4][0])) for i in result]
            result = [i for i in result if 'serial' in i[0]]
            result = [i for i in result if cleantitle.get(tvshowtitle) in cleantitle.get(i[1])]
            years = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1)]
            result = [i[0] for i in result if any(x in i[2] for x in years)][0]
            url = result
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            query = urlparse.urljoin(self.base_link, url)

            r = client.request(query)
            r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Season %02d' % int(season))})
            r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']
            r = client.request(urlparse.urljoin(self.base_link, r))
            r = dom_parser.parse_dom(r, 'td', attrs={'data-title-name': re.compile('Episode %02d' % int(episode))})
            r = dom_parser.parse_dom(r, 'a', req='href')[0].attrs['href']

            return source_utils.strip_domain(r)
        except:
            return
Project: plugin.video.exodus    Author: lastship
def __search(self, search_link, imdb, titles):
        try:
            query = search_link % (urllib.quote_plus(cleantitle.query(titles[0])))
            query = urlparse.urljoin(self.base_link, query)

            t = [cleantitle.get(i) for i in set(titles) if i]

            r = client.request(query)

            r = dom_parser.parse_dom(r, 'div', attrs={'class': 'big-list'})
            r = dom_parser.parse_dom(r, 'table', attrs={'class': 'row'})
            r = dom_parser.parse_dom(r, 'td', attrs={'class': 'list-name'})
            r = dom_parser.parse_dom(r, 'a', req='href')
            r = [i.attrs['href']for i in r if i and cleantitle.get(i.content) in t][0]

            url = source_utils.strip_domain(r)

            r = client.request(urlparse.urljoin(self.base_link, url))
            r = dom_parser.parse_dom(r, 'a', attrs={'href': re.compile('.*/tt\d+.*')}, req='href')
            r = [re.findall('.+?(tt\d+).*?', i.attrs['href']) for i in r]
            r = [i[0] for i in r if i]

            return url if imdb in r else None
        except:
            return
Project: plugin.video.exodus    Author: lastship
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
        try:
            if not url:
                return

            data = urlparse.parse_qs(url)
            data = dict([(i, data[i][0]) if data[i] else (i, '') for i in data])
            tvshowtitle = data['tvshowtitle']
            localtvshowtitle = data['localtvshowtitle']
            aliases = source_utils.aliases_to_array(eval(data['aliases']))

            year = re.findall('(\d{4})', premiered)
            year = year[0] if year else data['year']

            url = self.__search([localtvshowtitle] + aliases, year, season, episode)
            if not url and tvshowtitle != localtvshowtitle:
                url = self.__search([tvshowtitle] + aliases, year, season, episode)
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def __search_movie(self, imdb, year):
        try:
            query = urlparse.urljoin(self.base_link, self.search_link % imdb)

            y = ['%s' % str(year), '%s' % str(int(year) + 1), '%s' % str(int(year) - 1), '0']

            r = client.request(query)

            r = dom_parser.parse_dom(r, 'div', attrs={'class': 'container'})
            r = dom_parser.parse_dom(r, 'div', attrs={'class': 'ml-item-content'})
            r = [(dom_parser.parse_dom(i, 'a', attrs={'class': 'ml-image'}, req='href'), dom_parser.parse_dom(i, 'ul', attrs={'class': 'item-params'})) for i in r]
            r = [(i[0][0].attrs['href'], re.findall('calendar.+?>.+?(\d{4})', ''.join([x.content for x in i[1]]))) for i in r if i[0] and i[1]]
            r = [(i[0], i[1][0] if len(i[1]) > 0 else '0') for i in r]
            r = sorted(r, key=lambda i: int(i[1]), reverse=True)  # with year > no year
            r = [i[0] for i in r if i[1] in y][0]

            url = urlparse.urlparse(r).path
            url = client.replaceHTMLCodes(url)
            url = url.encode('utf-8')
            return url
        except:
            return
Project: plugin.video.exodus    Author: lastship
def moonwalk(link, ref, season, episode):
    try:
        if season and episode:
            q = dict(urlparse.parse_qsl(urlparse.urlsplit(link).query))
            q.update({'season': season, 'episode': episode})
            q = (urllib.urlencode(q)).replace('%2C', ',')
            link = link.replace('?' + urlparse.urlparse(link).query, '') + '?' + q

        trans = __get_moonwalk_translators(link, ref)
        trans = trans if trans else [(link, '')]

        urls = []
        for i in trans:
            urls += __get_moonwalk(i[0], ref, info=i[1])
        return urls
    except:
        return []
Project: PyJFuzz    Author: mseclab
def need_update(self):
        if "HTTP_PROXY" in os.environ or "HTTPS_PROXY" in os.environ:
            if "HTTP_PROXY" in os.environ:
                if sys.version_info >= (3, 0):
                    proxy = urllib.parse.urlparse(os.environ["HTTP_PROXY"])
                else:
                    proxy = urlparse.urlparse(os.environ["HTTP_PROXY"])
            else:
                if sys.version_info >= (3, 0):
                    proxy = urllib.parse.urlparse(os.environ["HTTPS_PROXY"])
                else:
                    proxy = urlparse.urlparse(os.environ["HTTPS_PROXY"])
            if sys.version_info >= (3, 0):
                conn = http.client.HTTPSConnection(proxy.hostname, proxy.port)
            else:
                conn = httplib.HTTPSConnection(proxy.hostname, proxy.port)
            conn.set_tunnel(self.version_host, 443)
        else:
            if sys.version_info >= (3, 0):
                conn = http.client.HTTPSConnection("raw.githubusercontent.com")
            else:
                conn = httplib.HTTPSConnection("raw.githubusercontent.com")
        conn.request("GET", self.version_url)
        version = conn.getresponse().read()
        try:
            if StrictVersion(version) > StrictVersion(PYJFUZZ_VERSION):
                self.new_version = version
                return True
        except:
            pass
        return False
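
The per-branch version checks above can be avoided with the usual guarded import, shown here as a generic compatibility idiom (not taken from the PyJFuzz source):

try:
    # Python 2
    import urlparse
    import httplib
except ImportError:
    # Python 3
    import urllib.parse as urlparse
    import http.client as httplib
# after this, urlparse.urlparse() and httplib.HTTPSConnection() work on both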
Project: mbed-cli    Author: ARMmbed
def write(self):
        if os.path.isfile(self.lib):
            with open(self.lib) as f:
                lib_repo = Repo.fromurl(f.read().strip())
                if (formaturl(lib_repo.url, 'https') == formaturl(self.url, 'https') # match URLs in common format (https)
                        and (lib_repo.rev == self.rev                              # match revs, even if rev is None (valid for repos with no revisions)
                             or (lib_repo.rev and self.rev
                                 and lib_repo.rev == self.rev[0:len(lib_repo.rev)]))):  # match long and short rev formats
                    #print self.name, 'unmodified'
                    return

        ref = (formaturl(self.url, 'https').rstrip('/') + '/' +
              (('' if self.is_build else '#') +
                self.rev if self.rev else ''))
        action("Updating reference \"%s\" -> \"%s\"" % (relpath(cwd_root, self.path) if cwd_root != self.path else self.name, ref))
        with open(self.lib, 'wb') as f:
            with_auth = urlparse(ref)
            f.write(with_auth._replace(netloc=with_auth.hostname).geturl())
            f.write("\n")
Project: pscheduler    Author: perfsonar
def api_is_run(url):
    """Determine if a URL looks like a valid run URL"""
    # Note that this generates an extra array element because of the
    # leading slash.
    url_parts = urlparse.urlparse(url).path.split('/')
    if len(url_parts) != 6 \
            or (url_parts[:3] != ['', 'pscheduler', 'tasks' ]) \
            or (url_parts[4] != 'runs'):
        return False

    try:
        uuid.UUID(url_parts[3])
        uuid.UUID(url_parts[5])
    except ValueError:
        return False

    return True
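
A hypothetical run URL that satisfies both the path-shape and UUID checks (host and UUIDs invented):

api_is_run('https://ps.example.net/pscheduler/tasks/'
           'e99bc877-5b80-4ac5-8b1b-5d32b1b59b0f/runs/'
           '67421491-e836-4f7b-b250-75ca71dd9b82')   # -> True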
Project: anita    Author: gson1703
def __init__(self, iso_url, **kwargs):
        Version.__init__(self, **kwargs)
        if re.match(r'/', iso_url):
            self.m_iso_url = "file://" + iso_url
            self.m_iso_path = iso_url
        else:
            self.m_iso_url = iso_url
            self.m_iso_path = None
        # We can't determine the final ISO file name yet because the work
        # directory is not known at this point, but we can precalculate the
        # basename of it.
        self.m_iso_basename = os.path.basename(
            urllib.url2pathname(urlparse.urlparse(iso_url)[2]))
        m = re.match(r"(.*)cd.*iso|NetBSD-[0-9\._A-Z]+-(.*).iso", self.m_iso_basename)
        if m is None:
            raise RuntimeError("cannot guess architecture from ISO name '%s'"
                % self.m_iso_basename)
        if m.group(1) is not None:
            self.m_arch = m.group(1)
        if m.group(2) is not None:
            self.m_arch = m.group(2)
        check_arch_supported(self.m_arch, 'iso')
Project: os-xenapi    Author: openstack
def test_validate_image_status_before_upload_unexpected_resp_v1(self):
        mock_conn = mock.Mock()
        fake_url = 'http://fake_host/fake_path/fake_image_id'
        parts = urlparse(fake_url)
        path = parts[2]
        fake_image_id = path.split('/')[-1]
        mock_head_resp = mock.Mock()
        mock_head_resp.status = httplib.BAD_REQUEST
        mock_head_resp.read.return_value = 'fakeData'
        mock_head_resp.getheader.return_value = 'queued'
        mock_conn.getresponse.return_value = mock_head_resp
        self.mock_patch_object(self.glance, 'check_resp_status_and_retry')

        self.glance.validate_image_status_before_upload_v1(
            mock_conn, fake_url, extra_headers=mock.Mock())
        self.assertEqual(mock_head_resp.read.call_count, 2)
        self.glance.check_resp_status_and_retry.assert_called_with(
            mock_head_resp, fake_image_id, fake_url)
        mock_conn.request.assert_called_once()
Project: Flask_Blog    Author: sugarguo
def check_headers(self, headers):
        etag = headers.get('etag')
        if etag is not None:
            if etag.startswith(('W/', 'w/')):
                if etag.startswith('w/'):
                    warn(HTTPWarning('weak etag indicator should be upcase.'),
                         stacklevel=4)
                etag = etag[2:]
            if not (etag[:1] == etag[-1:] == '"'):
                warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

        location = headers.get('location')
        if location is not None:
            if not urlparse(location).netloc:
                warn(HTTPWarning('absolute URLs required for location header'),
                     stacklevel=4)
Project: Flask_Blog    Author: sugarguo
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
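
Two hypothetical calls (hosts invented): when the login and current URLs share scheme and host, only path and query survive; otherwise the current URL passes through untouched:

make_next_param('https://example.com/login', 'https://example.com/profile?tab=1')
# -> '/profile?tab=1'
make_next_param('https://auth.example.org/login', 'https://example.com/profile')
# -> 'https://example.com/profile'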
Project: v2ex-tornado-2    Author: coderyy
def _BuildUrl(self, url, path_elements=None, extra_params=None):
    # Break url into consituent parts
    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)

    # Add any additional path elements to the path
    if path_elements:
      # Filter out the path elements that have a value of None
      p = [i for i in path_elements if i]
      if not path.endswith('/'):
        path += '/'
      path += '/'.join(p)

    # Add any additional query parameters to the query string
    if extra_params and len(extra_params) > 0:
      extra_query = self._EncodeParameters(extra_params)
      # Add it to the existing query
      if query:
        query += '&' + extra_query
      else:
        query = extra_query

    # Return the rebuilt URL
    return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
Project: openstack-ansible-plugins    Author: openstack
def get_netloc(url):
    """Return the netloc from a URL.

    If the input value is not a valid URL, the method raises an Ansible
    filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        netloc = urlparse(url).netloc
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netloc of: "%s"' % str(exp)
        )
    else:
        return netloc
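
A hypothetical call (URL invented); note that the netloc keeps any userinfo and port:

get_netloc('http://admin:secret@example.com:8080/v2/servers')
# -> 'admin:secret@example.com:8080'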
Project: openstack-ansible-plugins    Author: openstack
def get_netorigin(url):
    """Return the netloc from a URL.

    If the input value is not a value URL the method will raise an Ansible
    filter exception.

    :param url: the URL to parse
    :type url: ``str``
    :returns: ``str``
    """
    try:
        parsed_url = urlparse(url)
        netloc = parsed_url.netloc
        scheme = parsed_url.scheme
    except Exception as exp:
        raise errors.AnsibleFilterError(
            'Failed to return the netorigin of: "%s"' % str(exp)
        )
    else:
        return '%s://%s' % (scheme, netloc)
Project: poco    Author: shiwaforce
def check_remote(url):
        # TODO need a better solution
        o = urlparse.urlparse(url)
        host = o.netloc
        while "@" in host:
            host = host[host.find("@")+1:]
        while ":" in host:
            host = host[:host.find(":")]
        cmd = list()
        cmd.append("ping")
        if platform.system().lower().startswith("win"):
            cmd.append("-n")
            cmd.append("1")
            cmd.append("-w")
            cmd.append("1000")
        else:
            cmd.append("-c1")
            cmd.append("-t1")
        cmd.append(host)

        p = Popen(" ".join(cmd), stdout=PIPE, stderr=PIPE, shell=True)
        out, err = p.communicate()

        return len(err) == 0
Project: SoCFoundationFlow    Author: mattaw
def configure(self):
    opts = self.options
    use_cfg = opts.use_config
    if use_cfg is None:
        return
    url = urlparse(opts.use_config_dir)
    kwargs = {}
    if url.scheme:
        kwargs['download'] = True
        kwargs['remote_url'] = url.geturl()
        # search first with the exact url, else try with +'/wafcfg'
        kwargs['remote_locs'] = ['', DEFAULT_DIR]
    tooldir = url.geturl() + ' ' + DEFAULT_DIR
    for cfg in use_cfg.split(','):
        Logs.pprint('NORMAL', "Searching configuration '%s'..." % cfg)
        self.load(cfg, tooldir=tooldir, **kwargs)
    self.start_msg('Checking for configuration')
    self.end_msg(use_cfg)
Project: swjtu-pyscraper    Author: Desgard
def check_headers(self, headers):
        etag = headers.get('etag')
        if etag is not None:
            if etag.startswith(('W/', 'w/')):
                if etag.startswith('w/'):
                    warn(HTTPWarning('weak etag indicator should be upcase.'),
                         stacklevel=4)
                etag = etag[2:]
            if not (etag[:1] == etag[-1:] == '"'):
                warn(HTTPWarning('unquoted etag emitted.'), stacklevel=4)

        location = headers.get('location')
        if location is not None:
            if not urlparse(location).netloc:
                warn(HTTPWarning('absolute URLs required for location header'),
                     stacklevel=4)
Project: quartz-browser    Author: ksharindam
def downloadVideo(self):
        url = unicode(self.tabWidget.currentWidget().url().toString())
        # For youtube videos
        if validYoutubeUrl(url):
            vid_id = parse_qs(urlparse(url).query)['v'][0]
            url = 'https://m.youtube.com/watch?v=' + vid_id
            yt = YouTube(url)    # Use PyTube module for restricted videos
            videos = yt.get_videos()
            dialog = youtube_dialog.YoutubeDialog(videos, self)
            if dialog.exec_() == 1 :
                index = abs(dialog.buttonGroup.checkedId())-2
                vid = videos[index]
                reply = networkmanager.get( QNetworkRequest(QUrl.fromUserInput(vid.url)) )
                self.handleUnsupportedContent(reply, vid.filename + '.' + vid.extension)
            return
        # For embeded HTML5 videos
        request = QNetworkRequest(self.video_URL)
        request.setRawHeader('Referer', self.video_page_url)
        reply = networkmanager.get(request)
        self.handleUnsupportedContent(reply)
Project: sndlatr    Author: Schibum
def _add_query_parameter(url, name, value):
  """Adds a query parameter to a url.

  Replaces the current value if it already exists in the URL.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    Updated query parameter. Does not update the url if value is None.
  """
  if value is None:
    return url
  else:
    parsed = list(urlparse.urlparse(url))
    q = dict(parse_qsl(parsed[4]))
    q[name] = value
    parsed[4] = urllib.urlencode(q)
    return urlparse.urlunparse(parsed)
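
A hypothetical call (URL invented; because the parameters pass through a Python 2 dict, their order in the rebuilt query string is not guaranteed):

_add_query_parameter('http://example.com/api?x=1', 'key', 'abc')
# -> 'http://example.com/api?x=1&key=abc' (parameter order may vary)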
Project: charm-nova-cloud-controller    Author: openstack
def neutron_settings():
    neutron_settings = {}
    if is_relation_made('neutron-api', 'neutron-plugin'):
        neutron_api_info = NeutronAPIContext()()
        neutron_settings.update({
            # XXX: Rename these relations settings?
            'quantum_plugin': neutron_api_info['neutron_plugin'],
            'region': config('region'),
            'quantum_security_groups':
            neutron_api_info['neutron_security_groups'],
            'quantum_url': neutron_api_info['neutron_url'],
        })
        neutron_url = urlparse(neutron_settings['quantum_url'])
        neutron_settings['quantum_host'] = neutron_url.hostname
        neutron_settings['quantum_port'] = neutron_url.port
    return neutron_settings
Project: noc-orchestrator    Author: DirceuSilvaLabs
def _should_use_proxy(url, no_proxy=None):
    """Determines whether a proxy should be used to open a connection to the 
    specified URL, based on the value of the no_proxy environment variable.
    @param url: URL
    @type url: basestring or urllib2.Request
    """
    if no_proxy is None:
        no_proxy_effective = os.environ.get('no_proxy', '')
    else:
        no_proxy_effective = no_proxy

    urlObj = urlparse_.urlparse(_url_as_string(url))
    for np in [h.strip() for h in no_proxy_effective.split(',')]:
        if urlObj.hostname == np:
            return False

    return True
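
A hypothetical check (hosts invented): with no_proxy covering the target host, the proxy is bypassed:

_should_use_proxy('http://internal.example.com/status',
                  no_proxy='localhost,internal.example.com')   # -> False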
Project: python-keylime    Author: mit-ll
def get_query_tag_value(path, query_tag):
    """This is a utility method to query for specific the http parameters in the uri.  

    Returns the value of the parameter, or None if not found."""  
    data = { }
    parsed_path = urlparse(path)
    query_tokens = parsed_path.query.split('&')
    # find the 'ids' query, there can only be one
    for tok in query_tokens:
        query_tok = tok.split('=')
        query_key = query_tok[0]
        if query_key is not None and query_key == query_tag:
            # ids tag contains a comma delimited list of ids
            data[query_tag] = query_tok[1]    
            break        
    return data.get(query_tag,None) 
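
A hypothetical call (path invented):

get_query_tag_value('/keys/verify?ids=a1,b2,c3', 'ids')   # -> 'a1,b2,c3'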

# sign a message with the revocation key, telling of a verification problem
Project: python-keylime    Author: mit-ll
def get_query_tag_value(self, path, query_tag):
        """This is a utility method to query for specific the http parameters in the uri.  

        Returns the value of the parameter, or None if not found."""  
        data = { }
        parsed_path = urlparse(self.path)
        query_tokens = parsed_path.query.split('&')
        # find the 'ids' query, there can only be one
        for tok in query_tokens:
            query_tok = tok.split('=')
            query_key = query_tok[0]
            if query_key is not None and query_key == query_tag:
                # ids tag contains a comma delimited list of ids
                data[query_tag] = query_tok[1]    
                break        
        return data.get(query_tag,None)
Project: python-keylime    Author: mit-ll
def get_restful_params(urlstring):
    """Returns a dictionary of paired RESTful URI parameters"""
    parsed_path = urlparse(urlstring.strip("/"))
    tokens = parsed_path.path.split('/')

    # Be sure we at least have /v#/opt
    if len(tokens) < 2:
        return None

    # Be sure first token is API version
    if len(tokens[0]) == 2 and tokens[0][0] == 'v':
        params = list_to_dict(tokens[1:])
        params["api_version"] = tokens[0][1]
        return params
    else:
        return None
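
Assuming list_to_dict() pairs successive path tokens into keys and values (it is not shown in this excerpt), a hypothetical call would look like:

get_restful_params('/v2/agents/42')
# -> {'agents': '42', 'api_version': '2'}   (assumed list_to_dict behavior)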
Project: charm-plumgrid-gateway    Author: openstack
def file_to_url(self, file_rel_path):
        """Convert a relative file path to a file URL."""
        _abs_path = os.path.abspath(file_rel_path)
        return urlparse.urlparse(_abs_path, scheme='file').geturl()
Project: charm-plumgrid-gateway    Author: openstack
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
Project: charm-plumgrid-gateway    Author: openstack
def parse_url(self, url):
        return urlparse(url)
Project: core-framework    Author: RedhawkSDR
def load_logging_config_uri(orb, uri, binding=None):
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(uri)
    if scheme == "file":
        ossie.utils.log4py.config.fileConfig(path, binding)
    elif scheme == "sca":
        q = dict([x.split("=") for x in query.split("&")])
        try:
            fileSys = orb.string_to_object(q["fs"])
        except KeyError:
            logging.warning("sca URI missing fs query parameter")
        else:
            if fileSys == None:
                logging.warning("Failed to lookup file system")
            else:
                try:
                    t = tempfile.mktemp()

                    tf = open(t, "w+")
                    scaFile = fileSys.open(path, True)
                    fileSize = scaFile.sizeOf()
                    buf = scaFile.read(fileSize)
                    tf.write(buf)
                    tf.close()
                    scaFile.close()

                    ossie.utils.log4py.config.fileConfig(t)
                finally:
                    os.remove(t)
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")
Project: core-framework    Author: RedhawkSDR
def GetConfigFileContents( url ):
    fc=None
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)
    if scheme == "file":
        try:
            f = open(path,'r')
            fc=""
            for line in f:
                fc += line
            f.close()
        except:
            fc=None
    elif scheme == "sca":
        fc=GetSCAFileContents(url)

    elif scheme == "http":
        fc=GetHTTPFileContents(url)

    elif scheme == "str":
        ## RESOLVE
        if path.startswith("/"):
            fc = path[1:]
        else:
            fc = path
    else:
        # Invalid scheme
        logging.warning("Invalid logging config URI scheme")

    return fc
Project: charm-swift-proxy    Author: openstack
def authenticate_keystone_user(self, keystone, user, password, tenant):
        """Authenticates a regular user with the keystone public endpoint."""
        self.log.debug('Authenticating keystone user ({})...'.format(user))
        ep = keystone.service_catalog.url_for(service_type='identity',
                                              interface='publicURL')
        keystone_ip = urlparse.urlparse(ep).hostname

        return self.authenticate_keystone(keystone_ip, user, password,
                                          project_name=tenant)
Project: charm-swift-proxy    Author: openstack
def file_to_url(self, file_rel_path):
        """Convert a relative file path to a file URL."""
        _abs_path = os.path.abspath(file_rel_path)
        return urlparse.urlparse(_abs_path, scheme='file').geturl()
Project: charm-swift-proxy    Author: openstack
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propagate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.