Python urllib.parse 模块,unquote_plus() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.unquote_plus()

项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:mongodb-monitoring    作者:jruaux    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:geocoder-ie    作者:devgateway    | 项目源码 | 文件源码
def download(dest_path, url):
    try:
        file_name = url.split('/')[-1]
        path = os.path.realpath(os.path.join(dest_path, unquote_plus(file_name)))
        if not os.path.exists(path):
            f = urlopen(url)
            headers = f.headers['content-type'].split('/')
            md = 'w'
            if 'html' in headers:
                file_name = '{}.html'.format(uuid.uuid1())
            else:
                md = 'wb'
                with open(path, md) as local_file:
                    local_file.write(f.read())

        if os.path.exists(path):
            return path
        else:
            logger.info("Wasn't able to find the file....!")
            return None
    except Exception as error:
        logger.error('download error %s', error)
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_pack_metadata_statement():
    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})
    _keyjar = build_keyjar(KEYDEFS)[1]
    op = Operator(keyjar=_keyjar, jwks_bundle=jb, iss='https://example.com/')
    req = MetadataStatement(issuer='https://example.org/op')
    sms = op.pack_metadata_statement(req)
    assert sms  # Should be a signed JWT
    _jwt = factory(sms)
    assert _jwt
    assert _jwt.jwt.headers['alg'] == 'RS256'
    _body = json.loads(as_unicode(_jwt.jwt.part[1]))
    assert _body['iss'] == op.iss
    assert _body['issuer'] == 'https://example.org/op'
    # verify signature
    r = _jwt.verify_compact(sms, _keyjar.get_signing_key())
    assert r
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_unpack_metadata_statement_uri():
    s = signer[OA['sunet']]
    req = MetadataStatement(issuer='https://example.org/op')
    # Not intermediate
    ms = s.create_signed_metadata_statement(req, 'discovery', single=True)

    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})

    mds = MetaDataStore('msd')
    op = Operator(jwks_bundle=jb)
    op.httpcli = MockHTTPClient(mds)
    res = op.unpack_metadata_statement(jwt_ms=ms)
    assert len(res.parsed_statement) == 3
    loel = op.evaluate_metadata_statement(res.result)
    assert len(loel) == 3
    assert set([l.fo for l in loel]) == {'https://swamid.sunet.se',
                                         'https://edugain.com',
                                         'https://www.feide.no'}
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib_parse.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %r != %r" % (expect, result))
            result = urllib_parse.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %r != %r" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        result = urllib_parse.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
        self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, None)
        self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, ())
        with support.check_warnings(('', BytesWarning), quiet=True):
            self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, bytes(b''))
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:covar_me_app    作者:CovarMe    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:gitmate-2    作者:GitMateIO    | 项目源码 | 文件源码
def get_analysis_result(request):
    """
    Retrieves the analysis report for a given commit. Do provide the `repo`,
    `provider` and `sha` GET parameters.
    """
    repo = unquote_plus(request.GET.get('repo', ''))
    provider = request.GET.get('provider')
    sha = request.GET.get('sha')
    if not all((repo, sha, provider)):
        return Response(
            data={
                'detail': '`repo`, `provider` or `sha` GET parameter missing'
            },
            status=HTTP_400_BAD_REQUEST)
    db_repo = get_object_or_404(
        Repository,
        Q(full_name=repo) | Q(identifier=repo if repo.isdigit() else 0),
        provider=provider)
    result = get_object_or_404(AnalysisResults, repo=db_repo, sha=sha)
    return Response(data=ResultsSerializer(result).data)
项目:jikji    作者:Prev    | 项目源码 | 文件源码
def list(self, details=False) :
        """ Listing cache files
        """
        clist = os.listdir(self.cachedir)

        if details :
            nlist = []

            for file in clist :
                filepath = os.path.join(self.cachedir, file)
                nlist.append('%s (%s bytes)' % (unquote_plus(file), os.path.getsize(filepath) ) )

            return nlist

        else :
            return clist
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:kekescan    作者:xiaoxiaoleo    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:CVProject    作者:hieuxinhe94    | 项目源码 | 文件源码
def _translate_single_text(self, text, target_language, source_lauguage):
        assert _is_bytes(text)
        def split_text(text):
            start = 0
            text = quote_plus(text)
            length = len(text)
            while (length - start) > self._MAX_LENGTH_PER_QUERY:
                for seperator in self._SEPERATORS:
                    index = text.rfind(seperator, start, start+self._MAX_LENGTH_PER_QUERY)
                    if index != -1:
                        break
                else:
                    raise Error('input too large')
                end = index + len(seperator)
                yield unquote_plus(text[start:end])
                start = end

            yield unquote_plus(text[start:])

        def make_task(text):
            return lambda: self._basic_translate(text, target_language, source_lauguage)[0]

        results = list(self._execute(make_task(i) for i in split_text(text)))
        return tuple(''.join(i[n] for i in results) for n in range(len(self._writing)))
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:flask-zhenai-mongo-echarts    作者:Fretice    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:Data-visualization    作者:insta-code1    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:althingi-rest-api    作者:busla    | 项目源码 | 文件源码
def session_spider(request, spider):
    """
    Scraped sessions from althingi
    """
    payload = {'spider_name': spider, 'url': unquote_plus(utils.spider_url[spider])}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']

    for item in spider_response:
        date_from = datetime.datetime.strptime(item['date_from'], '%d.%m.%Y').date()
        if item['date_to']:
            date_to = datetime.datetime.strptime(item['date_to'], '%d.%m.%Y').date()
        else:
            date_to = ''

        Session.objects.update_or_create(
            session_id=item['session_id'],
            defaults={'date_to': date_from, 'date_from': date_from}
            )        

    return Response(r.json()['items'])
项目:althingi-rest-api    作者:busla    | 项目源码 | 文件源码
def committee_spider(request, spider, session):
    """
    Scraped committees from althingi
    """
    payload = {'spider_name': spider, 'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']

    session_obj = Session.objects.get(session_id=int(session))

    for item in spider_response:

        committee_obj, created = Committee.objects.update_or_create(
            committee_id=item['committee_id'],
            defaults = {
                'name': item['name'], 
                'short_abbr': item['short_abbr'], 
                'long_abbr': item['long_abbr'], 
                }
            )        
        committee_obj.session.add(session_obj)

    return Response(r.json()['items'])
项目:althingi-rest-api    作者:busla    | 项目源码 | 文件源码
def committee_meeting_spider(request, spider, session):
    """
    Scraped committee meetings from althingi
    """
    payload = {'spider_name': spider, 'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']

    session_obj = Session.objects.get(session_id=session)

    for item in spider_response:
        committee_obj = Committee.objects.get(committee_id=item['committee'])
        if (item['start_time'] and item['end_time']):

            CommitteeMeeting.objects.update_or_create(
                meeting_id=item['meeting_id'],
                defaults = {
                    'session': session_obj, 
                    'committee': committee_obj, 
                    'date_from': datetime.datetime.strptime(item['start_time'], '%Y-%m-%dT%H:%M:%S'),
                    'date_to': datetime.datetime.strptime(item['end_time'], '%Y-%m-%dT%H:%M:%S'),
                    }
                )            

    return Response(r.json()['items'])
项目:althingi-rest-api    作者:busla    | 项目源码 | 文件源码
def issue_spider(request, spider, session):
    """
    Scraped issues from althingi
    """

    payload = {'spider_name': spider, 'url': unquote_plus(utils.spider_url[spider] % session)}
    r = requests.get('http://localhost:9080/crawl.json?', params=payload)
    spider_response = r.json()['items']

    session_obj = Session.objects.get(session_id=session)

    for item in spider_response:
        issue_obj, created = Issue.objects.update_or_create(
            issue_id=item['issue_id'], 
            session=session_obj,            
            defaults={'name': item['name'], 'url': item['url'], 'issue_id': item['issue_id']})        

    return Response(r.json()['items'])
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:browser_vuln_check    作者:lcatro    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:TornadoWeb    作者:VxCoder    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:rowgenerators    作者:Metatab    | 项目源码 | 文件源码
def _decompose_fragment(self, frag):
        """Parse the fragment component"""
        from urllib.parse import unquote_plus

        if isinstance(frag, (list, tuple)):
            return frag

        if not frag:
            return None, None

        frag_parts = unquote_plus(frag).split(';')

        if not frag_parts:
            file, segment = None, None
        elif len(frag_parts) == 1:
            file = frag_parts[0]
            segment = None
        elif len(frag_parts) >= 2:
            file = frag_parts[0]
            segment = frag_parts[1]

        return file, segment
项目:Easy-Namuwiki-Extractor    作者:j-min    | 项目源码 | 文件源码
def _clean_link(content):
    def replacement(match):
        groups = match.groups('')
        if len(groups) < 2:
            replacement = groups[0]
        else:
            replacement = groups[1] or groups[0]

        if replacement:
            if replacement == '../':
                replacement = ''

            replacement = replacement.replace('/', ' ')

        return unquote_plus(replacement)

    for pattern in _patterns['link']:
        content = pattern.sub(replacement, content)

    return content
项目:hudl-bugbounty    作者:lewislabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:hudl-bugbounty    作者:lewislabs    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:hudl-bugbounty    作者:lewislabs    | 项目源码 | 文件源码
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Paramaters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
项目:hudl-bugbounty    作者:lewislabs    | 项目源码 | 文件源码
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
项目:ProgrameFacil    作者:Gpzim98    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:ProgrameFacil    作者:Gpzim98    | 项目源码 | 文件源码
def url_unescape(value, encoding='utf-8', plus=True):
        """Decodes the given value from a URL.

        The argument may be either a byte or unicode string.

        If encoding is None, the result will be a byte string.  Otherwise,
        the result is a unicode string in the specified encoding.

        If ``plus`` is true (the default), plus signs will be interpreted
        as spaces (literal plus signs must be represented as "%2B").  This
        is appropriate for query strings and form-encoded values but not
        for the path component of a URL.  Note that this default is the
        reverse of Python's urllib module.

        .. versionadded:: 3.1
           The ``plus`` argument
        """
        unquote = (urllib_parse.unquote_plus if plus else urllib_parse.unquote)
        if encoding is None:
            return unquote(utf8(value))
        else:
            return unicode_type(unquote(utf8(value)), encoding)
项目:sketch-components    作者:ibhubs    | 项目源码 | 文件源码
def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        # key = record['s3']['object']['key']
        key = unquote_plus(record['s3']['object']['key'])
        try:
            download_path = '/tmp/{}{}'.format(uuid.uuid4(), key)
            upload_path = '/tmp/out/{}'.format(key)

            s3_client.download_file(bucket, key, download_path)
            if not os.path.exists(download_path):
                raise Exception('ERROR: Unable to save file {} at {}'.format(
                    key, download_path))
            if download_path.endswith('.zip'):
                print('Processing zipfile')
                process_sketch_archive(zip_path=download_path,
                                       compress_zip=True,
                                       output_path=upload_path)
            elif download_path.endswith('.json'):
                process_sketch_dump(download_path, compress_zip=True,
                                    output_path=upload_path)
            else:
                raise Exception(
                    'ERROR: Expected zip or json file but found {}'.format(
                        key))
            if target_bucket is not None:
                s3_client.upload_file(upload_path, target_bucket, key)
                print('Uploaded {} to {}/{}'.format(upload_path, target_bucket,
                                                    key))
        except Exception as e:
            print(e)
            print('Error processing on object {} from bucket {}. '
                  'Make sure they exist and your bucket is in the '
                  'same region as this function.'.format(key, bucket))
            traceback.print_exc(file=sys.stdout)
            raise e
项目:ubi-virtual-assistant    作者:Alzemand    | 项目源码 | 文件源码
def name_file(imgUrl):
    fileName = unquote_plus(unquote_plus(unquote_plus(basename(imgUrl))))
    if os.path.exists(fileName):
        base,ext = os.path.splitext(fileName)
        print(fileName)
        print(base+'*'+ext)
        nfiles = len(glob.glob(base+'*'+ext))
        fileName = base+'_'+str(nfiles)+ext
    return fileName
项目:ubi-virtual-assistant    作者:Alzemand    | 项目源码 | 文件源码
def name_file(imgUrl):
    fileName = unquote_plus(unquote_plus(unquote_plus(basename(imgUrl))))
    if os.path.exists(fileName):
        base,ext = os.path.splitext(fileName)
        print(fileName)
        print(base+'*'+ext)
        nfiles = len(glob.glob(base+'*'+ext))
        fileName = base+'_'+str(nfiles)+ext
    return fileName
项目:Qyoutube-dl    作者:lzambella    | 项目源码 | 文件源码
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'):
        """Like unquote(), but also replace plus signs by spaces, as required for
        unquoting HTML form values.

        unquote_plus('%7e/abc+def') -> '~/abc def'
        """
        string = string.replace('+', ' ')
        return compat_urllib_parse_unquote(string, encoding, errors)
项目:querystringsafe_base64    作者:ClearcodeHQ    | 项目源码 | 文件源码
def test_encode_result_is_agnostic_to_url_quoting(string_num):
    """Test if querystringsafe_base64.encode returns a string that does not have characters that must be quoted."""
    original = test_strings[string_num]

    # quoting and unquoting has no impact on base64dotted:
    safe_encoded = querystringsafe_base64.encode(original)
    assert safe_encoded == quote_plus(safe_encoded)
    assert safe_encoded == unquote_plus(safe_encoded)

    # base64 has to be quoted:
    url_encoded = b64encode(original)
    assert url_encoded != quote_plus(url_encoded)
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def unquote_str(value, encoding='utf-8'):
        # In python2, unquote() gives us a string back that has the urldecoded
        # bits, but not the unicode parts.  We need to decode this manually.
        # unquote has special logic in which if it receives a unicode object it
        # will decode it to latin1.  This is hard coded.  To avoid this, we'll
        # encode the string with the passed in encoding before trying to
        # unquote it.
        byte_string = value.encode(encoding)
        return unquote_plus(byte_string).decode(encoding)
项目:youtube_downloader    作者:aksinghdce    | 项目源码 | 文件源码
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'):
        """Like unquote(), but also replace plus signs by spaces, as required for
        unquoting HTML form values.

        unquote_plus('%7e/abc+def') -> '~/abc def'
        """
        string = string.replace('+', ' ')
        return compat_urllib_parse_unquote(string, encoding, errors)
项目:AshsSDK    作者:thehappydinoa    | 项目源码 | 文件源码
def unquote_str(value, encoding='utf-8'):
        # In python2, unquote() gives us a string back that has the urldecoded
        # bits, but not the unicode parts.  We need to decode this manually.
        # unquote has special logic in which if it receives a unicode object it
        # will decode it to latin1.  This is hard coded.  To avoid this, we'll
        # encode the string with the passed in encoding before trying to
        # unquote it.
        byte_string = value.encode(encoding)
        return unquote_plus(byte_string).decode(encoding)
项目:optimalvibes    作者:littlemika    | 项目源码 | 文件源码
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'):
        """Like unquote(), but also replace plus signs by spaces, as required for
        unquoting HTML form values.

        unquote_plus('%7e/abc+def') -> '~/abc def'
        """
        string = string.replace('+', ' ')
        return compat_urllib_parse_unquote(string, encoding, errors)
项目:tvalacarta    作者:tvalacarta    | 项目源码 | 文件源码
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'):
        """Like unquote(), but also replace plus signs by spaces, as required for
        unquoting HTML form values.

        unquote_plus('%7e/abc+def') -> '~/abc def'
        """
        string = string.replace('+', ' ')
        return compat_urllib_parse_unquote(string, encoding, errors)
项目:isar    作者:ilbers    | 项目源码 | 文件源码
def apply_filter(self, filters, filter_value, **kwargs):
        """
        Apply a filter submitted in the querystring to the ToasterTable

        filters: (str) in the format:
          '<filter name>:<action name>'
        filter_value: (str) parameters to pass to the named filter

        <filter name> and <action name> are used to look up the correct filter
        in the ToasterTable's filter map; the <action params> are set on
        TableFilterAction* before its filter is applied and may modify the
        queryset returned by the filter
        """
        self.setup_filters(**kwargs)

        try:
            filter_name, action_name = filters.split(':')
            action_params = unquote_plus(filter_value)
        except ValueError:
            return

        if "all" in action_name:
            return

        try:
            table_filter = self.filter_map.get_filter(filter_name)
            action = table_filter.get_action(action_name)
            action.set_filter_params(action_params)
            self.queryset = action.filter(self.queryset)
        except KeyError:
            # pass it to the user - programming error here
            raise
项目:RSVPBot    作者:recursecenter    | 项目源码 | 文件源码
def narrow_url_to_stream_topic(url):
    parsed_url = urlparse(url)
    unquoted_fragment = unquote_plus(parsed_url.fragment.replace('.', '%'))
    split_fragment = re.split('/', unquoted_fragment)

    if len(split_fragment) < 5:
        return None, None

    stream = split_fragment[2]
    topic = split_fragment[4]
    return stream, topic
项目:cjworkbench    作者:CJWorkbench    | 项目源码 | 文件源码
def get_creds(request):
    flow = jsonpickle.decode(request.session['flow'])
    credential = flow.step2_exchange(request.GET.get('code', False))
    storage = DjangoORMStorage(GoogleCredentials, 'id', request.user, 'credential')
    storage.put(credential)
    if request.GET.get('state', False):
        return redirect(unquote_plus(request.GET.get('state')))
    else:
        return HttpResponse(status=200)
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def unquote_str(value, encoding='utf-8'):
        # In python2, unquote() gives us a string back that has the urldecoded
        # bits, but not the unicode parts.  We need to decode this manually.
        # unquote has special logic in which if it receives a unicode object it
        # will decode it to latin1.  This is hard coded.  To avoid this, we'll
        # encode the string with the passed in encoding before trying to
        # unquote it.
        byte_string = value.encode(encoding)
        return unquote_plus(byte_string).decode(encoding)
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def make_fs_jwks_bundle(iss, fo_liss, sign_keyjar, keydefs, base_path=''):
    """
    Given a list of Federation identifiers creates a FSJWKBundle containing all
    the signing keys.

    :param iss: The issuer ID of the entity owning the JWKSBundle
    :param fo_liss: List with federation identifiers as keys
    :param sign_keyjar: Keys that the JWKSBundel owner can use to sign
        an export version of the JWKS bundle.
    :param keydefs: What type of keys that should be created for each
        federation. The same for all of them.
    :param base_path: Where the pem versions of the keys are stored as files
    :return: A FSJWKSBundle instance.
    """
    jb = FSJWKSBundle(iss, sign_keyjar, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})

    jb.clear()  # start from scratch

    # Need to save the private parts on disc
    jb.bundle.value_conv['to'] = keyjar_to_jwks_private

    for entity in fo_liss:
        _name = entity.replace('/', '_')
        try:
            _ = jb[entity]
        except KeyError:
            fname = os.path.join(base_path, 'keys', "{}.key".format(_name))
            _keydef = copy.deepcopy(keydefs)
            _keydef[0]['key'] = fname

            _jwks, _keyjar, _kidd = build_keyjar(_keydef)
            jb[entity] = _keyjar

    return jb
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def create_federation_entity(iss, jwks_dir, sup='', fo_jwks=None, ms_dir='',
                             entity=None, sig_keys=None, sig_def_keys=None):
    fname = os.path.join(ms_dir, quote_plus(sup))

    if fo_jwks:
        _keybundle = FSJWKSBundle('', fdir=fo_jwks,
                                  key_conv={'to': quote_plus,
                                            'from': unquote_plus})

        # Organisation information
        _kj = _keybundle[sup]
        signer = Signer(InternalSigningService(sup, _kj), ms_dir=fname)
    else:
        signer = Signer(ms_dir=fname)

    # And then the FOs public keys
    _public_keybundle = FSJWKSBundle('', fdir=jwks_dir,
                                     key_conv={'to': quote_plus,
                                               'from': unquote_plus})

    # The OPs own signing keys
    if sig_keys is None:
        sig_keys = build_keyjar(sig_def_keys)[1]

    return FederationEntity(entity, iss=iss, keyjar=sig_keys, signer=signer,
                            fo_bundle=_public_keybundle)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_unquoting_parts(self):
        # Make sure unquoting works when have non-quoted characters
        # interspersed
        given = 'ab%sd' % hexescape('c')
        expect = "abcd"
        result = urllib_parse.unquote(given)
        self.assertEqual(expect, result,
                         "using quote(): %r != %r" % (expect, result))
        result = urllib_parse.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %r != %r" % (expect, result))