Python urlparse 模块,unquote() 实例源码

我们从Python开源项目中,提取了以下14个代码示例,用于说明如何使用urlparse.unquote()

项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def do_request(self, url, params=None, timeout=None):
        """Performs the HTTP request, signed with OAuth.

        :param timeout: optional request timeout, in seconds.
        :type timeout: float

        @return: the response content
        """

        req = self.session.post(url,
                                data=params,
                                auth=self.oauth,
                                timeout=timeout or self.default_timeout)

        # check the response headers / status code.
        if req.status_code != 200:
            self.log.error('do_request: Status code %i received, content:', req.status_code)

            for part in req.text.split('&'):
                self.log.error('    %s', urllib_parse.unquote(part))

            raise exceptions.FlickrError('do_request: Status code %s received' % req.status_code)

        return req.content
项目:DjanGoat    作者:Contrast-Security-OSS    | 项目源码 | 文件源码
def api_index(request):
    if check_if_valid_token(request):
        try:
            token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
            user_id = extrapolate_user(token)
            user = User.objects.get(user_id=user_id)
            if user.is_admin:
                data = serializers.serialize("json", User.objects.all())
                return HttpResponse(data, content_type='application/json')
            else:
                data = serializers.serialize("json", User.objects.filter(user_id=user_id))
                return HttpResponse(data, content_type='application/json')
        except User.DoesNotExist:
            return HttpResponse("null", content_type='application/json')
    else:
        return HttpResponse('Unauthorized', status=401)
项目:DjanGoat    作者:Contrast-Security-OSS    | 项目源码 | 文件源码
def check_if_valid_token(request):
    if 'HTTP_AUTHORIZATION' not in request.META:
        return False
    else:
        token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
        regex = re.compile("(.*?)-(.*)")
        split_token = token.split('=')[1]
        regex_groups = regex.search(split_token)
        if regex_groups.group(1):
            id = regex_groups.group(1)
        else:
            return False
        if regex_groups.group(2):
            hash = regex_groups.group(2)
        else:
            return False

    sha = SHA.new()
    sha.update(ACCESS_TOKEN_SALT + ":" + str(id))
    return hash == sha.hexdigest()
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def mkfn (text):
    text = unquote (text)
    return NAFN.sub ("_", text)
项目:refextract    作者:inspirehep    | 项目源码 | 文件源码
def identify_and_tag_DOI(line):
    """takes a single citation line and attempts to locate any DOI references.
       DOI references are recognised in both http (url) format and also the
       standard DOI notation (DOI: ...)
       @param line: (string) the reference line in which to search for DOI's.
       @return: the tagged line and a list of DOI strings (if any)
    """
    # Used to hold the DOI strings in the citation line
    doi_strings = []

    # Run the DOI pattern on the line, returning the re.match objects
    matched_doi = re_doi.finditer(line)
    # For each match found in the line
    for match in reversed(list(matched_doi)):
        # Store the start and end position
        start = match.start()
        end = match.end()
        # Get the actual DOI string (remove the url part of the doi string)
        doi_phrase = match.group('doi')
        if '%2f' in doi_phrase.lower():
            doi_phrase = unquote(doi_phrase)

        # Replace the entire matched doi with a tag
        line = line[0:start] + "<cds.DOI />" + line[end:]
        # Add the single DOI string to the list of DOI strings
        doi_strings.append(doi_phrase)

    doi_strings.reverse()
    return line, doi_strings
项目:Deploy_XXNET_Server    作者:jzp820927    | 项目源码 | 文件源码
def CgiDictFromParsedUrl(url):
  """Extract CGI variables from a parsed url into a dict.

  Returns a dict containing the following CGI variables for the provided url:
  SERVER_PORT, QUERY_STRING, SERVER_NAME and PATH_INFO.

  Args:
    url: An instance of urlparse.SplitResult.

  Returns:
    A dict containing the CGI variables derived from url.
  """
  environ = {}
  if url.port is not None:
    environ['SERVER_PORT'] = str(url.port)
  elif url.scheme == 'https':
    environ['SERVER_PORT'] = '443'
  elif url.scheme == 'http':
    environ['SERVER_PORT'] = '80'
  environ['QUERY_STRING'] = url.query
  environ['SERVER_NAME'] = url.hostname
  if url.path:
    environ['PATH_INFO'] = urlparse.unquote(url.path)
  else:
    environ['PATH_INFO'] = '/'
  return environ
项目:deb-python-httpretty    作者:openstack    | 项目源码 | 文件源码
def unquote_utf8(qs):
        if isinstance(qs, text_type):
            qs = qs.encode('utf-8')
        s = unquote(qs)
        if isinstance(s, byte_type):
            return s.decode("utf-8")
        else:
            return s
项目:amqppy    作者:marceljanerfont    | 项目源码 | 文件源码
def parse_url(url):
    scheme = urlparse.urlparse(url).scheme
    schemeless = url[len(scheme) + 3:]
    # parse with HTTP URL semantics
    parts = urlparse.urlparse('http://' + schemeless)
    path = parts.path or ''
    path = path[1:] if path and path[0] == '/' else path
    return dict(
        transport=scheme,
        host=urlparse.unquote(parts.hostname or '') or None,
        port=parts.port or amqppy.DEFAULT_PORT,
        username=urlparse.unquote(parts.username or '') or None,
        password=urlparse.unquote(parts.password or '') or None,
        virtual_host=urlparse.unquote(path or '') or "/",
        **dict(urlparse.parse_qsl(parts.query)))
项目:plugin.video.9anime    作者:DxCx    | 项目源码 | 文件源码
def PLAY_SOURCE(payload):
    return control.play_source(urlparse.unquote(payload))
项目:DjanGoat    作者:Contrast-Security-OSS    | 项目源码 | 文件源码
def api(request, id_number):
    if check_if_valid_token(request):
        token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
        user_id = extrapolate_user(token)
        data = serializers.serialize("json", User.objects.filter(user_id=user_id))
        return HttpResponse(data, content_type='application/json')
    else:
        return HttpResponse('Unauthorized', status=401)


# This is purposely vulnerable see - https://github.com/OWASP/railsgoat/wiki/Extras:-Broken-Regular-Expression
项目:kel-identity    作者:kelproject    | 项目源码 | 文件源码
def db_url_parse(url, engine=None, conn_max_age=0):
    """
    Parses a database URL.
    """
    if url == "sqlite://:memory:":
        # urlparse will choke on :memory:
        return {
            "ENGINE": DATABASE_ENGINE_SCHEMES["sqlite"],
            "NAME": ":memory:",
        }

    config = {}
    url = urlparse.urlparse(url)

    # split query strings from path
    path = url.path[1:]
    if "?" in path and not url.query:
        path, query = path.split("?", 2)
    else:
        path, query = path, url.query
    query = urlparse.parse_qs(query)

    # sqlite with no path should assume :memory: (sqlalchemy behavior)
    if url.scheme == "sqlite" and path == "":
        path = ":memory:"

    # handle postgresql percent-encoded paths
    hostname = url.hostname or ""
    if "%2f" in hostname.lower():
        hostname = hostname.replace("%2f", "/").replace("%2F", "/")

    config.update({
        "NAME": urlparse.unquote(path or ""),
        "USER": urlparse.unquote(url.username or ""),
        "PASSWORD": urlparse.unquote(url.password or ""),
        "HOST": hostname,
        "PORT": url.port or "",
        "CONN_MAX_AGE": conn_max_age,
    })

    engine = DATABASE_ENGINE_SCHEMES[url.scheme] if engine is None else engine

    # pass the query string into OPTIONS
    options = {}
    for key, values in query.items():
        if url.scheme == "mysql" and key == "ssl-ca":
            options["ssl"] = {"ca": values[-1]}
            continue
        options[key] = values[-1]

    # postgresql schema URLs
    if "currentSchema" in options and engine == "django.db.backends.postgresql_psycopg2":
        options["options"] = "-c search_path={0}".format(options["currentSchema"])

    if options:
        config["OPTIONS"] = options
    if engine:
        config["ENGINE"] = engine

    return config
项目:transfer    作者:viur-framework    | 项目源码 | 文件源码
def get(self, path="/", *args, **kwargs):
        self.response.headers['Content-Type'] = 'text/plain'
        # Prevent Hash-collision attacks
        assert len(self.request.arguments()) < 50
        # Fill the (surprisingly empty) kwargs dict with named request params
        tmpArgs = dict((k, self.request.get_all(k)) for k in self.request.arguments())
        for key in tmpArgs.keys()[:]:
            if len(tmpArgs[key]) == 0:
                continue
            if not key in kwargs.keys():
                if len(tmpArgs[key]) == 1:
                    kwargs[key] = tmpArgs[key][0]
                else:
                    kwargs[key] = tmpArgs[key]
            else:
                if isinstance(kwargs[key], list):
                    kwargs[key] = kwargs[key] + tmpArgs[key]
                else:
                    kwargs[key] = [kwargs[key]] + tmpArgs[key]
        del tmpArgs
        if "self" in kwargs.keys():  # self is reserved for bound methods
            raise NotImplementedError()
        path = urlparse.urlparse(path).path
        pathlist = [urlparse.unquote(x) for x in path.strip("/").split("/")]
        if len(pathlist) < 2:
            raise NotImplementedError()
        tfunc = pathlist[1]
        pathlist = pathlist[2:]
        if tfunc == "exportDb":
            self.response.write(self.exportDb(*pathlist, **kwargs))
        elif tfunc == "exportBlob":
            self.response.write(self.exportBlob(*pathlist, **kwargs))
        elif tfunc == "download":
            self.response.write(self.download(*pathlist, **kwargs))
        elif tfunc == "info":
            self.response.write(self.info(*pathlist, **kwargs))
        elif tfunc == "listCursors":
            self.response.write(self.listCursors(*pathlist, **kwargs))
        elif tfunc == "listKinds":
            self.response.write(self.listKinds(*pathlist, **kwargs))
        elif tfunc == "_ah":
            pass
        else:
            raise NotImplementedError()
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def do_upload(self, filename, url, params=None, fileobj=None, timeout=None):
        """Performs a file upload to the given URL with the given parameters, signed with OAuth.

        :param timeout: optional request timeout, in seconds.
        :type timeout: float

        @return: the response content
        """

        # work-around to allow non-ascii characters in file name
        # Flickr doesn't store the name but does use it as a default title
        if 'title' not in params:
            params['title'] = os.path.basename(filename).encode('utf8')

        # work-around for Flickr expecting 'photo' to be excluded
        # from the oauth signature:
        #   1. create a dummy request without 'photo'
        #   2. create real request and use auth headers from the dummy one
        dummy_req = requests.Request('POST', url, data=params,
                                     auth=self.oauth)

        prepared = dummy_req.prepare()
        headers = prepared.headers
        self.log.debug('do_upload: prepared headers = %s', headers)

        if not fileobj:
            fileobj = open(filename, 'rb')
        params['photo'] = ('dummy name', fileobj)
        m = MultipartEncoder(fields=params)
        auth = {'Authorization': headers.get('Authorization'),
                'Content-Type': m.content_type}
        self.log.debug('POST %s', auth)
        req = self.session.post(url, data=m, headers=auth, timeout=timeout or self.default_timeout)

        # check the response headers / status code.
        if req.status_code != 200:
            self.log.error('do_upload: Status code %i received, content:', req.status_code)

            for part in req.text.split('&'):
                self.log.error('    %s', urllib_parse.unquote(part))

            raise exceptions.FlickrError('do_upload: Status code %s received' % req.status_code)

        return req.content
项目:server    作者:viur-framework    | 项目源码 | 文件源码
def selectLanguage( self, path ):
        """
            Tries to select the best language for the current request.
        """
        if translations is None:
            # This project doesn't use the multi-language feature, nothing to do here
            return( path )
        if conf["viur.languageMethod"] == "session":
            # We store the language inside the session, try to load it from there
            if not session.current.getLanguage():
                if "X-Appengine-Country" in self.request.headers.keys():
                    lng = self.request.headers["X-Appengine-Country"].lower()
                    if lng in conf["viur.availableLanguages"]+list( conf["viur.languageAliasMap"].keys() ):
                        session.current.setLanguage( lng )
                        self.language = lng
                    else:
                        session.current.setLanguage( conf["viur.defaultLanguage"] )
            else:
                self.language = session.current.getLanguage()
        elif conf["viur.languageMethod"] == "domain":
            host = self.request.host_url.lower()
            host = host[ host.find("://")+3: ].strip(" /") #strip http(s)://
            if host.startswith("www."):
                host = host[ 4: ]
            if host in conf["viur.domainLanguageMapping"].keys():
                self.language = conf["viur.domainLanguageMapping"][ host ]
            else: # We have no language configured for this domain, try to read it from session
                if session.current.getLanguage():
                    self.language = session.current.getLanguage()
        elif conf["viur.languageMethod"] == "url":
            tmppath = urlparse.urlparse( path ).path
            tmppath = [ urlparse.unquote( x ) for x in tmppath.lower().strip("/").split("/") ]
            if len( tmppath )>0 and tmppath[0] in conf["viur.availableLanguages"]+list( conf["viur.languageAliasMap"].keys() ):
                self.language = tmppath[0]
                return( path[ len( tmppath[0])+1: ] ) #Return the path stripped by its language segment
            else: # This URL doesnt contain an language prefix, try to read it from session
                if session.current.getLanguage():
                    self.language = session.current.getLanguage()
                elif "X-Appengine-Country" in self.request.headers.keys():
                    lng = self.request.headers["X-Appengine-Country"].lower()
                    if lng in conf["viur.availableLanguages"] or lng in conf["viur.languageAliasMap"]:
                        self.language = lng
        return( path )