Python urlparse 模块,urlsplit() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urlparse.urlsplit()

项目:kinect-2-libras    作者:inessadl    | 项目源码 | 文件源码
def reduce_uri(self, uri, default_port=True):
        """Reduce an authority or full URI to an ``(authority, path)`` pair.

        When ``uri`` has a network location, that location and its path
        (``'/'`` if empty) are used; otherwise the whole input is treated as
        ``host`` or ``host:port`` with path ``'/'``.  If ``default_port`` is
        true and no port was given, the well-known port for ``http`` (80) or
        ``https`` (443) is appended to the authority.
        """
        # note HTTP URLs do not have a userinfo component
        scheme, netloc, url_path = urlparse.urlsplit(uri)[:3]
        if netloc:
            # full URI
            authority, path = netloc, url_path or '/'
        else:
            # bare host or host:port
            scheme, authority, path = None, uri, '/'
        host, port = splitport(authority)
        if not default_port or port is not None or scheme is None:
            return authority, path
        well_known_ports = {"http": 80, "https": 443}
        if scheme in well_known_ports:
            authority = "%s:%d" % (host, well_known_ports[scheme])
        return authority, path
项目:recipebook    作者:dpapathanasiou    | 项目源码 | 文件源码
def getOtherRecipeLinks(self):
        """Collect the recipe URLs linked from a category/search page.

        Single-recipe pages carry no such links, but listing pages such as
        http://www.williams-sonoma.com/search/results.html?activeTab=recipes&words=winter_weeknight_dinners
        do.  Only anchors inside ``ul.recipe-list`` whose query string is
        exactly ``cm_src=RECIPESEARCH`` are kept; each is returned as
        ``scheme://netloc/path`` with the query stripped.
        """
        anchors = self.tree.xpath('//ul[@class="recipe-list"]/li/a')
        targets = (urlsplit(anchor.get('href'))
                   for anchor in anchors if 'href' in anchor.keys())
        return [target.scheme + '://' + target.netloc + target.path
                for target in targets
                if target.query == 'cm_src=RECIPESEARCH']
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def run(self):
        """Scrape one listing page and queue every matching recipe link.

        Takes a page index from ``self.qu``, downloads ``self.url + index``
        and, for each element with class ``recipe-title`` whose link path
        matches the module-level ``pat``, puts ``(absolute_url, filename)``
        on ``self.q``.  Afterwards it calls ``self.fetch_data()``, sleeps one
        second, marks the ``self.qu`` item done and waits on ``self.q.join()``.
        Progress is written to stdout with Python 2 ``print`` statements.
        """
        # Build the listing-page URL from the next queued index.
        ind=self.qu.get()
        url=self.url+str(ind)
        soup =bs.BeautifulSoup(''.join( ul.urlopen(url).readlines() ))
        # Scheme and netloc of the base URL are reused to absolutize links.
        bu = up.urlsplit(self.url)
        print 'started with the ' ,str(url).split('/')[-1],
        for i in  soup.find_all(attrs = { "class" : "recipe-title"}):
            sp = up.urlsplit(i.a.get('href'))
            path = sp.path
            print path
            if re.search(pat, path):
                path = bu.scheme+'://'+bu.netloc+path
                # The second-to-last path segment names the local file.
                filename = str(path).split('/')[-2]
                filename = op.join(op.abspath(op.curdir),filename+'.py') # recipe will be stored in given location
#                filename = op.join(op.abspath(op.curdir),filename+'.html')
# Uncomment the line above to download the recipe's web page instead.
                print path
                self.q.put((path,filename))
        self.fetch_data()
        time.sleep(1)
        # NOTE(review): task_done() is skipped if anything above raises,
        # which would leave a join() on self.qu waiting - confirm intended.
        self.qu.task_done()
        self.q.join()
        print 'done with the ' ,str(url).split('/')[-1],
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def get_version_from_url(url):
    """Return the first path segment of ``url`` (the API version slot).

    A single leading ``/`` is ignored.  An empty string is returned when the
    path is empty or when its first segment is empty.
    """
    path = urlparse.urlsplit(url).path
    if path.startswith('/'):
        path = path[1:]
    # Everything up to the next slash (or the whole remainder).
    return path.partition('/')[0]
项目:trio2o    作者:openstack    | 项目源码 | 文件源码
def remove_trailing_version_from_href(href):
    """Strip a trailing API version segment from ``href``.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_result = urlparse.urlsplit(href)
    path_pieces = split_result.path.rsplit('/', 1)
    last_segment = path_pieces.pop()

    # NOTE: this should match vX.X or vX
    if re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', last_segment) is None:
        raise ValueError('URL %s does not contain version' % href)

    scheme, netloc, _, query, fragment = split_result
    rebuilt = [scheme, netloc, url_join(*path_pieces), query, fragment]
    return urlparse.urlunsplit(rebuilt)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def click(self, st):
        """Resolve the HREF ``st`` against this URL, as a browser would.

        Missing scheme and netloc are inherited from ``self``.  When the
        netloc is inherited, an empty path also inherits ``self.path`` (and
        then an empty query inherits ``self.query``), while a relative path
        replaces the last segment of ``self.pathList()``.
        """
        scheme, netloc, path, query, fragment = urlparse.urlsplit(st)
        scheme = scheme or self.scheme
        if not netloc:
            netloc = self.netloc
            if path:
                if path[0] != '/':
                    # relative reference: swap out the final path segment
                    segments = self.pathList()
                    segments[-1] = path
                    path = '/'.join(segments)
            else:
                path = self.path
                query = query or self.query

        return URLPath(scheme, netloc, path, query, fragment)
项目:hostapd-mana    作者:adde88    | 项目源码 | 文件源码
def reduce_uri(self, uri, default_port=True):
        """Reduce an authority or full URI to an ``(authority, path)`` pair.

        When ``uri`` has a network location, that location and its path
        (``'/'`` if empty) are used; otherwise the whole input is treated as
        ``host`` or ``host:port`` with path ``'/'``.  If ``default_port`` is
        true and no port was given, the well-known port for ``http`` (80) or
        ``https`` (443) is appended to the authority.
        """
        # note HTTP URLs do not have a userinfo component
        scheme, netloc, url_path = urlparse.urlsplit(uri)[:3]
        if netloc:
            # full URI
            authority, path = netloc, url_path or '/'
        else:
            # bare host or host:port
            scheme, authority, path = None, uri, '/'
        host, port = splitport(authority)
        if not default_port or port is not None or scheme is None:
            return authority, path
        well_known_ports = {"http": 80, "https": 443}
        if scheme in well_known_ports:
            authority = "%s:%d" % (host, well_known_ports[scheme])
        return authority, path
项目:vspheretools    作者:devopshq    | 项目源码 | 文件源码
def _checkFrom(self, pyobj):
        '''WS-Address From,
        XXX currently not checking the hostname, not forwarding messages.
        pyobj  -- From server returned.
        '''
        if pyobj is None: return
        value = pyobj._Address
        if value != self._addressTo:
            scheme,netloc,path,query,fragment = urlparse.urlsplit(value)
            schemeF,netlocF,pathF,queryF,fragmentF = urlparse.urlsplit(self._addressTo)
            if scheme==schemeF and path==pathF and query==queryF and fragment==fragmentF:
                netloc = netloc.split(':') + ['80']
                netlocF = netlocF.split(':') + ['80']
                if netloc[1]==netlocF[1] and (socket.gethostbyname(netlocF[0]) in
                    ('127.0.0.1', socket.gethostbyname(netloc[0]))):
                    return

            raise WSActionException('wrong WS-Address From(%s), expecting %s'%(value,self._addressTo))
项目:pyconjp-website    作者:pyconjp    | 项目源码 | 文件源码
def change_locale(request):
    """
    Redirect to a URL while switching the locale prefix of its path.

    The target comes from the ``next`` request parameter, falling back to
    the path of the HTTP referer and finally to ``/``.  On POST, a valid
    ``locale`` parameter is applied to the path (and stored in the session
    when ``localeurl_settings.USE_SESSION`` is set).
    """
    destination = request.REQUEST.get('next', None)
    if not destination:
        referer = request.META.get('HTTP_REFERER', None)
        if referer:
            # keep only the path component of the referring page
            destination = urlsplit(referer)[2]
    _, path = utils.strip_path(destination or '/')
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                request.session['django_language'] = locale
            path = utils.locale_path(path, locale)

    return http.HttpResponseRedirect(path)
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def serial_class_for_url(url):
    """Split an ``alt://`` URL into ``(port, serial_class)``.

    The only accepted option is ``class=<name>``, which selects the
    attribute of the ``serial`` module to return (default ``Serial``).
    Raises serial.SerialException for a wrong scheme or an unknown option.
    """
    usage = 'expected a string in the form "alt://port[?option[=value][&option[=value]]]"'
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('%s: not starting with alt:// (%r)' % (usage, parts.scheme))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = values[0]
    except ValueError as e:
        raise serial.SerialException('%s: %s' % (usage, e))
    port = ''.join([parts.netloc, parts.path])
    return (port, getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Extract ``(host, port)`` from a ``socket://<host>:<port>`` URL.

        The optional ``logging=<level>`` query option enables a
        ``pySerial.socket`` logger on ``self``.  Raises SerialException when
        the scheme is not ``socket``, an option is unknown, or the port is
        missing or outside 0...65535.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            # parts.port is None when the URL carries no port.  Comparing
            # None with an int raises TypeError on Python 3, which would
            # bypass the ValueError handler below, so test for it explicitly.
            if port is None or not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Validate a ``loop://`` URL and apply its options to ``self``.

        The only accepted option is ``logging=<level>``, which enables a
        ``pySerial.loop`` logger.  Raises SerialException for a wrong scheme
        or an unknown option.  Returns nothing.
        """
        usage = 'expected a string in the form "loop://[?logging={debug|info|warning|error}]"'
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('%s: not starting with loop:// (%r)' % (usage, parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option != 'logging':
                    raise ValueError('unknown option: %r' % (option,))
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
        except ValueError as e:
            raise SerialException('%s: %s' % (usage, e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:android3dblendermouse    作者:sketchpunk    | 项目源码 | 文件源码
def from_url(self, url):
        """Configure the spy from a ``spy://port[?options]`` URL.

        Options: ``file=<path>`` (write output there instead of stderr),
        ``color``, ``raw`` (use FormatRaw instead of FormatHexdump) and
        ``all`` (sets ``self.show_all``).  Sets ``self.formatter`` and
        returns the wrapped port string (netloc + path).  Raises
        serial.SerialException for a wrong scheme or an unknown option.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            # Do not leak a file opened for 'file=' when a later option is
            # rejected.
            if output is not sys.stderr:
                output.close()
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
项目:devtools-parser    作者:WPO-Foundation    | 项目源码 | 文件源码
def get_base_page_info(self, page_data):
        """Record reverse-DNS and name-server details for the base page.

        Reads ``page_data['final_url']`` and, best effort, stores the PTR
        host name of its address in ``page_data['base_page_ip_ptr']`` and
        the first NS target of the closest enclosing domain that has one in
        ``page_data['base_page_dns_ns']``.  Lookup failures (including a
        missing ``dns`` package) are deliberately ignored.
        """
        domain = urlparse.urlsplit(page_data['final_url']).hostname
        try:
            import socket
            addr = socket.gethostbyname(domain)
            page_data['base_page_ip_ptr'] = str(socket.gethostbyaddr(addr)[0])
        except Exception:
            # best effort: the PTR entry is simply omitted on failure
            pass
        # keep moving up the domain until we can get a NS record
        while domain is not None and 'base_page_dns_soa' not in page_data:
            try:
                import dns.resolver
                dns_servers = dns.resolver.query(domain, "NS")
                dns_server = str(dns_servers[0].target).strip('. ')
                page_data['base_page_dns_ns'] = dns_server
                # Stop at the first (most specific) domain with an NS record;
                # without this the walk continued up to the TLD and
                # overwrote the answer with a parent domain's servers.
                break
            except Exception:
                pass
            pos = domain.find('.')
            domain = domain[pos + 1:] if pos > 0 else None
项目:linkchecker-gui    作者:linkcheck    | 项目源码 | 文件源码
def can_view_parent_source (self, url_data):
        """Tell whether the source of the parent URL can be retrieved.

        Requires a valid ``url_data`` with a parent URL.  ``file:`` parents
        qualify unless they are directories, ``ftp:``/``ftps:`` parents
        unless their path is empty or ends in ``/``; otherwise only
        ``http:``/``https:`` parents qualify.
        """
        parent = url_data.parent_url if url_data.valid else None
        if not parent:
            return False
        # Directory contents are dynamically generated, so it makes
        # no sense in viewing/editing them.
        if parent.startswith(u"file:"):
            local_path = get_os_filename(urlparse.urlsplit(parent)[2])
            return not os.path.isdir(local_path)
        if parent.startswith((u"ftp:", u"ftps:")):
            remote_path = urlparse.urlsplit(parent)[2]
            return bool(remote_path) and not remote_path.endswith(u'/')
        # Only HTTP left
        return parent.startswith((u"http:", u"https:"))
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def serial_class_for_url(url):
    """Split an ``alt://`` URL into ``(port, serial_class)``.

    The only accepted option is ``class=<name>``, which selects the
    attribute of the ``serial`` module to return (default ``Serial``).
    Raises serial.SerialException for a wrong scheme or an unknown option.
    """
    usage = 'expected a string in the form "alt://port[?option[=value][&option[=value]]]"'
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('%s: not starting with alt:// (%r)' % (usage, parts.scheme))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = values[0]
    except ValueError as e:
        raise serial.SerialException('%s: %s' % (usage, e))
    port = ''.join([parts.netloc, parts.path])
    return (port, getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Extract ``(host, port)`` from a ``socket://<host>:<port>`` URL.

        The optional ``logging=<level>`` query option enables a
        ``pySerial.socket`` logger on ``self``.  Raises SerialException when
        the scheme is not ``socket``, an option is unknown, or the port is
        missing or outside 0...65535.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            # parts.port is None when the URL carries no port.  Comparing
            # None with an int raises TypeError on Python 3, which would
            # bypass the ValueError handler below, so test for it explicitly.
            if port is None or not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Validate a ``loop://`` URL and apply its options to ``self``.

        The only accepted option is ``logging=<level>``, which enables a
        ``pySerial.loop`` logger.  Raises SerialException for a wrong scheme
        or an unknown option.  Returns nothing.
        """
        usage = 'expected a string in the form "loop://[?logging={debug|info|warning|error}]"'
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('%s: not starting with loop:// (%r)' % (usage, parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option != 'logging':
                    raise ValueError('unknown option: %r' % (option,))
                logging.basicConfig()   # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
        except ValueError as e:
            raise SerialException('%s: %s' % (usage, e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:microperi    作者:c0d3st0rm    | 项目源码 | 文件源码
def from_url(self, url):
        """Configure the spy from a ``spy://port[?options]`` URL.

        Options: ``file=<path>`` (write output there instead of stderr),
        ``color``, ``raw`` (use FormatRaw instead of FormatHexdump) and
        ``all`` (sets ``self.show_all``).  Sets ``self.formatter`` and
        returns the wrapped port string (netloc + path).  Raises
        serial.SerialException for a wrong scheme or an unknown option.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            # Do not leak a file opened for 'file=' when a later option is
            # rejected.
            if output is not sys.stderr:
                output.close()
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
项目:touch-pay-client    作者:HackPucBemobi    | 项目源码 | 文件源码
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def download_file(my_URL, my_outfile = ''):
    """Download ``my_URL`` to a local file, overwriting any existing file.

    When ``my_outfile`` is empty, the file is saved in the current
    directory under the base name of the URL path.  The content is
    streamed in 4096-byte chunks and the connection is closed afterwards.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import contextlib
    import os
    import shutil
    try:  # Python 2
        from urllib2 import urlopen
        from urlparse import urlsplit
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlsplit

    URL_basename = os.path.basename(urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    # closing() releases the connection even if writing fails; the response
    # is opened first so a failed request never truncates an existing file.
    with contextlib.closing(urlopen(my_URL)) as response:
        with open(my_outfile, 'wb') as output:
            shutil.copyfileobj(response, output, 4096)  # download in chunks
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def download_file(my_URL, my_outfile = ''):
    """Download ``my_URL`` to a local file, overwriting any existing file.

    When ``my_outfile`` is empty, the file is saved in the current
    directory under the base name of the URL path.  The content is
    streamed in 4096-byte chunks and the connection is closed afterwards.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import contextlib
    import os
    import shutil
    try:  # Python 2
        from urllib2 import urlopen
        from urlparse import urlsplit
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlsplit

    URL_basename = os.path.basename(urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    # closing() releases the connection even if writing fails; the response
    # is opened first so a failed request never truncates an existing file.
    with contextlib.closing(urlopen(my_URL)) as response:
        with open(my_outfile, 'wb') as output:
            shutil.copyfileobj(response, output, 4096)  # download in chunks
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def download_file(my_URL, my_outfile = ''):
    """Download ``my_URL`` to a local file, overwriting any existing file.

    When ``my_outfile`` is empty, the file is saved in the current
    directory under the base name of the URL path.  The content is
    streamed in 4096-byte chunks and the connection is closed afterwards.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import contextlib
    import os
    import shutil
    try:  # Python 2
        from urllib2 import urlopen
        from urlparse import urlsplit
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlsplit

    URL_basename = os.path.basename(urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    # closing() releases the connection even if writing fails; the response
    # is opened first so a failed request never truncates an existing file.
    with contextlib.closing(urlopen(my_URL)) as response:
        with open(my_outfile, 'wb') as output:
            shutil.copyfileobj(response, output, 4096)  # download in chunks
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def download_file(my_URL, my_outfile = ''):
    """Download ``my_URL`` to a local file, overwriting any existing file.

    When ``my_outfile`` is empty, the file is saved in the current
    directory under the base name of the URL path.  The content is
    streamed in 4096-byte chunks and the connection is closed afterwards.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import contextlib
    import os
    import shutil
    try:  # Python 2
        from urllib2 import urlopen
        from urlparse import urlsplit
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlsplit

    URL_basename = os.path.basename(urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    # closing() releases the connection even if writing fails; the response
    # is opened first so a failed request never truncates an existing file.
    with contextlib.closing(urlopen(my_URL)) as response:
        with open(my_outfile, 'wb') as output:
            shutil.copyfileobj(response, output, 4096)  # download in chunks
项目:reportIT    作者:stevekm    | 项目源码 | 文件源码
def download_file(my_URL, my_outfile = ''):
    """Download ``my_URL`` to a local file, overwriting any existing file.

    When ``my_outfile`` is empty, the file is saved in the current
    directory under the base name of the URL path.  The content is
    streamed in 4096-byte chunks and the connection is closed afterwards.
    Adapted from https://gist.github.com/hughdbrown/c145b8385a2afa6570e2
    """
    import contextlib
    import os
    import shutil
    try:  # Python 2
        from urllib2 import urlopen
        from urlparse import urlsplit
    except ImportError:  # Python 3
        from urllib.request import urlopen
        from urllib.parse import urlsplit

    URL_basename = os.path.basename(urlsplit(my_URL).path)

    # if no output file specified, save to URL filename in current dir
    if my_outfile == '':
        my_outfile = URL_basename

    # closing() releases the connection even if writing fails; the response
    # is opened first so a failed request never truncates an existing file.
    with contextlib.closing(urlopen(my_URL)) as response:
        with open(my_outfile, 'wb') as output:
            shutil.copyfileobj(response, output, 4096)  # download in chunks
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def create_http_request(self, method, url, headers, body, timeout, **kwargs):
        """Send one HTTP(S) request for ``url`` and return the response.

        Fills in ``Host`` (host without port) and, for a non-empty body,
        ``Content-Length`` in the caller's ``headers`` mapping when absent.
        The connection class is chosen by scheme and is given the raw
        netloc; an explicit port in the URL must be numeric (ValueError
        otherwise).  Extra keyword arguments are accepted and ignored.
        """
        scheme, netloc, path, query, _ = urlparse.urlsplit(url)
        is_https = scheme == 'https'
        if netloc.rfind(':') > netloc.rfind(']'):
            # explicit port after the host (or after an IPv6 "]")
            host, _, port_text = netloc.rpartition(':')
            port = int(port_text)
        else:
            # no port number
            host, port = netloc, (443 if is_https else 80)
        request_target = path + '?' + query if query else path
        if 'Host' not in headers:
            headers['Host'] = host
        if body and 'Content-Length' not in headers:
            headers['Content-Length'] = str(len(body))
        if is_https:
            connection = httplib.HTTPSConnection(netloc, timeout=timeout)
        else:
            connection = httplib.HTTPConnection(netloc, timeout=timeout)
        connection.request(method, request_target, body=body, headers=headers)
        return connection.getresponse()
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def filter(self, handler):
        """Serve a local file or directory index for ``handler.path``.

        The URL path (leading slashes stripped, ``.`` when empty) is used as
        a filesystem path relative to the current directory.  A directory
        without ``self.index_file`` yields a generated index page; an
        existing file is returned with a Content-Type guessed from its
        extension.  Returns ``('mock', {...})`` on a hit and None otherwise.
        """
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            # NOTE(review): '..' segments in the request path are not
            # rejected here, so files outside the working directory are
            # reachable - confirm that only trusted clients can connect.
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                extension = os.path.splitext(path)[1]
                try:
                    import mimetypes
                    # Keep the octet-stream default for unknown extensions;
                    # a bare .get() returned None and clobbered it.
                    content_type = mimetypes.types_map.get(extension, content_type)
                    if extension.endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def reduce_uri(self, uri, default_port=True):
        """Reduce an authority or full URI to an ``(authority, path)`` pair.

        When ``uri`` has a network location, that location and its path
        (``'/'`` if empty) are used; otherwise the whole input is treated as
        ``host`` or ``host:port`` with path ``'/'``.  If ``default_port`` is
        true and no port was given, the well-known port for ``http`` (80) or
        ``https`` (443) is appended to the authority.
        """
        # note HTTP URLs do not have a userinfo component
        scheme, netloc, url_path = urlparse.urlsplit(uri)[:3]
        if netloc:
            # full URI
            authority, path = netloc, url_path or '/'
        else:
            # bare host or host:port
            scheme, authority, path = None, uri, '/'
        host, port = splitport(authority)
        if not default_port or port is not None or scheme is None:
            return authority, path
        well_known_ports = {"http": 80, "https": 443}
        if scheme in well_known_ports:
            authority = "%s:%d" % (host, well_known_ports[scheme])
        return authority, path
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _CalculateRequestSize(self, req):
    """Calculates the request size.

    Args:
      req: A tuple of (uri, method name, request body, header map)
    Returns:
      the size of the request, in bytes.
    """
    uri, method, body, headers = req
    (unused_scheme,
     unused_host_port, url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(uri)
    size = len('%s %s HTTP/1.1\n' % (method, url_path))
    size += self._CalculateHeaderSize(headers)


    if body:
      size += len(body)
    return size
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _parse_relative_url(relative_url):
  """Parses a relative URL and splits it into its path and query string.

  Args:
    relative_url: The relative URL, starting with a '/'.

  Returns:
    Tuple (path, query) where:
      path: The path in the relative URL.
      query: The query string in the URL without the '?' character.

  Raises:
    _RelativeUrlError if the relative_url is invalid for whatever reason.
  """
  if not relative_url:
    raise _RelativeUrlError('Relative URL is empty')
  (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
  if scheme or netloc:
    raise _RelativeUrlError('Relative URL may not have a scheme or location')
  if fragment:
    raise _RelativeUrlError('Relative URL may not specify a fragment')
  if not path or path[0] != '/':
    raise _RelativeUrlError('Relative URL path must start with "/"')
  return path, query
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def reduce_uri(self, uri, default_port=True):
        """Reduce an authority or full URI to an ``(authority, path)`` pair.

        When ``uri`` has a network location, that location and its path
        (``'/'`` if empty) are used; otherwise the whole input is treated as
        ``host`` or ``host:port`` with path ``'/'``.  If ``default_port`` is
        true and no port was given, the well-known port for ``http`` (80) or
        ``https`` (443) is appended to the authority.
        """
        # note HTTP URLs do not have a userinfo component
        scheme, netloc, url_path = urlparse.urlsplit(uri)[:3]
        if netloc:
            # full URI
            authority, path = netloc, url_path or '/'
        else:
            # bare host or host:port
            scheme, authority, path = None, uri, '/'
        host, port = splitport(authority)
        if not default_port or port is not None or scheme is None:
            return authority, path
        well_known_ports = {"http": 80, "https": 443}
        if scheme in well_known_ports:
            authority = "%s:%d" % (host, well_known_ports[scheme])
        return authority, path
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _CalculateRequestSize(self, req):
    """Calculates the request size.

    May be overriden to support different types of requests.

    Args:
      req: A urllib2.Request.

    Returns:
      the size of the request, in bytes.
    """
    (unused_scheme,
     unused_host_port, url_path,
     unused_query, unused_fragment) = urlparse.urlsplit(req.get_full_url())
    size = len('%s %s HTTP/1.1\n' % (req.get_method(), url_path))
    size += self._CalculateHeaderSize(req.headers)
    size += self._CalculateHeaderSize(req.unredirected_hdrs)


    data = req.get_data()
    if data:
      size += len(data)
    return size
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _parse_relative_url(relative_url):
  """Parses a relative URL and splits it into its path and query string.

  Args:
    relative_url: The relative URL, starting with a '/'.

  Returns:
    Tuple (path, query) where:
      path: The path in the relative URL.
      query: The query string in the URL without the '?' character.

  Raises:
    _RelativeUrlError if the relative_url is invalid for whatever reason.
  """
  if not relative_url:
    raise _RelativeUrlError('Relative URL is empty')
  (scheme, netloc, path, query, fragment) = urlparse.urlsplit(relative_url)
  if scheme or netloc:
    raise _RelativeUrlError('Relative URL may not have a scheme or location')
  if fragment:
    raise _RelativeUrlError('Relative URL may not specify a fragment')
  if not path or path[0] != '/':
    raise _RelativeUrlError('Relative URL path must start with "/"')
  return path, query
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def join_url(base_url, path):
    """Join a base URL and a path, merging their paths and queries.

    Both arguments are split with fragments disabled; scheme and network
    location come from ``base_url``, while the paths and queries are
    combined by ``_join_paths`` and ``_join_queries``.
    E.g.: See unit tests.
    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    base = urlparse.urlsplit(base_url, allow_fragments=False)
    extra = urlparse.urlsplit(path, allow_fragments=False)
    joined_path = _join_paths(base.path, extra.path)
    joined_query = _join_queries(base.query, extra.query)
    merged = base._replace(path=joined_path, query=joined_query)
    return urlparse.urlunsplit(merged)
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def check_registry_status(url=DEFAULT_IMAGES_URL, _v2=False):
    """
    Ping a registry's API endpoint to check that it is reachable.

    Probes ``/v1/_ping`` first; a 404 that advertises ``registry/2.0`` in
    the API version header triggers a second probe of ``/v2/``.  A 401 is
    treated as "available but unauthorized".

    :params url: registry url
    :raises RegistryError: if registry is not available
    """
    probe_path = '/v2/' if _v2 else '/v1/_ping'
    url = urlsplit(url)._replace(path=probe_path).geturl()

    with raise_registry_error(url):
        # NOTE(review): certificate verification is disabled for this probe.
        response = requests.get(url, timeout=PING_REQUEST_TIMEOUT,
                                verify=False)
        if (not _v2 and response.status_code == 404 and
                response.headers.get(API_VERSION_HEADER) == 'registry/2.0'):
            # v1 ping is missing but the server speaks the v2 API
            check_registry_status(url, _v2=True)
        elif response.status_code == 401:
            return  # user is not authorized, but registry is available
        else:
            response.raise_for_status()
项目:webkit-crawler    作者:dozymoe    | 项目源码 | 文件源码
def url_join(*parts, **kwargs):
    """
    Combine several url fragments into one url.

    Scheme and netloc are taken from the last part that defines them
    (scheme falls back to the ``scheme`` keyword, default ``'http'``).
    Paths are joined with ``/`` starting from the last part whose path is
    absolute. Query and fragment come from the final part only.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        # ``sequence`` runs from the last part back to the first: keep
        # paths up to and including the first absolute one, then restore
        # the natural order before joining.
        kept = []
        for segment in sequence:
            kept.append(segment)
            if segment.startswith('/'):
                break
        kept.reverse()
        return '/'.join(kept)

    def first_nonempty(candidates, fallback):
        for candidate in candidates:
            if candidate:
                return candidate
        return fallback

    pieces = [urlsplit(part) for part in reversed(parts)]
    schemes, netlocs, paths, queries, fragments = zip(*pieces)
    return urlunsplit((
        first_nonempty(schemes, kwargs.get('scheme', 'http')),
        first_nonempty(netlocs, ''),
        concat_paths(paths),
        queries[0],
        fragments[0],
    ))
项目:WebScraping    作者:liinnux    | 项目源码 | 文件源码
def url_to_path(self, url):
        """Create file system path for this URL

        The path is built from the URL's netloc, path and query (the query
        is appended directly, without a separator). An empty path becomes
        ``/index.html`` and a path ending in ``/`` gets ``index.html``
        appended. Characters outside ``/0-9a-zA-Z-.,;_`` and space are
        replaced by ``_`` and every ``/``-separated segment is cut to 255
        characters. The result is joined onto ``self.cache_dir``.
        """
        components = urlparse.urlsplit(url)
        # when empty path set to /index.html
        path = components.path
        if not path:
            path = '/index.html'
        elif path.endswith('/'):
            path += 'index.html'
        filename = components.netloc + path + components.query
        # replace invalid characters
        # (raw string: '\-' in a normal literal is an invalid escape sequence
        # and triggers a warning on modern Python; the pattern is unchanged)
        # NOTE(review): '.' is whitelisted, so '..' segments survive here;
        # confirm callers only pass normalised URLs.
        filename = re.sub(r'[^/0-9a-zA-Z\-.,;_ ]', '_', filename)
        # restrict maximum number of characters
        filename = '/'.join(segment[:255] for segment in filename.split('/'))
        return os.path.join(self.cache_dir, filename)
项目:dati-ckan-docker    作者:italia    | 项目源码 | 文件源码
def resource_dictize(res, context):
    """Turn a resource object into a dict, flattening extras and fixing its url.

    The dict comes from ``d.table_dictize``; any ``extras`` mapping is merged
    into the top level. Unless ``context['for_edit']`` is set:

    * an uploaded resource (``url_type == 'upload'``) gets a fully qualified
      download url built with ``h.url_for``;
    * any other non-empty url lacking a scheme is prefixed with ``http://``.

    With ``for_edit`` the stored url is returned untouched.
    """
    # Looked up but not used below; kept so a context without 'model'
    # still raises KeyError as before.
    model = context['model']
    resource = d.table_dictize(res, context)

    extras = resource.pop("extras", None)
    if extras:
        resource.update(extras)

    url = resource['url']
    ## for_edit is only called at the times when the dataset is to be edited
    ## in the frontend. Without for_edit the whole qualified url is returned.
    if context.get('for_edit'):
        return resource

    if resource.get('url_type') == 'upload':
        cleaned_name = munge.munge_filename(url)
        resource['url'] = h.url_for(controller='package',
                                    action='resource_download',
                                    id=resource['package_id'],
                                    resource_id=res.id,
                                    filename=cleaned_name,
                                    qualified=True)
    elif url and not urlparse.urlsplit(url).scheme:
        # some urls do not have the protocol this adds http:// to these
        resource['url'] = u'http://' + url.lstrip('/')
    return resource
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def do_GET(self):
        """Handle the OAuth callback request.

        Reads ``oauth_token`` and ``oauth_verifier`` from the query string of
        the request path, stores both on ``self.server`` (decoded from UTF-8
        on Python 2) and answers with a 200 HTML response whose body is
        ``html.auth_okay_html``. A missing parameter raises ``KeyError``.
        """
        # Example request path:
        # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e
        query_string = urllib_parse.urlsplit(self.path).query
        params = urllib_parse.parse_qs(query_string)

        # Look up both values before touching the server object.
        token = params['oauth_token'][0]
        verifier = params['oauth_verifier'][0]

        for attr, raw in (('oauth_token', token), ('oauth_verifier', verifier)):
            setattr(self.server, attr, raw.decode('utf-8') if six.PY2 else raw)

        assert (isinstance(self.server.oauth_token, six.string_types))
        assert (isinstance(self.server.oauth_verifier, six.string_types))

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        self.wfile.write(html.auth_okay_html)
项目:gcodeplot    作者:arpruss    | 项目源码 | 文件源码
def from_url(self, url):
        """Apply the options encoded in a ``loop://`` URL to this instance.

        The only recognised option is ``logging``; its value selects an entry
        of ``LOGGER_LEVELS`` and enables the ``pySerial.loop`` logger on
        ``self.logger``. Nothing is returned.

        :param url: URL string whose scheme must be ``loop``.
        :raises SerialException: if the scheme is not ``loop``, an option is
            unknown, or the logging level is not in ``LOGGER_LEVELS``.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # Validate first: an unknown level used to escape as a
                    # bare KeyError instead of the documented SerialException.
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError(
                            'unknown logging level: {!r}'.format(level_name))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:true_review_web2py    作者:lucadealfaro    | 项目源码 | 文件源码
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urlparse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = u''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urlparse.urlunsplit(parts)
    else:
        return url
项目:weevely3-stealth    作者:edibledinos    | 项目源码 | 文件源码
def _get_env_info(self, script_url):
        """Derive the base url and base filesystem folder of the remote script.

        The remote ``script_folder`` is obtained through the ``system_info``
        module. The trailing directory components that the url path and the
        folder have in common are stripped from both; what is left is stored
        as ``self.base_folder_url`` (the url with the shortened path) and
        ``self.base_folder_path``. Does nothing when the script folder cannot
        be determined.
        """

        script_folder = ModuleExec('system_info', [ '-info', 'script_folder' ]).load_result_or_run('script_folder')
        if not script_folder:
            return

        split_url = urlparse.urlsplit(script_url)
        url_dir = os.path.split(split_url.path)[0]

        url_dirs = url_dir.split(os.sep)
        fs_dirs = script_folder.split(os.sep)

        # Count how many trailing components match, walking from the end.
        shared = 0
        for url_part, fs_part in zip(reversed(url_dirs), reversed(fs_dirs)):
            if url_part != fs_part:
                break
            shared += 1

        # Guard needed: a slice of [-0:] would wipe the whole list.
        if shared:
            del url_dirs[-shared:]
            del fs_dirs[-shared:]

        base_url_path = os.sep.join(url_dirs)
        self.base_folder_url = urlparse.urlunsplit(
            split_url[:2] + (base_url_path, ) + split_url[3:])
        self.base_folder_path = os.sep.join(fs_dirs)
项目:Crunchyroll-XML-Decoder    作者:jaw20    | 项目源码 | 文件源码
def gethtml(url):
    """Download ``url`` with the cookies stored in the local ``cookies`` file.

    The pickled cookie dict is loaded into a new ``requests`` session, the
    ``c_visitor`` cookie is removed, and depending on the module-level
    ``forceusa`` / ``localizecookies`` / ``lang`` settings a ``c_locale`` or
    ``sess_id`` cookie is set. If ``url`` lacks a scheme or host a message is
    printed and the process exits.

    :param url: absolute URL to fetch.
    :return: response body decoded as UTF-8.
    """
    with open('cookies') as f:
        # NOTE(review): unpickling executes whatever the file encodes; this
        # is only acceptable while 'cookies' is written by this tool itself.
        cookies = requests.utils.cookiejar_from_dict(pickle.load(f))
    # The file is only needed for loading, so it is closed before any
    # network traffic starts.
    session = requests.session()
    session.cookies = cookies
    del session.cookies['c_visitor']
    if not forceusa and localizecookies:
        # NOTE(review): the u'???????' key looks like a language name that
        # was lost to an encoding problem - confirm against upstream.
        session.cookies['c_locale']={u'Español (Espana)' : 'esES', u'Français (France)' : 'frFR', u'Português (Brasil)' : 'ptBR',
                                    u'English' : 'enUS', u'Español' : 'esLA', u'Türkçe' : 'enUS', u'Italiano' : 'itIT',
                                    u'???????' : 'arME' , u'Deutsch' : 'deDE'}[lang]
    if forceusa:
        try:
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
        except Exception:
            # A bare "except:" here also trapped Ctrl-C / SystemExit and
            # answered them with a 10 second sleep and a retry.
            sleep(10)  # sleep so we don't overload crunblocker
            session.cookies['sess_id'] = requests.get('http://www.crunblocker.com/sess_id.php').text
    parts = urlparse.urlsplit(url)
    if not parts.scheme or not parts.netloc:
        # parenthesised form prints the same text on Python 2 and 3
        print('Apparently not a URL')
        sys.exit()
    data = {'Referer': 'http://crunchyroll.com/', 'Host': 'www.crunchyroll.com',
            'User-Agent': 'Mozilla/5.0  Windows NT 6.1; rv:26.0 Gecko/20100101 Firefox/26.0'}
    # NOTE(review): these look like HTTP headers but are sent as query-string
    # parameters (params=); left unchanged because switching to headers=
    # alters what the server sees - confirm the intent.
    res = session.get(url, params=data)
    res.encoding = 'UTF-8'
    return res.text
项目:bitio    作者:whaleygeek    | 项目源码 | 文件源码
def from_url(self, url):
        """Configure this instance from the options of a ``loop://`` URL.

        ``logging`` is the only supported option; its value is looked up in
        ``LOGGER_LEVELS`` and the ``pySerial.loop`` logger is attached to
        ``self.logger`` at that level. The method returns nothing.

        :param url: URL string whose scheme must be ``loop``.
        :raises SerialException: if the scheme is not ``loop``, an option is
            unknown, or the logging level is not in ``LOGGER_LEVELS``.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # Reject unknown levels as ValueError so they are
                    # reported as SerialException below rather than leaking
                    # out as a KeyError from the LOGGER_LEVELS lookup.
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError(
                            'unknown logging level: {!r}'.format(level_name))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
项目:strack_python_api    作者:cine-use    | 项目源码 | 文件源码
def __init__(self, base_url, login, api_key):
        """Set up the API client state for the given server and credentials.

        :param base_url: server URL; a trailing ``/`` is appended if missing.
        :param login: login name stored for later use.
        :param api_key: API key stored for later use.
        """
        # Normalise so the stored base url always ends with a slash.
        if not base_url.endswith("/"):
            base_url += "/"
        self.__base_url = base_url
        self.__api_key = api_key
        self.__login = login
        self._api_version = "api/v1/"
        # NOTE(review): get_unique_code() is defined elsewhere; it may rely
        # on the attributes assigned above - keep this ordering.
        self.__unique_code = self.get_unique_code()
        # Split the normalised url into scheme, network location and path;
        # query and fragment are discarded.
        self._scheme, self._server, self._api_base, _, _ = urlparse.urlsplit(base_url)
        self.__sign_code = None
        self.__entity_list = []
        self.__general_doc_dict = None
        self.__logger = None

        # self.function_list = Command(self, "console/FunctionList", [])
        # Parameter spec for the "console/entity" command: one optional
        # list-typed "entity" attribute.
        entity_list_params = [
            {"attr": "entity",
             "type": "list",
             "need": False}
        ]
        self._entities_detail = Command(self, "console/entity", entity_list_params)
        self.__init_entities()
项目:gooderp_org    作者:osbzr    | 项目源码 | 文件源码
def from_html(self, cr, uid, model, field, element, context=None):
        """Resolve the ``<img>`` inside ``element`` to its image value.

        Three cases, decided by the path of the image's ``src``:

        * ``/website/image...``: the model, record id and field are read
          from the query string, falling back to path segments 3, 4 (up to
          the first ``_``) and 5, and the field of that record is returned;
        * a path matching ``self.local_url_re``: delegated to
          ``load_local_url``;
        * anything else: delegated to ``load_remote_url``.
        """
        src = element.find('img').get('src')
        parsed = urlparse.urlsplit(src)
        url_path = parsed.path

        if not url_path.startswith('/website/image'):
            if self.local_url_re.match(url_path):
                return self.load_local_url(src)
            return self.load_remote_url(src)

        # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
        segments = url_path.split('/')
        params = dict(urlparse.parse_qsl(parsed.query))
        # The path-segment defaults are evaluated even when the query
        # supplies the value, so too short a path still raises IndexError.
        model = params.get('model', segments[3])
        oid = params.get('id', segments[4].split('_')[0])
        field = params.get('field', segments[5])
        record = self.pool[model].browse(cr, uid, int(oid), context=context)
        return record[field]
项目:gooderp_org    作者:osbzr    | 项目源码 | 文件源码
def load_local_url(self, url):
        """Load a module-local static image and return it base64-encoded.

        The path of ``url`` is matched against ``self.local_url_re`` to get a
        module name and the remainder of the path; the file is looked up
        under that module's ``static`` directory.

        :param url: url whose path matches ``self.local_url_re``.
        :return: base64-encoded file content, or ``None`` when the url does
            not match, the resource is not found, or the file cannot be
            read as an image.
        """
        match = self.local_url_re.match(urlparse.urlsplit(url).path)
        if not match:
            # called with a non-local url: nothing to load
            return None

        rest = match.group('rest')
        for sep in os.sep, os.altsep:
            if sep and sep != '/':
                # str.replace returns a new string; the result used to be
                # thrown away, so OS-specific separators were never
                # normalised before the split below.
                rest = rest.replace(sep, '/')

        path = openerp.modules.get_module_resource(
            match.group('module'), 'static', *(rest.split('/')))

        if not path:
            return None

        try:
            with open(path, 'rb') as f:
                # force complete image load to ensure it's valid image data
                image = I.open(f)
                image.load()
                f.seek(0)
                return f.read().encode('base64')
        except Exception:
            logger.exception("Failed to load local image %r", url)
            return None
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def serial_class_for_url(url):
    """Parse an ``alt://`` URL into a port string and a serial class.

    The port is the URL's netloc and path concatenated. The optional
    ``class`` query option names the attribute of the ``serial`` module to
    use (default ``Serial``).

    :return: tuple ``(port, cls)``.
    :raises serial.SerialException: wrong scheme or unknown option.
    """
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))

    class_name = 'Serial'
    try:
        options = urlparse.parse_qs(parts.query, True)
        for option in options:
            if option != 'class':
                raise ValueError('unknown option: %r' % (option,))
            class_name = options[option][0]
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)

    port = parts.netloc + parts.path
    return (port, getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
项目:microbit-serial    作者:martinohanlon    | 项目源码 | 文件源码
def from_url(self, url):
        """extract host and port from an URL string

        Also applies the optional ``logging`` query option, which enables
        the ``pySerial.socket`` logger on ``self.logger`` at the level
        selected from ``LOGGER_LEVELS``.

        :param url: string of the form ``socket://<host>:<port>[?logging=...]``.
        :return: tuple ``(host, port)``.
        :raises SerialException: wrong scheme, unknown option, unknown
            logging level, or a missing/invalid/out-of-range port.
        """
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    level_name = values[0]
                    # Report an unknown level as SerialException (via
                    # ValueError) instead of a bare KeyError.
                    if level_name not in LOGGER_LEVELS:
                        raise ValueError('unknown logging level: %r' % (level_name,))
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[level_name])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            # parts.port is None when the URL has no port; on Python 3
            # comparing None with an int raises TypeError, which would slip
            # past the ValueError handler below.
            if port is None or not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -