Python urlparse 模块,urlunparse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urlparse.urlunparse()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def can_fetch(self, useragent, url):
        """using the parsed robots.txt decide if useragent can fetch url"""
        if self.disallow_all:
            return False
        if self.allow_all:
            return True
        # search for given user agent matches
        # the first match counts
        parsed_url = urlparse.urlparse(urllib.unquote(url))
        url = urlparse.urlunparse(('', '', parsed_url.path,
            parsed_url.params, parsed_url.query, parsed_url.fragment))
        url = urllib.quote(url)
        if not url:
            url = "/"
        for entry in self.entries:
            if entry.applies_to(useragent):
                return entry.allowance(url)
        # try the default entry last
        if self.default_entry:
            return self.default_entry.allowance(url)
        # agent not found ==> access granted
        return True
项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def getlinkinfos(self):
        # File reading is done in __init__() routine.  Store parser in
        # local variable to indicate success of parsing.

        # If no parser was stored, fail.
        if not self.parser: return []

        rawlinks = self.parser.getlinks()
        base = urlparse.urljoin(self.url, self.parser.getbase() or "")
        infos = []
        for rawlink in rawlinks:
            t = urlparse.urlparse(rawlink)
            # DON'T DISCARD THE FRAGMENT! Instead, include
            # it in the tuples which are returned. See Checker.dopage().
            fragment = t[-1]
            t = t[:-1] + ('',)
            rawlink = urlparse.urlunparse(t)
            link = urlparse.urljoin(base, rawlink)
            infos.append((link, rawlink, fragment))     

        return infos
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def _BuildUrl(self, url, path_elements=None, extra_params=None):
    # Break url into consituent parts
    (scheme, netloc, path, params, query, fragment) = urlparse.urlparse(url)

    # Add any additional path elements to the path
    if path_elements:
      # Filter out the path elements that have a value of None
      p = [i for i in path_elements if i]
      if not path.endswith('/'):
        path += '/'
      path += '/'.join(p)

    # Add any additional query parameters to the query string
    if extra_params and len(extra_params) > 0:
      extra_query = self._EncodeParameters(extra_params)
      # Add it to the existing query
      if query:
        query += '&' + extra_query
      else:
        query = extra_query

    # Return the rebuilt URL
    return urlparse.urlunparse((scheme, netloc, path, params, query, fragment))
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Adds a query parameter to a url.

  Replaces the current value if it already exists in the URL.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    Updated query parameter. Does not update the url if value is None.
  """
  if value is None:
    return url
  else:
    parsed = list(urlparse.urlparse(url))
    q = dict(parse_qsl(parsed[4]))
    q[name] = value
    parsed[4] = urllib.urlencode(q)
    return urlparse.urlunparse(parsed)
项目:plex-trakt-scrobbler    作者:cristianmiranda    | 项目源码 | 文件源码
def scrobble_show(self, show_name, season_number, episode_number, progress, scrobble_type):
        self.logger.info(
            'Scrobbling ({scrobble_type}) {show_name} - S{season_number}E{episode_number} - {progress} to trak.tv.'
                .format(show_name=show_name, scrobble_type=scrobble_type, season_number=season_number.zfill(2),
                        episode_number=episode_number.zfill(2), progress=progress))

        data = {}
        data['show'] = {}
        data['show']['title'] = show_name
        data['episode'] = {}
        data['episode']['season'] = int(season_number)
        data['episode']['number'] = int(episode_number)
        data['progress'] = int(progress)
        data['app_version'] = '1.0'
        data['app_date'] = '2014-09-22'
        json_data = json.dumps(data)

        url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv', '/scrobble/' + scrobble_type, '', '', ''))

        try:
            self._do_trakt_auth_post(url, json_data)
        except:
            return False

        return True
项目:plex-trakt-scrobbler    作者:cristianmiranda    | 项目源码 | 文件源码
def scrobble_movie(self, imdb_id, progress, scrobble_type):
        self.logger.info('Scrobbling ({scrobble_type}) {imdb_id} - {progress} to trak.tv.'
                         .format(imdb_id=imdb_id, scrobble_type=scrobble_type, progress=progress))

        data = {}
        data['movie'] = {}
        data['movie']['ids'] = {}
        data['movie']['ids']['imdb'] = imdb_id
        data['progress'] = int(progress)
        data['app_version'] = '1.0'
        data['app_date'] = '2014-09-22'
        json_data = json.dumps(data)

        url = urlparse.urlunparse(('https', 'api-v2launch.trakt.tv', '/scrobble/' + scrobble_type, '', '', ''))

        try:
            self._do_trakt_auth_post(url, json_data)
        except:
            return False

        return True
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _parse(url, defaultPort=None):
    url = url.strip()
    parsed = urlparse.urlparse(url)
    scheme = parsed[0]
    path = urlparse.urlunparse(('','')+parsed[2:])
    if defaultPort is None:
        if scheme == 'https':
            defaultPort = 443
        else:
            defaultPort = 80
    host, port = parsed[1], defaultPort
    if ':' in host:
        host, port = host.split(':')
        port = int(port)
    if path == "":
        path = "/"
    return scheme, host, port, path
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def process(self):
        parsed = urlparse.urlparse(self.uri)
        protocol = parsed[0]
        host = parsed[1]
        port = self.ports[protocol]
        if ':' in host:
            host, port = host.split(':')
            port = int(port)
        rest = urlparse.urlunparse(('','')+parsed[2:])
        if not rest:
            rest = rest+'/'
        class_ = self.protocols[protocol]
        headers = self.getAllHeaders().copy()
        if not headers.has_key('host'):
            headers['host'] = host
        self.content.seek(0, 0)
        s = self.content.read()
        clientFactory = class_(self.method, rest, self.clientproto, headers,
                               s, self)
        reactor.connectTCP(host, port, clientFactory)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:OSPTF    作者:xSploited    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:office-interoperability-tools    作者:milossramek    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Adds a query parameter to a url.

  Replaces the current value if it already exists in the URL.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    Updated query parameter. Does not update the url if value is None.
  """
  if value is None:
    return url
  else:
    parsed = list(urlparse.urlparse(url))
    q = dict(parse_qsl(parsed[4]))
    q[name] = value
    parsed[4] = urllib.urlencode(q)
    return urlparse.urlunparse(parsed)
项目:python-tempestconf    作者:redhat-openstack    | 项目源码 | 文件源码
def do_get(self, url, top_level=False, top_level_path=""):
        parts = list(urlparse.urlparse(url))
        # 2 is the path offset
        if top_level:
            parts[2] = '/' + top_level_path

        parts[2] = MULTIPLE_SLASH.sub('/', parts[2])
        url = urlparse.urlunparse(parts)

        try:
            if self.disable_ssl_validation:
                urllib3.disable_warnings()
                http = urllib3.PoolManager(cert_reqs='CERT_NONE')
            else:
                http = urllib3.PoolManager()
            r = http.request('GET', url, headers=self.headers)
        except Exception as e:
            LOG.error("Request on service '%s' with url '%s' failed",
                      (self.name, url))
            raise e
        if r.status >= 400:
            raise ServiceError("Request on service '%s' with url '%s' failed"
                               " with code %d" % (self.name, url, r.status))
        return r.data
项目:nrp    作者:django-rea    | 项目源码 | 文件源码
def handle_redirect_to_login(request, **kwargs):
    login_url = kwargs.get("login_url")
    redirect_field_name = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if login_url is None:
        login_url = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        login_url = urlresolvers.reverse(login_url)
    except urlresolvers.NoReverseMatch:
        if callable(login_url):
            raise
        if "/" not in login_url and "." not in login_url:
            raise
    url_bits = list(urlparse.urlparse(login_url))
    if redirect_field_name:
        querystring = QueryDict(url_bits[4], mutable=True)
        querystring[redirect_field_name] = next_url
        url_bits[4] = querystring.urlencode(safe="/")
    return HttpResponseRedirect(urlparse.urlunparse(url_bits))
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def url(self, value):
        self.__dict__['url'] = value
        if value is not None:
            scheme, netloc, path, params, query, fragment = urlparse.urlparse(value)

            # Exclude default port numbers.
            if scheme == 'http' and netloc[-3:] == ':80':
                netloc = netloc[:-3]
            elif scheme == 'https' and netloc[-4:] == ':443':
                netloc = netloc[:-4]
            if scheme not in ('http', 'https'):
                raise ValueError("Unsupported URL %s (%s)." % (value, scheme))

            # Normalized URL excludes params, query, and fragment.
            self.normalized_url = urlparse.urlunparse((scheme, netloc, path, None, None, None))
        else:
            self.normalized_url = None
            self.__dict__['url'] = None
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:bandit-http-server    作者:evancasey    | 项目源码 | 文件源码
def _do_put_request(self, resource, param_dict):
        req_url = urlparse.urlunparse(["http", self.host, "api/v%s/%s" % (self.api_version, resource), "", "", ""]) 
        print "req_url=%s" % (req_url)

        opener = urllib2.build_opener(urllib2.HTTPHandler)

        req = urllib2.Request(req_url, data=json.dumps(param_dict))
        req.add_header('Content-Type', 'application/json')
        req.get_method = lambda: 'PUT'

        try:
            return eval(opener.open(req).read())            
        except urllib2.HTTPError, err:
            return parse_errors(err)

#---------------------------------------------
# error parsing
# --------------------------------------------
项目:wagtail_room_booking    作者:Tamriel    | 项目源码 | 文件源码
def handle_redirect_to_login(request, **kwargs):
    login_url = kwargs.get("login_url")
    redirect_field_name = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if login_url is None:
        login_url = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        login_url = urlresolvers.reverse(login_url)
    except urlresolvers.NoReverseMatch:
        if callable(login_url):
            raise
        if "/" not in login_url and "." not in login_url:
            raise
    url_bits = list(urlparse(login_url))
    if redirect_field_name:
        querystring = QueryDict(url_bits[4], mutable=True)
        querystring[redirect_field_name] = next_url
        url_bits[4] = querystring.urlencode(safe="/")
    return HttpResponseRedirect(urlunparse(url_bits))
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:circlecli    作者:TheRealJoeLinux    | 项目源码 | 文件源码
def _build_url(self, endpoint, params={}):
        """Return the full URL for the desired endpoint.

        Args:
            endpoint (str): the API endpoint after base URL
            params (dict): any params to include in the request

        Returns:
            (str) the full URL of the request
        """
        new_params = {'circle-token': self._token}
        new_params.update(params)

        parsed_url = urlparse(self._base_url)
        new_parse = ParseResult(scheme=parsed_url.scheme, netloc=parsed_url.netloc,
                                path='/'.join((parsed_url.path, endpoint)),
                                params='', query=urlencode(new_params),
                                fragment='')

        return urlunparse(new_parse)
项目:defcon-workshop    作者:devsecops    | 项目源码 | 文件源码
def __getattr__ (self,name):
        if name=="urlWithoutVariables":
            return urlunparse((self.schema,self.__host,self.__path,'','',''))
        elif name=="pathWithVariables":
            return urlunparse(('','',self.__path,'',self.__variablesGET.urlEncoded(),''))
        elif name=="completeUrl":
            return urlunparse((self.schema,self.__host,self.__path,self.__params,self.__variablesGET.urlEncoded(),''))
        elif name=="finalUrl":
            if self.__finalurl:
                return self.__finalurl
            return self.completeUrl
        elif name=="urlWithoutPath":
            return "%s://%s" % (self.schema,self._headers["Host"])
        elif name=="path":
            return self.__path
        elif name=="postdata":
            if self.ContentType=="application/x-www-form-urlencoded":
                return self.__variablesPOST.urlEncoded()
            elif self.ContentType=="multipart/form-data":
                return self.__variablesPOST.multipartEncoded()
            else:
                return self.__uknPostData
        else:
            raise AttributeError
项目:pupy    作者:ru-faraon    | 项目源码 | 文件源码
def resolveEntity(self, publicId, systemId):
        assert systemId is not None
        source = DOMInputSource()
        source.publicId = publicId
        source.systemId = systemId
        source.byteStream = self._get_opener().open(systemId)

        # determine the encoding if the transport provided it
        source.encoding = self._guess_media_encoding(source)

        # determine the base URI is we can
        import posixpath, urlparse
        parts = urlparse.urlparse(systemId)
        scheme, netloc, path, params, query, fragment = parts
        # XXX should we check the scheme here as well?
        if path and not path.endswith("/"):
            path = posixpath.dirname(path) + "/"
            parts = scheme, netloc, path, params, query, fragment
            source.baseURI = urlparse.urlunparse(parts)

        return source
项目:edx-video-pipeline    作者:edx    | 项目源码 | 文件源码
def _http_request(self, verb, path, body, headers):
        """Makes the actual HTTP request.
        """
        url = urlparse.urlunparse((self.config.scheme, self.config.server,
            path, None, None, None))
        LOG.debug("Request is %s:%s" % (verb, url))
        LOG.debug("Request headers are %s" % headers)
        LOG.debug("Request body is %s" % body)

        conn = self._get_connection()
        resp, content = conn.request(url, method=verb, body=body,
            headers=headers)
        #http response code is handled else where
        http_status = (resp.status, resp.reason)
        resp_headers = dict(
            (k.lower(), v)
            for k, v in resp.iteritems()
        )
        resp_body = content

        LOG.debug("Response status is %s %s" % http_status)
        LOG.debug("Response headers are %s" % resp_headers)
        LOG.debug("Response body is %s" % resp_body)

        return (http_status, resp_headers, resp_body)
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def findTags(self):
        all = lambda x: 1
        for elm in self.document(all, {'rel': re.compile(r'\btag\b')}):
            href = elm.get('href')
            if not href:
                continue
            urlscheme, domain, path, params, query, fragment = \
                       urlparse.urlparse(_urljoin(self.baseuri, href))
            segments = path.split('/')
            tag = segments.pop()
            if not tag:
                if segments:
                    tag = segments.pop()
                else:
                    # there are no tags
                    continue
            tagscheme = urlparse.urlunparse((urlscheme, domain, '/'.join(segments), '', '', ''))
            if not tagscheme.endswith('/'):
                tagscheme += '/'
            self.tags.append(FeedParserDict({"term": tag, "scheme": tagscheme, "label": elm.string or ''}))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def findTags(self):
        all = lambda x: 1
        for elm in self.document(all, {'rel': re.compile(r'\btag\b')}):
            href = elm.get('href')
            if not href:
                continue
            urlscheme, domain, path, params, query, fragment = \
                       urlparse.urlparse(_urljoin(self.baseuri, href))
            segments = path.split('/')
            tag = segments.pop()
            if not tag:
                if segments:
                    tag = segments.pop()
                else:
                    # there are no tags
                    continue
            tagscheme = urlparse.urlunparse((urlscheme, domain, '/'.join(segments), '', '', ''))
            if not tagscheme.endswith('/'):
                tagscheme += '/'
            self.tags.append(FeedParserDict({"term": tag, "scheme": tagscheme, "label": elm.string or ''}))
项目:stormtrooper    作者:CompileInc    | 项目源码 | 文件源码
def normalize_website(cls, w):
        from django.core.validators import EMPTY_VALUES
        from urlparse import urlparse, urlunparse, ParseResult
        w = w.decode('utf-8')
        if w in EMPTY_VALUES:
            return None
        w = w.lower().strip()
        if not w.startswith('http://') and not w.startswith('https://'):
            w = 'http://' + w.lstrip('/')
        try:
            parsed = urlparse(w)
        except ValueError as e:
            return None
        else:
            new_parsed = ParseResult(scheme='http',
                                     netloc=cls.get_website_tld(w),
                                     path=parsed.path.rstrip('/'),
                                     params='',
                                     query=parsed.query,
                                     fragment='')
            return urlunparse(new_parsed)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def _add_query_parameter(url, name, value):
  """Adds a query parameter to a url.

  Replaces the current value if it already exists in the URL.

  Args:
    url: string, url to add the query parameter to.
    name: string, query parameter name.
    value: string, query parameter value.

  Returns:
    Updated query parameter. Does not update the url if value is None.
  """
  if value is None:
    return url
  else:
    parsed = list(urlparse.urlparse(url))
    q = dict(urlparse.parse_qsl(parsed[4]))
    q[name] = value
    parsed[4] = urllib.urlencode(q)
    return urlparse.urlunparse(parsed)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:intel-manager-for-lustre    作者:intel-hpdd    | 项目源码 | 文件源码
def test_login(self):
        self.hosts = self.add_hosts([
            config['lustre_servers'][0]['address'],
            config['lustre_servers'][1]['address']])

        # Chroma puts its FQDN in the manager certificate, but the test config may
        # be pointing to localhost: if this is the case, substitute the FQDN in the
        # URL so that the client can validate the certificate.
        url = config['chroma_managers'][0]['server_http_url']
        parsed = urlparse.urlparse(url)
        if parsed.hostname == 'localhost':
            parsed = list(parsed)
            parsed[1] = parsed[1].replace("localhost", socket.getfqdn())
            url = urlparse.urlunparse(tuple(parsed))

        example_api_client.setup_ca(url)
        hosts = example_api_client.list_hosts(url,
            config['chroma_managers'][0]['users'][0]['username'],
            config['chroma_managers'][0]['users'][0]['password']
        )
        self.assertListEqual(hosts, [h['fqdn'] for h in self.hosts])
项目:analyst-scripts    作者:Te-k    | 项目源码 | 文件源码
def send_request(host, params, get="GET", display=False):
    url_parts = list(host)
    url_parts[4] = urllib.urlencode(params)
    url = urlparse.urlunparse(url_parts)
    req = urllib2.Request(url)
    req.add_header('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8')
    req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:25.0) Gecko/20100101 Firefox/25.0')
    try:
        res = urllib2.urlopen(req)
        code = res.code
    except urllib2.HTTPError, e:
        code = e.code

    if display:
        print "Request: %s" % (url)+"\t/\t",
        print "Response: %i" % code
    return is_blocked(code)

#============================== Tampering functions ===========================
# TODO: handle encoding tricks
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def getlinkinfos(self):
        # File reading is done in __init__() routine.  Store parser in
        # local variable to indicate success of parsing.

        # If no parser was stored, fail.
        if not self.parser: return []

        rawlinks = self.parser.getlinks()
        base = urlparse.urljoin(self.url, self.parser.getbase() or "")
        infos = []
        for rawlink in rawlinks:
            t = urlparse.urlparse(rawlink)
            # DON'T DISCARD THE FRAGMENT! Instead, include
            # it in the tuples which are returned. See Checker.dopage().
            fragment = t[-1]
            t = t[:-1] + ('',)
            rawlink = urlparse.urlunparse(t)
            link = urlparse.urljoin(base, rawlink)
            infos.append((link, rawlink, fragment))

        return infos
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def getlinkinfos(self):
        # File reading is done in __init__() routine.  Store parser in
        # local variable to indicate success of parsing.

        # If no parser was stored, fail.
        if not self.parser: return []

        rawlinks = self.parser.getlinks()
        base = urlparse.urljoin(self.url, self.parser.getbase() or "")
        infos = []
        for rawlink in rawlinks:
            t = urlparse.urlparse(rawlink)
            # DON'T DISCARD THE FRAGMENT! Instead, include
            # it in the tuples which are returned. See Checker.dopage().
            fragment = t[-1]
            t = t[:-1] + ('',)
            rawlink = urlparse.urlunparse(t)
            link = urlparse.urljoin(base, rawlink)
            infos.append((link, rawlink, fragment))

        return infos
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:pscheduler    作者:perfsonar    | 项目源码 | 文件源码
def api_replace_host(url_text, replacement):
    "Replace the host portion of a URL"

    url = list(urlparse.urlparse(url_text))
    if replacement is not None:
        url[1] = __host_per_rfc_2732(replacement)
    return urlparse.urlunparse(url)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def login_url(login_view, next_url=None, next_field='next'):
    '''
    Creates a URL for redirecting to a login page. If only `login_view` is
    provided, this will just return the URL for it. If `next_url` is provided,
    however, this will append a ``next=URL`` parameter to the query string
    so that the login view can redirect back to that URL.

    :param login_view: The name of the login view. (Alternately, the actual
                       URL to the login view.)
    :type login_view: str
    :param next_url: The URL to give the login view for redirection.
    :type next_url: str
    :param next_field: What field to store the next URL in. (It defaults to
                       ``next``.)
    :type next_field: str
    '''
    if login_view.startswith(('https://', 'http://', '/')):
        base = login_view
    else:
        base = url_for(login_view)

    if next_url is None:
        return base

    parts = list(urlparse(base))
    md = url_decode(parts[4])
    md[next_field] = make_next_param(base, next_url)
    parts[4] = url_encode(md, sort=True)
    return urlunparse(parts)
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def addroot(self, root, add_to_do = 1):
        if root not in self.roots:
            troot = root
            scheme, netloc, path, params, query, fragment = \
                    urlparse.urlparse(root)
            i = path.rfind("/") + 1
            if 0 < i < len(path):
                path = path[:i]
                troot = urlparse.urlunparse((scheme, netloc, path,
                                             params, query, fragment))
            self.roots.append(troot)
            self.addrobot(root)
            if add_to_do:
                self.newlink((root, ""), ("<root>", root))
项目:v2ex-tornado-2    作者:coderyy    | 项目源码 | 文件源码
def get_callback_url(self):
        if self.callback and self.verifier:
            # Append the oauth_verifier.
            parts = urlparse.urlparse(self.callback)
            scheme, netloc, path, params, query, fragment = parts[:6]
            if query:
                query = '%s&oauth_verifier=%s' % (query, self.verifier)
            else:
                query = 'oauth_verifier=%s' % self.verifier
            return urlparse.urlunparse((scheme, netloc, path, params,
                query, fragment))
        return self.callback
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)