Python urllib.parse module: urlsplit() code examples

We have collected the following 50 code examples from open-source Python projects to illustrate how urllib.parse.urlsplit() is used.
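
For orientation, here is a minimal sketch (with a hypothetical URL) of the fields that urlsplit() returns:

from urllib.parse import urlsplit

parts = urlsplit('https://example.com:8080/path/page.html?x=1#top')
# parts.scheme   == 'https'
# parts.netloc   == 'example.com:8080'
# parts.path     == '/path/page.html'
# parts.query    == 'x=1'
# parts.fragment == 'top'
# parts.hostname == 'example.com', parts.port == 8080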

Project: zmirror    Author: aploium
def extract_url_path_and_query(full_url=None, no_query=False):
    """
    Convert http://foo.bar.com/aaa/p.html?x=y to /aaa/p.html?x=y

    :param no_query:
    :type full_url: str
    :param full_url: full url
    :return: str
    """
    if full_url is None:
        full_url = request.url
    split = urlsplit(full_url)
    result = split.path or "/"
    if not no_query and split.query:
        result += '?' + split.query
    return result
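
The conversion described in the docstring can be sketched as follows (hypothetical URLs; assumes urlsplit is imported from urllib.parse and full_url is passed explicitly):

# extract_url_path_and_query('http://foo.bar.com/aaa/p.html?x=y')                 -> '/aaa/p.html?x=y'
# extract_url_path_and_query('http://foo.bar.com')                                -> '/'
# extract_url_path_and_query('http://foo.bar.com/aaa/p.html?x=y', no_query=True)  -> '/aaa/p.html'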


# ################# End Client Request Handler #################


# ################# Begin Middle Functions #################
Project: pyconjp-website    Author: pyconjp
def change_locale(request):
    """
    Redirect to a given url while changing the locale in the path
    The url and the locale code need to be specified in the
    request parameters.
    """
    next = request.REQUEST.get('next', None)
    if not next:
        referrer = request.META.get('HTTP_REFERER', None)
        if referrer:
            next = urlsplit(referrer)[2]
    if not next:
        next = '/'
    _, path = utils.strip_path(next)
    if request.method == 'POST':
        locale = request.POST.get('locale', None)
        if locale and check_for_language(locale):
            if localeurl_settings.USE_SESSION:
                request.session['django_language'] = locale
            path = utils.locale_path(path, locale)

    response = http.HttpResponseRedirect(path)
    return response
Project: android3dblendermouse    Author: sketchpunk
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: android3dblendermouse    Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: android3dblendermouse    Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: android3dblendermouse    Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: linkchecker-gui    Author: linkcheck
def can_view_parent_source (self, url_data):
        """Determine if parent URL source can be retrieved."""
        if not url_data.valid:
            return False
        parent = url_data.parent_url
        if not parent:
            return False
        # Directory contents are dynamically generated, so it makes
        # no sense in viewing/editing them.
        if parent.startswith(u"file:"):
            path = urlparse.urlsplit(parent)[2]
            return not os.path.isdir(get_os_filename(path))
        if parent.startswith((u"ftp:", u"ftps:")):
            path = urlparse.urlsplit(parent)[2]
            return bool(path) and not path.endswith(u'/')
        # Only HTTP left
        return parent.startswith((u"http:", u"https:"))
Project: simple-web-crawler    Author: fikander
def crawl():
    try:
        depth_limit = int(request.values['depth'])
    except ValueError as e:
        return "Depth parameter must be a number", 400
    except:
        depth_limit = 1

    if 'url' in request.values:
        url = request.values['url']
        parsed_url = urlparse.urlsplit(url)
        if parsed_url.scheme not in ['http', 'https']:
            return "Only http and https protocols are supported", 400
        if parsed_url.netloc == '':
            return "Missing domain", 400
        allowed_domains = [ parsed_url.netloc ]
        crawler = Crawler(allowed_domains, depth_limit)
        crawler.crawl(url)
        return jsonify(**crawler.crawled)
    else:
        return "Missing url parameter", 400
Project: microperi    Author: c0d3st0rm
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: microperi    Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microperi    Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microperi    Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: gprime    Author: GenealogyCollective
def upload(url, filename=None):
    from urllib.request import Request, urlopen
    from urllib.parse import urlsplit
    import shutil
    def getFilename(url,openUrl):
        if 'Content-Disposition' in openUrl.info():
            # If the response has Content-Disposition, try to get filename from it
            cd = dict([x.strip().split('=', 1) if '=' in x else (x.strip(), '')
                       for x in openUrl.info()['Content-Disposition'].split(';')])
            if 'filename' in cd:
                fname = cd['filename'].strip("\"'")
                if fname: return fname
        # if no filename was found above, parse it out of the final URL.
        return os.path.basename(urlsplit(openUrl.url)[2])
    r = urlopen(Request(url))
    success = None
    try:
        filename = filename or "/tmp/%s" % getFilename(url,r)
        with open(filename, 'wb') as f:
            shutil.copyfileobj(r,f)
        success = filename
    finally:
        r.close()
    return success
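
A hedged usage note (hypothetical URL): when no filename is given, the helper saves into /tmp under the name taken from Content-Disposition or, failing that, from the last segment of the final URL path:

# upload('http://example.com/files/report.pdf')             -> '/tmp/report.pdf'  (no Content-Disposition header)
# upload('http://example.com/files/report.pdf', 'out.pdf')  -> 'out.pdf'
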
Project: ceiba-dl    Author: lantw44
def url_to_path_and_args(url, no_query_string=False):
    if no_query_string:
        url = url.replace('?', '%3F').replace('#', '%23')
    components = urlsplit(url)
    path = components.path
    if no_query_string:
        path = unquote(path)
        # CEIBA sometimes returns '?' in the path escaped as %3F and sometimes
        # double-escaped as %253F, and ceiba_dl.Request does not normalize
        # this by itself; if the remaining path still contains characters that
        # quote() would escape, treat it as un-encoded and re-escape '?' and
        # '#' here as well.
        quote_test = path.replace('?', '').replace('#', '').replace(' ', '')
        if quote(quote_test) != quote_test:
            path = path.replace('?', '%3F').replace('#', '%23')
        args = {}
    else:
        query_string = components.query
        args = parse_qs(query_string, keep_blank_values=True)
        for key, value in args.items():
            if isinstance(value, list):
                assert len(value) == 1
                args[key] = value[0]
    return (path, args)
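
A short sketch of both modes (hypothetical URLs):

# url_to_path_and_args('/mod/note.php?id=3&lang=zh')          -> ('/mod/note.php', {'id': '3', 'lang': 'zh'})
# url_to_path_and_args('/mod/a?b.php', no_query_string=True)  -> ('/mod/a?b.php', {})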

Project: webkit-crawler    Author: dozymoe
def url_join(*parts, **kwargs):
    """
    Normalize url parts and join them with a slash.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        result = []
        for path in sequence:
            result.append(path)
            if path.startswith('/'):
                break
        return '/'.join(reversed(result))

    schemes, netlocs, paths, queries, fragments = zip(*(urlsplit(part) for part in reversed(parts)))
    scheme = next((x for x in schemes if x), kwargs.get('scheme', 'http'))
    netloc = next((x for x in netlocs if x), '')
    path = concat_paths(paths)
    query = queries[0]
    fragment = fragments[0]
    return urlunsplit((scheme, netloc, path, query, fragment))
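
A hedged usage sketch (hypothetical parts): the scheme and netloc come from the first part that provides them, and the joined path starts from the last absolute path seen:

# url_join('http://example.com/base', 'sub', 'page.html')  -> 'http://example.com/base/sub/page.html'
# url_join('http://example.com/a/b', '/c', 'd')            -> 'http://example.com/c/d'
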
Project: flickr_downloader    Author: Denisolt
def do_GET(self):
        # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e

        qs = urllib_parse.urlsplit(self.path).query
        url_vars = urllib_parse.parse_qs(qs)

        oauth_token = url_vars['oauth_token'][0]
        oauth_verifier = url_vars['oauth_verifier'][0]

        if six.PY2:
            self.server.oauth_token = oauth_token.decode('utf-8')
            self.server.oauth_verifier = oauth_verifier.decode('utf-8')
        else:
            self.server.oauth_token = oauth_token
            self.server.oauth_verifier = oauth_verifier

        assert (isinstance(self.server.oauth_token, six.string_types))
        assert (isinstance(self.server.oauth_verifier, six.string_types))

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        self.wfile.write(html.auth_okay_html)
Project: gcodeplot    Author: arpruss
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: HAHA-NO-UR    Author: DamourYouKnow
def _handle_solo_scout(self):
        """
        Handles a solo scout

        :return: Path of scout image
        """
        card = await self._scout_cards()

        # Send error message if no card was returned
        if not card:
            self.results = []
            return None

        card = card[0]

        if card["card_image"] is None:
            url = "http:" + card["card_idolized_image"]
        else:
            url = "http:" + card["card_image"]

        fname = basename(urlsplit(url).path)
        image_path = idol_img_path.joinpath(fname)
        bytes_ = await get_one_img(
            url, image_path, self._bot.session_manager)
        return ScoutImage(bytes_, fname)
Project: bitio    Author: whaleygeek
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: DLink_Harvester    Author: MikimotoH
def determine_ftp_filename(host, furl) -> (bool, str):
    try:
        fsize = host.path.getsize(urlsplit(furl).path)
        fname = os.path.basename(urlsplit(furl).path)
        while True:
            if not os.path.exists(dlDir+fname):
                return True, fname  # needDownload=True
            elif os.path.getsize(dlDir + fname) == fsize:
                # same name same size
                return False, fname  # needDownload=False
            # same name different size, change name by appending "_1"
            ftitle, fext = os.path.splitext(fname)
            m = re.search(r'(.+)_(\d+)', ftitle)
            if m:
                ftitle = '%s_%s' % (m.group(1), int(m.group(2))+1)
                fname = ftitle + fext
            else:
                fname = ftitle + '_1' + fext
    except BaseException as ex:
        traceback.print_exc()
Project: microbit-serial    Author: martinohanlon
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: microbit-serial    Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microbit-serial    Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microbit-serial    Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: ecs    Author: ecs-org
def test_submission_form_copy(self):
        '''Tests if a submissionform can be copied. Compares initial version against copied version.
        '''

        submission_form = create_submission_form(presenter=self.user)
        response = self.client.get(reverse('ecs.core.views.submissions.copy_latest_submission_form', kwargs={'submission_pk': submission_form.submission.pk}))
        self.assertEqual(response.status_code, 302)
        url = reverse('ecs.core.views.submissions.copy_submission_form', kwargs={'submission_form_pk': submission_form.pk})
        self.assertEqual(url, urlsplit(response['Location']).path)

        response = self.client.get(url)
        self.assertEqual(response.status_code, 302)
        target_url = response['Location']
        response = self.client.get(target_url)
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.context['form'].initial.get('project_title'), submission_form.project_title)
Project: mybookshelf2    Author: izderadicka
def check_cors(origin):
    url=urlsplit(origin)
    if current_app.config.get('CORS_SECURE'):
        if url.scheme!='https':
            return False

    hp=url.netloc.split(':')
    host=hp[0]
    port=int(hp[1]) if len(hp)>1 else 443 if url.scheme == 'https' else 80

    if current_app.config.get('CORS_HOSTS') != '*' and host not in current_app.config.get('CORS_HOSTS', []):
        return False
    allowed_ports=current_app.config.get('CORS_PORTS')
    if allowed_ports and isinstance(allowed_ports, tuple) and (port < allowed_ports[0] or\
        port > allowed_ports[1]):
        return False
    elif allowed_ports and isinstance(allowed_ports, list) and port not in allowed_ports:
        return False

    return True
Project: daisychain    Author: daisychainme
def append_query_params(self, url, **kwargs):

        uri = urlsplit(url)
        query = parse_qs(uri.query)

        for key in kwargs:
            if key in query:
                query[key].append(kwargs[key])
            else:
                query[key] = kwargs[key]

        query_string = urlencode(query, doseq=True)

        uri_new = uri._replace(query=query_string)

        return urlunsplit(uri_new)
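
A hedged usage sketch (hypothetical URL): values for keys already present in the query string are appended, new keys are added:

# self.append_query_params('http://example.com/api?tag=a', tag='b', page='2')
#   -> 'http://example.com/api?tag=a&tag=b&page=2'
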
Project: daisychain    Author: daisychainme
def test_get_without_verify_token(self):

        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test?type=set"
        session.save()

        getData = {
            "code": "code_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)

        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["set", "api"],
                              "detail": ["verify_token_not_set"]},
                             parse_qs(redirect_uri.query))
Project: daisychain    Author: daisychainme
def test_get_with_no_verify_token_in_session(self):

        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test"
        session.save()

        getData = {
            "code": "code_will_not_be_checked_anyway",
            "verify_token": "token_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)

        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["internal"],
                              "detail": ["no_verify_token_in_session"]},
                             parse_qs(redirect_uri.query))
Project: daisychain    Author: daisychainme
def test_get_with_valid_code_and_invalid_verify_token(self):

        sessionValue = "correctvalue"
        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test"
        session[SESSKEY_OAUTH_VERIFY_TOKEN] = sessionValue
        session.save()

        getData = {
            "verify_token": "someothervaluethaninthesession",
            "code": "code_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)
        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["api"],
                              "detail": ["invalid_verify_token"]},
                             parse_qs(redirect_uri.query))
Project: daisychain    Author: daisychainme
def append_query_params(self, url, **kwargs):

        uri = urlsplit(url)
        query = parse_qs(uri.query)

        for key in kwargs:
            if key in query:
                query[key].append(kwargs[key])
            else:
                query[key] = kwargs[key]

        query_string = urlencode(query, doseq=True)

        uri_new = uri._replace(query=query_string)

        return urlunsplit(uri_new)
Project: editolido    Author: flyingeek
def get_gramet_image_url(url_or_fp):
    img_src = ''
    if isinstance(url_or_fp, io.IOBase):
        # noinspection PyUnresolvedReferences
        data = url_or_fp.read()
        u = urlsplit(OGIMET_URL)
    else:
        u = urlsplit(url_or_fp)
        import requests
        r = requests.get(url_or_fp)
        data = r.text
    if data:
        m = re.search(r'<img src="([^"]+/gramet_[^"]+)"', data)
        if m:
            img_src = "{url.scheme}://{url.netloc}{path}".format(
                url=u, path=m.group(1))
    return img_src
Project: sanic    Author: channelcat
def test_adds_other_supplied_values_as_query_string():
    app = Sanic('passes')

    @app.route(COMPLEX_PARAM_URL)
    def passes():
        return text('this should pass')

    new_kwargs = dict(PASSING_KWARGS)
    new_kwargs['added_value_one'] = 'one'
    new_kwargs['added_value_two'] = 'two'

    url = app.url_for('passes', **new_kwargs)

    query = dict(parse_qsl(urlsplit(url).query))

    assert query['added_value_one'] == 'one'
    assert query['added_value_two'] == 'two'
Project: chunnel    Author: obmarg
def connect(self):
        if self.connected:
            raise Exception("Already connected!")

        transport_class = self.TRANSPORTS[urlsplit(self.url).scheme]
        self.transport = transport_class(
            self.url, self.params, self._incoming, self._outgoing
        )
        transport_task = asyncio.ensure_future(self.transport.run())

        await self.transport.ready

        self._transport_task = transport_task
        self._done_recv = asyncio.Future()
        self._recv_task = asyncio.ensure_future(self._recv_loop())
        # TODO: Ok, so this is cool - but how to tell if our transport_task has
        # failed.
        self.connected = True
Project: undercrawler    Author: TeamHG-Memex
def analyze_file(name, f, verbose=False):
    urls = []
    Doc = namedtuple('Doc', ['item', 'min_hash'])
    documents = {} # key -> Doc
    lsh = MinHashLSH(threshold=0.9, num_perm=128)
    too_common = get_too_common_shingles(f, name, limit=300)
    for i, item in enumerate(item_reader(f, name)):
        urls.append(item['url'])
        min_hash = get_min_hash(item['extracted_text'], too_common)
        key = 'item_{}'.format(i)
        item = {'url': item['url']}
        documents[key] = Doc(item, min_hash)
        if key in lsh:
            lsh.remove(key)
        lsh.insert(key, min_hash)
    paths = [''.join([p.netloc, p.path]) for p in map(urlsplit, urls)]
    duplicates = get_duplicates(lsh, documents, verbose=verbose)
    print(name.ljust(40), '\t'.join(map(str, [
        len(urls), len(set(urls)), len(set(paths)),
        n_unique(documents, duplicates),
        ])))
Project: undercrawler    Author: TeamHG-Memex
def media_request(self, url):
        kwargs = dict(
            url=url,
            priority=-2,
            meta={'download_slot': (
                '{} documents'.format(urlsplit(url).netloc)),
            },
        )
        if using_splash(self.crawler.settings):
            return SplashRequest(
                endpoint='execute',
                args={'lua_source': self.lua_source},
                slot_policy=SlotPolicy.SCRAPY_DEFAULT,
                **kwargs)
        else:
            return Request(**kwargs)
Project: directory-tests    Author: uktrade
def sign_request(self, api_key, prepared_request):
        url = urlsplit(prepared_request.path_url)
        path = bytes(url.path, 'utf8')
        if url.query:
            path += bytes("?{}".format(url.query), 'utf8')

        salt = bytes(api_key, 'utf8')
        body = prepared_request.body or b""

        if isinstance(body, str):
            body = bytes(body, 'utf8')

        signature = sha256(path + body + salt).hexdigest()
        prepared_request.headers["X-Signature"] = signature

        return prepared_request
Project: pytorch    Author: ezyang
def download_file(url, binary=True):
    if sys.version_info < (3,):
        from urlparse import urlsplit
        import urllib2
        request = urllib2
        error = urllib2
    else:
        from urllib.parse import urlsplit
        from urllib import request, error

    filename = os.path.basename(urlsplit(url)[2])
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    path = os.path.join(data_dir, filename)

    if os.path.exists(path):
        return path
    try:
        data = request.urlopen(url, timeout=15).read()
        with open(path, 'wb' if binary else 'w') as f:
            f.write(data)
        return path
    except error.URLError:
        msg = "could not download test file '{}'".format(url)
        warnings.warn(msg, RuntimeWarning)
        raise unittest.SkipTest(msg)
Project: ddt4all    Author: cedricp
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: websauna    Author: websauna
def merge_url_qs(url: str, **kw) -> str:
    """Merge the query string elements of a URL with the ones in ``kw``.

    If any query string element exists in ``url`` that also exists in ``kw``, replace it.

    :param url: An URL.
    :param kw: Dictionary with keyword arguments.
    :return: An URL with keyword arguments merged into the query string.
    """
    segments = urlsplit(url)
    extra_qs = [
        (k, v)
        for (k, v) in parse_qsl(segments.query, keep_blank_values=1)
        if k not in kw
    ]
    qs = urlencode(sorted(kw.items()))
    if extra_qs:
        qs += '&' + urlencode(extra_qs)
    return urlunsplit((segments.scheme, segments.netloc, segments.path, qs, segments.fragment))
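
A hedged usage sketch (hypothetical URL): keyword arguments override existing query-string keys and are emitted first, sorted by key:

# merge_url_qs('http://example.com/page?a=1&b=2', b='3')  -> 'http://example.com/page?b=3&a=1'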
Project: mt7687-serial-uploader    Author: will127534
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: acmpv    Author: Vayn
def get_mgtv_real_url(url):
        """str->list of str
        Give you the real URLs."""
        content = loads(get_content(url))
        m3u_url = content['info']
        split = urlsplit(m3u_url)

        base_url = "{scheme}://{netloc}{path}/".format(scheme = split[0],
                                                      netloc = split[1],
                                                      path = dirname(split[2]))

        content = get_content(content['info'])  #get the REAL M3U url, maybe to be changed later?
        segment_list = []
        segments_size = 0
        for i in content.split():
            if not i.startswith('#'):  #not the best way, better we use the m3u8 package
                segment_list.append(base_url + i)
            # use ext-info for fast size calculate
            elif i.startswith('#EXT-MGTV-File-SIZE:'):
                segments_size += int(i[i.rfind(':')+1:])

        return m3u_url, segments_size, segment_list
Project: acmpv    Author: Vayn
def get_mgtv_real_url(url):
        """str->list of str
        Give you the real URLs."""
        content = loads(get_content(url))
        m3u_url = content['info']
        split = urlsplit(m3u_url)

        base_url = "{scheme}://{netloc}{path}/".format(scheme = split[0],
                                                      netloc = split[1],
                                                      path = dirname(split[2]))

        content = get_content(content['info'])  #get the REAL M3U url, maybe to be changed later?
        segment_list = []
        segments_size = 0
        for i in content.split():
            if not i.startswith('#'):  #not the best way, better we use the m3u8 package
                segment_list.append(base_url + i)
            # use ext-info for fast size calculate
            elif i.startswith('#EXT-MGTV-File-SIZE:'):
                segments_size += int(i[i.rfind(':')+1:])

        return m3u_url, segments_size, segment_list
Project: grasp    Author: textgain
def serialize(url='', data={}):
    """ Returns a URL with a query string of the given data.
    """
    p = urlparse.urlsplit(url)
    q = urlparse.parse_qsl(p.query)
    q.extend((b(k), b(v)) for k, v in sorted(data.items()))
    q = urlencode(q, doseq=True)
    p = p.scheme, p.netloc, p.path, q, p.fragment
    s = urlparse.urlunsplit(p)
    s = s.lstrip('?')
    return s

# print(serialize('http://www.google.com', {'q': 'cats'})) # http://www.google.com?q=cats

#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).
Project: quant    Author: yutiansut
def safe_url(url, remove_empty_query=True):
    scheme, netloc, path, query, fragment = urlsplit(url)

    if not query:
        return url.rstrip('/')

    # Sort all the queries
    queries = []
    for q in query.split('&'):
        if '=' not in q:
            return url

        key, value = q.split('=')
        if remove_empty_query and not value:
            continue

        queries.append((key, value))

    queries.sort(key=lambda x: x[0])
    query = urlencode(queries)

    return urlunsplit((scheme, netloc, path, query, fragment)).rstrip('/')
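
A hedged usage sketch (hypothetical URLs): query parameters are sorted by key, empty values are dropped by default, and any trailing slash is stripped:

# safe_url('http://example.com/page?b=2&a=1&c=')  -> 'http://example.com/page?a=1&b=2'
# safe_url('http://example.com/page/')            -> 'http://example.com/page'
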
Project: goblin-legacy    Author: ZEROFAIL
def __call__(self, value):
        try:
            super(URLValidator, self).__call__(value)
        except ValidationError as e:
            # Trivial case failed. Try for possible IDN domain
            if value:
                value = text_type(value)
                scheme, netloc, path, query, fragment = urlsplit(value)
                try:
                    # IDN -> ACE
                    netloc = netloc.encode('idna').decode('ascii')
                except UnicodeError:  # invalid domain part
                    raise ValidationError(self.message.format(value),
                                          code=self.code)
                url = urlunsplit((scheme, netloc, path, query, fragment))
                return super(URLValidator, self).__call__(url)
            else:
                raise ValidationError(self.message.format(value),
                                      code=self.code)
        return value
Project: Jackal_Velodyne_Duke    Author: MengGuo
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: Jackal_Velodyne_Duke    Author: MengGuo
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: pytorch    Author: pytorch
def download_file(url, binary=True):
    if sys.version_info < (3,):
        from urlparse import urlsplit
        import urllib2
        request = urllib2
        error = urllib2
    else:
        from urllib.parse import urlsplit
        from urllib import request, error

    filename = os.path.basename(urlsplit(url)[2])
    data_dir = os.path.join(os.path.dirname(__file__), 'data')
    path = os.path.join(data_dir, filename)

    if os.path.exists(path):
        return path
    try:
        data = request.urlopen(url, timeout=15).read()
        with open(path, 'wb' if binary else 'w') as f:
            f.write(data)
        return path
    except error.URLError:
        msg = "could not download test file '{}'".format(url)
        warnings.warn(msg, RuntimeWarning)
        raise unittest.SkipTest(msg)
Project: domain-discovery-crawler    Author: TeamHG-Memex
def test_batch_softmax_high_prob(server, priority=10000):
    q = make_queue(server, BatchSoftmaxQueue, settings={'QUEUE_BATCH_SIZE': 50})
    for domain_n in range(100):
        for url_n in range(5):
            q.push(Request(
                url='http://domain-{}.com/{}'.format(domain_n, url_n),
                priority=priority
                if (domain_n in [42, 43] and url_n == 1) else 0,
            ))
    res = q.pop_multi()
    urls = {r.url for r in res}
    assert 'http://domain-42.com/1' in urls
    assert 'http://domain-43.com/1' in urls
    assert len({urlsplit(r.url).netloc for r in res}) > 10
    assert len(res) == 50


# FIXME - broken in ebd4cb651050fcdae5427383f3d07b094f853155
# TODO - add a test for the infinite loop fixed in ^^