Python urllib.parse 模块,urlunparse() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.urlunparse()

项目:toshi-admin-service    作者:toshiapp    | 项目源码 | 文件源码
def force_https(request):
    host = request.headers.get('Host', '')
    # get scheme, first by checking the x-forwarded-proto (from nginx/heroku etc)
    # then falling back on whether or not there is an sslcontext
    scheme = request.headers.get(
        'x-forwarded-proto',
        "https" if request.transport.get_extra_info('sslcontext') else "http")
    if not host.startswith("localhost:") and scheme != "https":
        url = urlunparse((
            "https",
            host,
            request.path,
            None,
            request.query_string,
            None))
        return redirect(url)
项目:aiolocust    作者:kpidata    | 项目源码 | 文件源码
def __init__(self, base_url, *args, **kwargs):
        requests.Session.__init__(self, *args, **kwargs)

        self.base_url = base_url

        # Check for basic authentication
        parsed_url = urlparse(self.base_url)
        if parsed_url.username and parsed_url.password:
            netloc = parsed_url.hostname
            if parsed_url.port:
                netloc += ":%d" % parsed_url.port

            # remove username and password from the base_url
            self.base_url = urlunparse((parsed_url.scheme, netloc, parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
            # configure requests to use basic auth
            self.auth = HTTPBasicAuth(parsed_url.username, parsed_url.password)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:meg-server    作者:Argonne-National-Laboratory    | 项目源码 | 文件源码
def get_sendgrid_request_message(cfg, keyid, hex, user_email):
    url_prefix = urljoin(
        cfg.config.megserver_hostname_url,
        os.path.join(cfg.config.meg_url_prefix, "revoke")
    )
    params = urlencode([("keyid", keyid), ("token", hex)])
    parsed = list(urlparse(url_prefix))
    parsed[4] = params
    revocation_link = urlunparse(parsed)

    message = Mail()
    message.add_to(user_email)
    message.set_from(cfg.config.sendgrid.from_email)
    message.set_subject(cfg.config.sendgrid.subject)
    message.set_html(EMAIL_HTML.format(keyid=keyid, link=revocation_link))
    return message
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:wagtail_room_booking    作者:Tamriel    | 项目源码 | 文件源码
def handle_redirect_to_login(request, **kwargs):
    login_url = kwargs.get("login_url")
    redirect_field_name = kwargs.get("redirect_field_name")
    next_url = kwargs.get("next_url")
    if login_url is None:
        login_url = settings.ACCOUNT_LOGIN_URL
    if next_url is None:
        next_url = request.get_full_path()
    try:
        login_url = urlresolvers.reverse(login_url)
    except urlresolvers.NoReverseMatch:
        if callable(login_url):
            raise
        if "/" not in login_url and "." not in login_url:
            raise
    url_bits = list(urlparse(login_url))
    if redirect_field_name:
        querystring = QueryDict(url_bits[4], mutable=True)
        querystring[redirect_field_name] = next_url
        url_bits[4] = querystring.urlencode(safe="/")
    return HttpResponseRedirect(urlunparse(url_bits))
项目:oa_qian    作者:sunqb    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:isp-data-pollution    作者:essandess    | 项目源码 | 文件源码
def get_websearch(self,query):
        """
        HTTP GET of a websearch, then add any embedded links.
        :param query: 
        :return: 
        """
        self.select_random_search_engine()
        url = uprs.urlunparse(uprs.urlparse(self.SafeSearch.search_url)._replace(query='{}={}{}&{}'.format(
            self.SafeSearch.query_parameter,uprs.quote_plus(query),
            self.SafeSearch.additional_parameters,self.SafeSearch.safe_parameter)))
        if self.verbose: self.print_url(url)
        @self.phantomjs_timeout
        def phantomjs_get(): self.driver.get(url)  # selenium driver
        phantomjs_get()
        @self.phantomjs_short_timeout
        def phantomjs_page_source(): self.data_usage += len(self.driver.page_source)
        phantomjs_page_source()
        new_links = self.websearch_links()
        if self.link_count() < self.max_links_cached: self.add_url_links(new_links,url)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def get_referer_url(request):
    """
        ?????????? ????????????? ???? REFERER, ???? ?? ? ???????? ?????.
        ?????, ?????????? MULTILANGUAGE_FALLBACK_URL
    """
    referer = request.META.get('HTTP_REFERER')
    if not referer:
        return resolve_url(options.MULTILANGUAGE_FALLBACK_URL)

    site = get_current_site(request)
    url_parts = list(urlparse(referer))
    if url_parts[1] != site.domain:
        return resolve_url(options.MULTILANGUAGE_FALLBACK_URL)

    url_parts[0] = ''
    url_parts[1] = ''
    return urlunparse(url_parts)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def noredirect_url(url, forced_path=None):
    """
        ?????????? ? ???? ?????????, ???????????? ????????????
    """
    url_parts = list(urlparse(url))
    query = dict(parse_qsl(url_parts[4]))

    if forced_path:
        forced_path_parts = list(urlparse(forced_path))
        url_parts[2] = forced_path_parts[2]
        forced_path_query = dict(parse_qsl(forced_path_parts[4]))
        query.update(forced_path_query)

    query.update({
        options.MULTILANGUAGE_GET_PARAM: 1
    })
    url_parts[4] = urlencode(query)

    return urlunparse(url_parts)
项目:DLink_Harvester    作者:MikimotoH    | 项目源码 | 文件源码
def http_error_302(self, req, fp, code, msg, headers):
        """store "Location" HTTP response header
        :return: http
        """
        self.location = headers.get('Location', '')
        uprint("headers['Location']=" + self.location)

        def squote(s):
            return urllib.parse.quote(s, ';/?:&=+,$[]%^')
        try:
            self.location.encode('ascii')
        except UnicodeEncodeError:
            scheme, netloc, path, params, query, fragment = \
                urllib.parse.urlparse(self.location)
            self.location = urllib.parse.urlunparse((
                scheme, netloc, urllib.parse.quote(path), squote(params), squote(query),
                fragment))
            headers.replace_header('Location', self.location)
            uprint("pquoted headers['Location']=" + self.location)
        return urllib.request.HTTPRedirectHandler.http_error_302(
            self, req, fp, code, msg, headers)
项目:chihu    作者:yelongyu    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:HOPSTACK    作者:willpatterson    | 项目源码 | 文件源码
def get_data(self, save_directory):
        """
        Retrieves data from remote location
        saves data in: save_directory
        TODO:
            figure out how to handle local file paths
            consider directory downloads from html pages with hyperlinks
            ** Impliment custom URL schemes -- Now needs to be done in lasubway.py
            How does raw data fit into this function?
        """

        url = urlunparse(self)
        file_name = os.path.basename(os.path.normpath(self.path))
        save_path = os.path.join(save_directory, file_name)
        with closing(urlopen(url)) as request:
            with open(save_path, 'wb') as sfile:
                shutil.copyfileobj(request, sfile)
项目:pyetje    作者:rorlika    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:pyramid_notebook    作者:websauna    | 项目源码 | 文件源码
def serve_websocket(request, port):
    """Start UWSGI websocket loop and proxy."""
    env = request.environ

    # Send HTTP response 101 Switch Protocol downstream
    uwsgi.websocket_handshake(env['HTTP_SEC_WEBSOCKET_KEY'], env.get('HTTP_ORIGIN', ''))

    # Map the websocket URL to the upstream localhost:4000x Notebook instance
    parts = urlparse(request.url)
    parts = parts._replace(scheme="ws", netloc="localhost:{}".format(port))
    url = urlunparse(parts)

    # Proxy initial connection headers
    headers = [(header, value) for header, value in request.headers.items() if header.lower() in CAPTURE_CONNECT_HEADERS]

    logger.info("Connecting to upstream websockets: %s, headers: %s", url, headers)

    ws = ProxyClient(url, headers=headers)
    ws.connect()

    # TODO: Will complain loudly about already send headers - how to abort?
    return httpexceptions.HTTPOk()
项目:Flask-NvRay-Blog    作者:rui7157    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:python_ddd_flask    作者:igorvinnicius    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:django-icekit    作者:ic-labs    | 项目源码 | 文件源码
def get_draft_url(url):
    """
    Return the given URL with a draft mode HMAC in its querystring.
    """
    if verify_draft_url(url):
        # Nothing to do. Already a valid draft URL.
        return url
    # Parse querystring and add draft mode HMAC.
    url = urlparse.urlparse(url)
    salt = get_random_string(5)
    # QueryDict requires a bytestring as its first argument
    query = QueryDict(force_bytes(url.query), mutable=True)
    query['edit'] = '%s:%s' % (salt, get_draft_hmac(salt, url.path))
    # Reconstruct URL.
    parts = list(url)
    parts[4] = query.urlencode(safe=':')
    return urlparse.urlunparse(parts)
项目:desktop-stream-viewer    作者:AbiosGaming    | 项目源码 | 文件源码
def open_stream_in_browser(self, event):
        stream_url = urlparse(self.stream.url)

        netloc = stream_url.netloc
        path = stream_url.path

        if "twitch" in netloc:
            path = stream_url.path + "/chat"

        if "youtube" in netloc:
            path = path.replace("watch", "live_chat")

        correct_url = (stream_url.scheme, netloc, path, stream_url.params, stream_url.query, stream_url.fragment)

        url = urlunparse(correct_url)

        os = platform.system()
        if os == OS.WINDOWS:
            os2.startfile(url)
        else:
            webbrowser.open(url)
项目:StockRecommendSystem    作者:doncat99    | 项目源码 | 文件源码
def _BuildUrl(self, url, path_elements=None, extra_params=None):
        # Break url into constituent parts
        (scheme, netloc, path, params, query, fragment) = urlparse(url)

        # Add any additional path elements to the path
        if path_elements:
            # Filter out the path elements that have a value of None
            p = [i for i in path_elements if i]
            if not path.endswith('/'):
                path += '/'
            path += '/'.join(p)

        # Add any additional query parameters to the query string
        if extra_params and len(extra_params) > 0:
            extra_query = self._EncodeParameters(extra_params)
            # Add it to the existing query
            if query:
                query += '&' + extra_query
            else:
                query = extra_query

        # Return the rebuilt URL
        return urlunparse((scheme, netloc, path, params, query, fragment))
项目:structure_spider    作者:ShichaoMa    | 项目源码 | 文件源码
def url_item_arg_increment(partten, url, count):
    """
    ????url arguments??item?partten??????url????????????url
    @param partten: `keyword`=`begin_num`(eg: start=0)????partten?????
    @param url: http://www.ecco.com/abc?start=30
    @param count:  30???item???
    @return: http://www.ecco.com/abc?start=60
    """
    keyword, begin_num = partten.split("=")
    mth = re.search(r"%s=(\d+)"%keyword, url)
    if mth:
        start = int(mth.group(1))
    else:
        start = int(begin_num)
    parts = urlparse(url)
    if parts.query:
        query = dict(x.split("=") for x in parts.query.split("&"))
        query[keyword] = start + count
    else:
        query = {keyword: count+start}
    return urlunparse(parts._replace(query=urlencode(query)))
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def make_next_param(login_url, current_url):
    '''
    Reduces the scheme and host from a given URL so it can be passed to
    the given `login` URL more efficiently.

    :param login_url: The login URL being redirected to.
    :type login_url: str
    :param current_url: The URL to reduce.
    :type current_url: str
    '''
    l = urlparse(login_url)
    c = urlparse(current_url)

    if (not l.scheme or l.scheme == c.scheme) and \
            (not l.netloc or l.netloc == c.netloc):
        return urlunparse(('', '', c.path, c.params, c.query, ''))
    return current_url
项目:django-bootstrap4    作者:GabrielUlici    | 项目源码 | 文件源码
def url_replace_param(url, name, value):
     """
     Replace a GET parameter in an URL
     """
     url_components = urlparse(url)
     query_params = parse_qs(url_components.query)
     query_params[name] = value
     query = urlencode(query_params, doseq=True)
     return urlunparse([
         url_components.scheme,
         url_components.netloc,
         url_components.path,
         url_components.params,
         query,
         url_components.fragment,
     ])
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def removeURLHash(url: str) -> str:
    """Remove url hash."""
    urlObject, _ = urldefrag(url)
    return urlunparse(urlObject)
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def login_url(login_view, next_url=None, next_field='next'):
    '''
    Creates a URL for redirecting to a login page. If only `login_view` is
    provided, this will just return the URL for it. If `next_url` is provided,
    however, this will append a ``next=URL`` parameter to the query string
    so that the login view can redirect back to that URL.

    :param login_view: The name of the login view. (Alternately, the actual
                       URL to the login view.)
    :type login_view: str
    :param next_url: The URL to give the login view for redirection.
    :type next_url: str
    :param next_field: What field to store the next URL in. (It defaults to
                       ``next``.)
    :type next_field: str
    '''
    if login_view.startswith(('https://', 'http://', '/')):
        base = login_view
    else:
        base = url_for(login_view)

    if next_url is None:
        return base

    parts = list(urlparse(base))
    md = url_decode(parts[4])
    md[next_field] = make_next_param(base, next_url)
    parts[4] = url_encode(md, sort=True)
    return urlunparse(parts)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:bearychat.py    作者:bearyinnovative    | 项目源码 | 文件源码
def _build_url(self, api_method):
        path = '{}/{}'.format(self.base_url.path, api_method.lstrip('/'))
        url = self.base_url._replace(path=path)
        return urlparse.urlunparse(url).rstrip('/')
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def download(self, source, dest):
        """
        Download an archive file.

        :param str source: URL pointing to an archive file.
        :param str dest: Local path location to download archive file to.
        """
        # propogate all exceptions
        # URLError, OSError, etc
        proto, netloc, path, params, query, fragment = urlparse(source)
        if proto in ('http', 'https'):
            auth, barehost = splituser(netloc)
            if auth is not None:
                source = urlunparse((proto, barehost, path, params, query, fragment))
                username, password = splitpasswd(auth)
                passman = HTTPPasswordMgrWithDefaultRealm()
                # Realm is set to None in add_password to force the username and password
                # to be used whatever the realm
                passman.add_password(None, source, username, password)
                authhandler = HTTPBasicAuthHandler(passman)
                opener = build_opener(authhandler)
                install_opener(opener)
        response = urlopen(source)
        try:
            with open(dest, 'wb') as dest_file:
                dest_file.write(response.read())
        except Exception as e:
            if os.path.isfile(dest):
                os.unlink(dest)
            raise e

    # Mandatory file validation via Sha1 or MD5 hashing.
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def base_url(self, url):
        """Return url without querystring or fragment"""
        parts = list(self.parse_url(url))
        parts[4:] = ['' for i in parts[4:]]
        return urlunparse(parts)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def _remove_md5_fragment(location):
    if not location:
        return ''
    parsed = urlparse(location)
    if parsed[-1].startswith('md5='):
        return urlunparse(parsed[:-1] + ('',))
    return location
项目:phredutils    作者:doctaphred    | 项目源码 | 文件源码
def url(scheme='', netloc='', path='', params='', query=(), fragment=''):
    """Construct a URL from individual components.

    <scheme>://<netloc>/<path>;<params>?<query>#<fragment>

    Args:
        schema: "http", "https", etc
        netloc: aka "host"
        path: "/path/to/resource". Leading slash is automatic.
        params: Rarely used; you probably want query.
        query: May be a dict or a sequence of 2-tuples.
        fragment: Position on page; usually used only by the browser.
    """
    return urlunparse(
        (scheme, netloc, path, params, urlencode(query), fragment))
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def post(self, url: str, data: Mapping[str, str]) -> None:
        """ POST the given URL while enforcing a 60 second timeout and handling error pages """
        def timeoutHandler(signum, frame):
            raise TimeoutException("Timeout!")
        signal.signal(signal.SIGALRM, timeoutHandler)
        signal.alarm(60)
        try:
            parsed = urlparse(url)
            # In order to POST data, we have to go to an HTML page on the same domain and then use
            # JS to kick off the post
            self.driver.get(parsed.scheme + '://' + parsed.netloc)
            js = ("function post(path) {\n"
                  "    var form = document.createElement('form');\n"
                  "    form.setAttribute('action', path);\n"
                  "    form.setAttribute('method', 'post');\n"
                  "%s"
                  "    document.body.appendChild(form);\n"
                  "    form.submit();\n"
                  "}\n")
            fields = ''
            for index, (key, val) in zip(range(len(data.items())), data.items()):
                i = str(index)
                field = ("    var a%s = document.createElement('input');\n"
                         "    a%s.setAttribute('name', '%s');\n"
                         "    a%s.setAttribute('value', '%s');\n"
                         "    form.appendChild(a%s);\n") % (i,
                                                            i,
                                                            key.replace("'", "\\'"),
                                                            i,
                                                            val.replace("'", "\\'"),
                                                            i)
                fields += field
            js = js % fields
            path = urlunparse(['', '', parsed.path, parsed.params, parsed.query, parsed.fragment])
            js += "post('%s');\n" % path
            self.driver.execute_script(js)
        except Exception as e:
            signal.alarm(0)
            raise e