Python urllib.parse module: quote() example source code

The following 50 code examples, extracted from open-source Python projects, show how to use urllib.parse.quote().

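Before the project examples, here is a minimal standalone illustration of quote() using only the standard library (the sample strings below are illustrative, not taken from any project):

from urllib.parse import quote

# Unreserved characters pass through; everything else becomes UTF-8 percent escapes.
print(quote('wiki/San Francisco'))   # wiki/San%20Francisco   ('/' is in the default safe set)
print(quote('a/b c', safe=''))       # a%2Fb%20c              (clear the safe set to encode '/' too)
print(quote('北京'))                  # %E5%8C%97%E4%BA%AC     (non-ASCII text is encoded as UTF-8 by default)
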
Project: mooder    Author: phith0n
def get(self, request, *args, **kwargs):
        post = get_object_or_404(self.get_queryset(), pk=self.kwargs['pk'])
        if request.user.is_superuser or request.user.has_perm('archives.change_post') or post.author_id == request.user.id:
            pass
        elif post.visible == 'private' or post.visible == 'sell' and not post.buyers.filter(id=request.user.id).exists():
            raise Http404

        chunk_size = 8192
        response = StreamingHttpResponse(FileWrapper(open(post.attachment.path, 'rb'), chunk_size),
                                         content_type='application/octet-stream')
        response['Content-Length'] = post.attachment.size

        filename = post.attachment_filename if post.attachment_filename else 'attachment'
        response["Content-Disposition"] = \
            "attachment; " \
            "filenane={ascii_filename};" \
            "filename*=UTF-8''{utf_filename}".format(
                ascii_filename=quote(filename),
                utf_filename=quote(filename)
            )
        return response
Project: graph    Author: noxern
def slack(text: hug.types.text):
    """Returns JSON containing an attachment with an image url for the Slack integration"""
    title = text

    if text == 'top250':
        top250_res = requests.get(IMDB_URL + '/chart/toptv', headers={'Accept-Language': 'en'})
        top250_page = html.fromstring(top250_res.text)
        candidates = top250_page.xpath('//*[@data-caller-name="chart-top250tv"]//tr/td[2]/a')

        title = random.choice(candidates).text

    return dict(
        response_type='in_channel',
        attachments=[
            dict(image_url=GRAPH_URL + f'/graph?title={quote(title)}&uuid={uuid.uuid4()}')
        ]
    )
Project: dabdabrevolution    Author: harryparkdotio
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') \
             or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: pbtk    Author: marin-m
def produce(obj, pb, sep):
    for ds, val in pb.ListFields():
        for val in (val if ds.label == ds.LABEL_REPEATED else [val]):

            if ds.cpp_type == ds.CPPTYPE_MESSAGE:
                origlen = len(obj)
                produce(obj, val, sep)
                obj.insert(origlen, '%dm%d' % (ds.number, len(obj) - origlen))
                continue

            elif ds.type == ds.TYPE_STRING:
                if sep == '!':
                    val = val.replace('*', '*2A').replace('!', '*21')
                else:
                    val = quote(val, safe='~()*!.\'')

            elif ds.type == ds.TYPE_BYTES:
                val = urlsafe_b64encode(val).decode('ascii').strip('=')

            elif ds.type == ds.TYPE_BOOL:
                val = int(val)

            obj.append('%d%s%s' % (ds.number, types_enc[ds.type], val))

    return obj
Project: WikiExtractor_To_the_one_text    Author: j-min
def makeInternalLink(title, label):
    colon = title.find(':')
    if colon > 0 and title[:colon] not in acceptedNamespaces:
        return ''
    if colon == 0:
        # drop also :File:
        colon2 = title.find(':', colon + 1)
        if colon2 > 1 and title[colon + 1:colon2] not in acceptedNamespaces:
            return ''
    if Extractor.keepLinks:
        return '<a href="%s">%s</a>' % (quote(title.encode('utf-8')), label)
    else:
        return label


# ----------------------------------------------------------------------
# External links

# from: https://doc.wikimedia.org/mediawiki-core/master/php/DefaultSettings_8php_source.html
Project: AlexaPi    Author: alexa-pi
def code(self, var=None, **params):     # pylint: disable=unused-argument
        code = quote(cherrypy.request.params['code'])
        callback = cherrypy.url()
        payload = {
            "client_id": config['alexa']['Client_ID'],
            "client_secret": config['alexa']['Client_Secret'],
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": callback
        }
        url = "https://api.amazon.com/auth/o2/token"
        response = requests.post(url, data=payload)
        resp = response.json()

        alexapi.config.set_variable(['alexa', 'refresh_token'], resp['refresh_token'])

        return "<h2>Success!</h2>" \
                "<p>The refresh token has been added to your config file.</p>" \
                "<p>Now:</p>" \
                "<ul>" \
                "<li>close your this browser window,</li>" \
                "<li>exit the setup script as indicated,</li>" \
                "<li>and follow the Post-installation steps.</li>" \
                "</ul>"
Project: GAMADV-XTD    Author: taers232c
def _string_expansion(self, name, value, explode, prefix):
        if value is None:
            return None

        tuples, items = is_list_of_tuples(value)

        if list_test(value) and not tuples:
            return ','.join(quote(v, self.safe) for v in value)

        if dict_test(value) or tuples:
            items = items or sorted(value.items())
            format_str = '%s=%s' if explode else '%s,%s'

            return ','.join(
                format_str % (
                    quote(k, self.safe), quote(v, self.safe)
                ) for k, v in items
            )

        value = value[:prefix] if prefix else value
        return quote(value, self.safe)
Project: Mmrz-Sync    Author: zhanglintc
def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: ynm3k    Author: socrateslee
def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: warriorframework    Author: warriorframework
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: warriorframework    Author: warriorframework
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') \
             or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
项目:Jobs-search    作者:Hopetree    | 项目源码 | 文件源码
def __init__(self,daname,mykey,mycity):
        self.dbname = daname
        self.key = mykey
        self.city = mycity
        self.start_url = "http://sou.zhaopin.com/jobs/searchresult.ashx?jl={}&kw={}&p=1".format(quote(self.city), quote(self.key))
        self.headers = {
            "Accept":"text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Accept-Encoding":"gzip, deflate, sdch",
            "Accept-Language":"zh-CN,zh;q=0.8,mt;q=0.6",
            "Cache-Control":"max-age=0",
            "Connection":"keep-alive",
            "Host":"sou.zhaopin.com",
            "Referer":"http://www.zhaopin.com/",
            "Upgrade-Insecure-Requests":"1",
            "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/57.0.2987.110 Safari/537.36"
        }
        self.mysql = get_Mysql(self.dbname,self.key,self.city)
        self.mysql.create_table()

Project: Jobs-search    Author: Hopetree
def parse(self, response):
        baseurl = 'http://search.51job.com/jobsearch/search_result.php?fromJs=1&jobarea={}&keyword={}&keywordtype=2&' \
                  'lang=c&stype=2&postchannel=0000&fromType=1&confirmdate=9'
        city_dict = settings['CITY_DICT']
        jobname = settings['JOBNAME']
        citys = settings['CITYS']
        if len(citys) == 1:
            self.logger.info("Searching city: {}".format(citys[0]))
            citynum = city_dict[citys[0]]
        elif len(citys) > 1:
            self.logger.info("Searching cities: {}".format("&".join(citys)))
            lis = [city_dict[c] for c in citys]
            citynum = ",".join(lis)
        else:
            self.logger.info("No city specified, searching all cities")
            citynum = ""
        the_url = baseurl.format(quote(citynum),quote(jobname))
        for each_url in [the_url]:
            yield scrapy.Request(url=each_url,callback=self.parse_urls)
Project: wikipedia_multilang    Author: ivanvladimir
def makeInternalLink(title, label):
    colon = title.find(':')
    if colon > 0 and title[:colon] not in options.acceptedNamespaces:
        return ''
    if colon == 0:
        # drop also :File:
        colon2 = title.find(':', colon + 1)
        if colon2 > 1 and title[colon + 1:colon2] not in options.acceptedNamespaces:
            return ''
    if options.keepLinks:
        return '<a href="%s">%s</a>' % (quote(title.encode('utf-8')), label)
    else:
        return label


# ----------------------------------------------------------------------
# External links

# from: https://doc.wikimedia.org/mediawiki-core/master/php/DefaultSettings_8php_source.html
Project: nzb-monkey    Author: nzblnk
def search_nzb_url(self):
        """Search for NZB Download URL and return the URL
        :return bool, str: """
        try:
            self.header = self.header.replace('_', ' ')
            res = requests.get(self.search_url.format(quote(self.header, encoding='utf-8')),
                               timeout=REQUESTS_TIMEOUT, headers={'Cookie': 'agreed=true'}, verify=False)
        except requests.exceptions.Timeout:
            print(Col.WARN + ' Timeout' + Col.OFF, flush=True)
            return False, None
        except requests.exceptions.ConnectionError:
            print(Col.WARN + ' Connection Error' + Col.OFF, flush=True)
            return False, None

        m = re.search(self.regex, res.text)
        if m is None:
            print(Col.WARN + ' NOT FOUND' + Col.OFF, flush=True)
            return False, None

        self.nzb_url = self.download_url.format(**m.groupdict())

        return True, self.nzb_url
Project: ceiba-dl    Author: lantw44
def url_to_path_and_args(url, no_query_string=False):
    if no_query_string:
        url = url.replace('?', '%3F').replace('#', '%23')
    components = urlsplit(url)
    path = components.path
    if no_query_string:
        path = unquote(path)
        # A literal '?' here means the original URL reached CEIBA as %3F
        # (and a literal '%3F' means it arrived as %253F); ceiba_dl.Request
        # quotes the path again later, so re-encode '?' and '#' only when
        # quoting would change the rest of the path.
        quote_test = path.replace('?', '').replace('#', '').replace(' ', '')
        if quote(quote_test) != quote_test:
            path = path.replace('?', '%3F').replace('#', '%23')
        args = {}
    else:
        query_string = components.query
        args = parse_qs(query_string, keep_blank_values=True)
        for key, value in args.items():
            if isinstance(value, list):
                assert len(value) == 1
                args[key] = value[0]
    return (path, args)

Project: Inkxbot    Author: InkxtheSquid
def hearthwiki(self, title, ctx):
        """Returns a hearthstone wiki page: ,hearthwiki 'card name'"""
        url = 'http://hearthstone.wikia.com/wiki/' + urlquote(title)

        typetochan = ctx.message.channel
        async with aiohttp.get(url) as resp:
            if resp.status == 404:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say('Could not find your page. Try a search:\n{0.url}'.format(resp))
            elif resp.status == 200:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say(resp.url)
            elif resp.status == 502:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say('Seems like the Hearthstone Wiki is taking too long to respond. Try again later.')
            else:
                await self.bot.send_typing(typetochan)
                await self.bot.say('An error with status code {0.status} occurred. Tell Inkx.'.format(resp))
Project: Inkxbot    Author: InkxtheSquid
def deswiki(self, title, ctx):
        """Returns a Destinypedia page: ,deswiki 'Ghost'"""
        url = 'http://destiny.wikia.com/wiki/' + urlquote(title)
        typetochan = ctx.message.channel
        async with aiohttp.get(url) as resp:
            if resp.status == 404:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say('Could not find your page. Try a search:\n{0.url}'.format(resp))
            elif resp.status == 200:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say(resp.url)
            elif resp.status == 502:
                await self.bot.send_typing(typetochan)
                await asyncio.sleep(1)
                await self.bot.say('Seems like the Destinypedia is taking too long to respond. Try again later.')
            else:
                await self.bot.send_typing(typetochan)
                await self.bot.say('An error with status code {0.status} occurred. Tell Inkx.'.format(resp))
Project: RPoint    Author: george17-meet
def build_request_url(self, token):
        parameters = {}
        if token:
            parameters['auth'] = token
        for param in list(self.build_query):
            if type(self.build_query[param]) is str:
                parameters[param] = quote('"' + self.build_query[param] + '"')
            elif type(self.build_query[param]) is bool:
                parameters[param] = "true" if self.build_query[param] else "false"
            else:
                parameters[param] = self.build_query[param]
        # reset path and build_query for next query
        request_ref = '{0}{1}.json?{2}'.format(self.database_url, self.path, urlencode(parameters))
        self.path = ""
        self.build_query = {}
        return request_ref
Project: pandas-profiling    Author: JosPolfliet
def histogram(series, **kwargs):
    """Plot an histogram of the data.

    Parameters
    ----------
    series: Series, default None
        The data to plot.

    Returns
    -------
    str, The resulting image encoded as a string.
    """
    imgdata = BytesIO()
    plot = _plot_histogram(series, **kwargs)
    plot.figure.subplots_adjust(left=0.15, right=0.95, top=0.9, bottom=0.1, wspace=0, hspace=0)
    plot.figure.savefig(imgdata)
    imgdata.seek(0)
    result_string = 'data:image/png;base64,' + quote(base64.b64encode(imgdata.getvalue()))
    # TODO Think about writing this to disk instead of caching them in strings
    plt.close(plot.figure)
    return result_string
Project: django-eventstream    Author: fanout
def publish_event(channel, event_type, data, pub_id, pub_prev_id,
        skip_user_ids=[]):
    content_filters = []
    if pub_id:
        event_id = '%I'
        content_filters.append('build-id')
    else:
        event_id = None
    content = sse_encode_event(event_type, data, event_id=event_id, escape=bool(pub_id))
    meta = {}
    if skip_user_ids:
        meta['skip_users'] = ','.join(skip_user_ids)
    publish(
        'events-%s' % quote(channel),
        HttpStreamFormat(content, content_filters=content_filters),
        id=pub_id,
        prev_id=pub_prev_id,
        meta=meta)
Project: twittershade    Author: nicolavic98
def encode_params(self, base_url, method, params):
        params = params.copy()

        if self.token:
            params['oauth_token'] = self.token

        params['oauth_consumer_key'] = self.consumer_key
        params['oauth_signature_method'] = 'HMAC-SHA1'
        params['oauth_version'] = '1.0'
        params['oauth_timestamp'] = str(int(time()))
        params['oauth_nonce'] = str(getrandbits(64))

        enc_params = urlencode_noplus(sorted(params.items()))

        key = self.consumer_secret + "&" + urllib_parse.quote(self.token_secret, safe='~')

        message = '&'.join(
            urllib_parse.quote(i, safe='~') for i in [method.upper(), base_url, enc_params])

        signature = (base64.b64encode(hmac.new(
                    key.encode('ascii'), message.encode('ascii'), hashlib.sha1)
                                      .digest()))
        return enc_params + "&" + "oauth_signature=" + urllib_parse.quote(signature, safe='~')
Project: twittershade    Author: nicolavic98
def __call__(self, twitter, options):
        # We need to be pointing at search.twitter.com to work, and it is less
        # tangly to do it here than in the main()
        twitter.domain = "search.twitter.com"
        twitter.uriparts = ()
        # We need to bypass the TwitterCall parameter encoding, so we
        # don't encode the plus sign, so we have to encode it ourselves
        query_string = "+".join(
            [quote(term)
             for term in options['extra_args']])

        results = twitter.search(q=query_string)['results']
        f = get_formatter('search', options)
        for result in results:
            resultStr = f(result, options)
            if resultStr.strip():
                printNicely(resultStr)
Project: mugen    Author: PeterDing
def form_encode(data):
    '''
    form-encode data
    '''

    assert isinstance(data, dict), 'data must be dict like'

    enc_data = '&'.join(
        ['{}={}'.format(
            k, url_quote(
                v if isinstance(v, str)
                else json.dumps(v, ensure_ascii=False)
                )) for k, v in data.items()
        ]
    )
    return enc_data
Project: fgc    Author: mpaulweeks
def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: deb-python-lesscpy    Author: openstack
def sformat(self, string, *args):
        """ String format.
        args:
            string (str): string to format
            args (list): format options
        returns:
            str
        """
        format = string
        items = []
        m = re.findall('(%[asdA])', format)
        if m and not args:
            raise SyntaxError('Not enough arguments...')
        i = 0
        for n in m:
            v = {
                '%A': urlquote,
                '%s': utility.destring,
            }.get(n, str)(args[i])
            items.append(v)
            i += 1
        format = format.replace('%A', '%s')
        format = format.replace('%d', '%s')
        return format % tuple(items)
Project: Orator-Google-App-Engine    Author: MakarenaLabs
def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: pandachaika    Author: pandabuilder
def compare_by_title(self, title: str) -> bool:

        r = requests.get(urljoin(constants.main_url, 'search/') + quote(title),
                         headers=self.settings.requests_headers, timeout=self.settings.timeout_timer)

        r.encoding = 'utf-8'
        soup_1 = BeautifulSoup(r.text, 'html.parser')

        matches_links = set()

        # content-row manga row
        for gallery in soup_1.find_all("div", class_=re.compile("content-row")):
            link_container = gallery.find("a", class_="content-title")
            if link_container:
                matches_links.add(urljoin(constants.main_url, link_container['href']))

        self.gallery_links = list(matches_links)
        if len(self.gallery_links) > 0:
            self.found_by = self.name
            return True
        else:
            return False
Project: pandachaika    Author: pandabuilder
def archive_download(request: HttpRequest, pk: int) -> HttpResponse:
    try:
        archive = Archive.objects.get(pk=pk)
    except Archive.DoesNotExist:
        raise Http404("Archive does not exist")
    if not archive.public and not request.user.is_authenticated:
        raise Http404("Archive is not public")
    if 'HTTP_X_FORWARDED_HOST' in request.META:
        response = HttpResponse()
        response["Content-Type"] = "application/zip"
        response["Content-Disposition"] = 'attachment; filename*=UTF-8\'\'{0}'.format(
            archive.pretty_name)
        response['X-Accel-Redirect'] = "/download/{0}".format(quote(archive.zipped.name)).encode('utf-8')
        return response
    else:
        return HttpResponseRedirect(archive.zipped.url)
Project: NebulaSolarDash    Author: toddlerya
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') \
             or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: bottle_beginner    Author: denzow
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') \
             or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: MCSManager-fsmodule    Author: Suwings
def urlparts(self):
        """ The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. """
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') \
             or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: wego    Author: wegostudio
def get_code_url(self, redirect_url, state):
        """
        Get the url which 302 jump back and bring a code.

        :param redirect_url: Jump back url
        :param state: Jump back state
        :return: url
        """

        state = quote(state)
        redirect_url = quote(self.settings.REGISTER_URL + redirect_url[1:])

        url = ('https://open.weixin.qq.com/connect/oauth2/authorize?' +
               'appid=%s&redirect_uri=%s' +
               '&response_type=code' +
               '&scope=snsapi_userinfo' +
               '&state=%s#wechat_redirect') % (self.settings.APP_ID, redirect_url, state)

        return url
Project: hubot-no-js    Author: elespike
def execute(**kwargs):
    command   = kwargs['command'  ]
    arguments = kwargs['arguments']
    direct    = kwargs['direct'   ]

    if not direct or not arguments:
        return

    data = ' '.join(arguments[:-1])
    algo = arguments[-1].lower()

    BASE64 = 'base64'
    URL    = 'url'
    algorithms = {
        BASE64: b64e,
        URL   : quote,
    }

    if algo not in algorithms:
        print('Unknown algorithm: {}'.format(algo))
        return

    print(algorithms[algo](data))
Project: voucherify-python-sdk    Author: voucherifyio
def redeem(self, code, tracking_id=None):
        context = {}

        if code and isinstance(code, dict):
            context = code
            code = context['voucher']
            del context['voucher']

        path = '/vouchers/' + quote(code) + '/redemption'

        if tracking_id:
            path = path + '?' + urlencode({'tracking_id': tracking_id})

        return self.request(
            path,
            method='POST',
            data=json.dumps(context),
        )
Project: backpack.py    Author: Zwork101
def add_listing_alert(self, intent, type, item_raw_name, blanket=1, craftable=True):
        url = Notifications.ITEM_ALERT+ type +'/'+ parse.quote(item_raw_name) + '/Tradable/'
        data = {
            "user-id": self.cookies['user-id'],
            "item_name":type + ' ' + item_raw_name,
            "intent":intent,
            "blanket":blanket
        }
        if craftable:
            url += 'Craftable'
        else:
            url += 'Non-Craftable'
        headers = Notifications.gen_headers('/classifieds/subscriptions', url, 'PUT')
        r = requests.Request('PUT', Notifications.ITEM_ALERT, data=data, headers=headers, cookies=self.cookies)
        prepped = r.prepare()
        return self._session.send(prepped)
Project: socialauth    Author: emilyhorsman
def get_access_token(self):
        qs = 'client_id={}&redirect_uri={}&client_secret={}&code={}'
        qs = qs.format(
            self.client_id,
            quote(self.request_url),
            self.client_secret,
            self.params.get('code'))

        url = 'https://graph.facebook.com/{}/oauth/access_token?{}'
        url = url.format(self.api_version, qs)

        resp, content = httplib2.Http().request(url, 'GET')
        if resp.status != 200:
            raise Error('{} from Facebook'.format(resp.status))

        res = json.loads(content.decode('utf-8'))
        access_token = res.get('access_token', None)
        if access_token is None:
            raise Error('No access token from Facebook')

        self.access_token = access_token
        return access_token
Project: zippy    Author: securesystemslab
def application_uri(environ):
    """Return the application's base URI (no PATH_INFO or QUERY_STRING)"""
    url = environ['wsgi.url_scheme']+'://'
    from urllib.parse import quote

    if environ.get('HTTP_HOST'):
        url += environ['HTTP_HOST']
    else:
        url += environ['SERVER_NAME']

        if environ['wsgi.url_scheme'] == 'https':
            if environ['SERVER_PORT'] != '443':
                url += ':' + environ['SERVER_PORT']
        else:
            if environ['SERVER_PORT'] != '80':
                url += ':' + environ['SERVER_PORT']

    url += quote(environ.get('SCRIPT_NAME') or '/')
    return url
Project: DLink_Harvester    Author: MikimotoH
def http_error_302(self, req, fp, code, msg, headers):
        """store "Location" HTTP response header
        :return: http
        """
        self.location = headers.get('Location', '')
        uprint("headers['Location']=" + self.location)

        def squote(s):
            return urllib.parse.quote(s, ';/?:&=+,$[]%^')
        try:
            self.location.encode('ascii')
        except UnicodeEncodeError:
            scheme, netloc, path, params, query, fragment = \
                urllib.parse.urlparse(self.location)
            self.location = urllib.parse.urlunparse((
                scheme, netloc, urllib.parse.quote(path), squote(params), squote(query),
                fragment))
            headers.replace_header('Location', self.location)
            uprint("pquoted headers['Location']=" + self.location)
        return urllib.request.HTTPRedirectHandler.http_error_302(
            self, req, fp, code, msg, headers)
Project: DLink_Harvester    Author: MikimotoH
def main():
    global executor
    executor=ThreadPoolExecutor()

    os.makedirs(localstor, exist_ok=True)

    with open('us_dlink_filelist.csv', 'w') as fout:
        cw = csv.writer(fout)
        cw.writerow(['model', 'rev', 'fw_ver', 'fw_url', 'fsize', 'fdate', 'sha1', 'md5'])

    start_url="http://support.dlink.com/AllPro.aspx?type=all"
    d = pq(url=start_url)
    # all 442 models
    models = [_.text_content().strip() for _ in d('tr > td:nth-child(1) > .aRedirect')]

    for model in models:
        prod_url = "http://support.dlink.com/ProductInfo.aspx?m=%s"%parse.quote(model)
        crawl_prod(prod_url, model)
    executor.shutdown(True)
Project: pyTSon_plugins    Author: Bluscream
def clientURL(self, schid=None, clid=0, uid=None, nickname=None, encodednick=None):
        if schid == None:
            try: schid = ts3lib.getCurrentServerConnectionHandlerID()
            except: pass
        if uid == None:
            try: (error, uid) = ts3lib.getClientVariableAsString(schid, clid, ts3defines.ClientProperties.CLIENT_UNIQUE_IDENTIFIER)
            except: pass
        if nickname == None:
            try: (error, nickname) = ts3lib.getClientVariableAsString(schid, clid, ts3defines.ClientProperties.CLIENT_NICKNAME)
            except: nickname = uid
        if encodednick == None:
            try: encodednick = urlencode(nickname)
            except: pass
        return "[url=client://%s/%s~%s]%s[/url]" % (clid, uid, encodednick, nickname)

    # YOUR COMMANDS HERE:
Project: base1k    Author: xiaq
def urlparts(self):
        ''' The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.
            The tuple contains (scheme, host, path, query_string and fragment),
            but the fragment is always empty because it is not visible to the
            server. '''
        env = self.environ
        http = env.get('HTTP_X_FORWARDED_PROTO') or env.get('wsgi.url_scheme', 'http')
        host = env.get('HTTP_X_FORWARDED_HOST') or env.get('HTTP_HOST')
        if not host:
            # HTTP 1.1 requires a Host-header. This is for HTTP/1.0 clients.
            host = env.get('SERVER_NAME', '127.0.0.1')
            port = env.get('SERVER_PORT')
            if port and port != ('80' if http == 'http' else '443'):
                host += ':' + port
        path = urlquote(self.fullpath)
        return UrlSplitResult(http, host, path, env.get('QUERY_STRING'), '')
Project: scibot    Author: SciCrunch
def scrapeDoi(url):
    env = os.environ.copy()
    cmd_line = ['timeout', '30s', 'google-chrome-unstable', '--headless', '--dump-dom', url]
    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                         env=env)
    out, err = p.communicate()
    if p.returncode:
        print('UTOH')
        return None
    elif b'ERROR:headless_shell.cc' in out:
        print(out)
        raise IOError('Something is wrong...')
    qurl = quote(url, '')
    if len(qurl) > 200:
        qurl = qurl[:200]
    with open(os.path.expanduser(f'~/files/scibot/{qurl}'), 'wb') as f:
        f.write(out)
    both = BeautifulSoup(out, 'lxml')
    doi = getDoi(both, both)
    return doi
Project: sopel-modules    Author: phixion
def run_omdb_query(params, verify_ssl, add_url=True):
    uri = "https://theimdbapi.org/api/find/movie?"
    if 'i' in params:
        uri = uri.replace("/find","")
        data = requests.get(uri + "movie_id={}".format(params['i']), timeout=30,
                        verify=verify_ssl)
    elif 'y' in params:
        data = requests.get(uri + "title={}&year={}".format(quote(params['t']), params['y']), timeout=30,
                        verify=verify_ssl)
    else:
        data = requests.get(uri + "title={}".format(quote(params['t'])), timeout=30,
                        verify=verify_ssl)

    if data.text == 'null':
        message = '[MOVIE] Nothing found'
    else:
        data = data.json()
        if 't' in params:
            data = data[0]
        message = '[MOVIE] Title: ' + data['title'] + \
                  ' | Year: ' + data['year'] + \
                  ' | Rating: ' + data['rating'] + \
                  ' | Genre: ' + '/'.join(data['genre']) + \
                  ' | Plot: {}'
        if add_url:
            message += ' | IMDB Link: http://imdb.com/title/' + data['imdb_id']

        plot = data['description']
        if len(message.format(plot)) > 300:
            cliplen = 300 - (len(message) - 2 + 3) # remove {} add […]
            plot = plot[:cliplen] + '[…]'
        message = message.format(plot)

    return message
Project: segno    Author: heuer
def make_make_email_data(to, cc=None, bcc=None, subject=None, body=None):
    """\
    Creates either a simple "mailto:" URL or complete e-mail message with
    (blind) carbon copies and a subject and a body.

    :param str|iterable to: The email address (recipient). Multiple
            values are allowed.
    :param str|iterable|None cc: The carbon copy recipient. Multiple
            values are allowed.
    :param str|iterable|None bcc: The blind carbon copy recipient.
            Multiple values are allowed.
    :param str|None subject: The subject.
    :param str|None body: The message body.
    """
    def multi(val):
        if not val:
            return ()
        if isinstance(val, str_type):
            return (val,)
        return tuple(val)

    delim = '?'
    data = ['mailto:']
    if not to:
        raise ValueError('"to" must not be empty or None')
    data.append(','.join(multi(to)))
    for key, val in (('cc', cc), ('bcc', bcc)):
        vals = multi(val)
        if vals:
            data.append('{0}{1}={2}'.format(delim, key, ','.join(vals)))
            delim = '&'
    for key, val in (('subject', subject), ('body', body)):
        if val is not None:
            data.append('{0}{1}={2}'.format(delim, key, quote(val.encode('utf-8'))))
        delim = '&'
    return ''.join(data)
Project: segno    Author: heuer
def as_svg_data_uri(matrix, version, scale=1, border=None, color='#000',
                    background=None, xmldecl=False, svgns=True, title=None,
                    desc=None, svgid=None, svgclass='segno',
                    lineclass='qrline', omitsize=False, unit='',
                    encoding='utf-8', svgversion=None, nl=False,
                    encode_minimal=False, omit_charset=False):
    """\
    Converts the matrix to a SVG data URI.

    The XML declaration is omitted by default (set ``xmldecl`` to ``True``
    to enable it), further the newline is omitted by default (set ``nl`` to
    ``True`` to enable it).

    Aside from the missing ``out`` parameter and the different ``xmldecl``
    and ``nl`` default values and the additional parameter ``encode_minimal``
    and ``omit_charset`` this function uses the same parameters as the
    usual SVG serializer.

    :param bool encode_minimal: Indicates if the resulting data URI should
                    use minimal percent encoding (disabled by default).
    :param bool omit_charset: Indicates if the ``;charset=...`` should be omitted
                    (disabled by default)
    :rtype: str
    """
    encode = partial(quote, safe=b"") if not encode_minimal else partial(quote, safe=b" :/='")
    buff = io.BytesIO()
    write_svg(matrix, version, buff, scale=scale, color=color, background=background,
              border=border, xmldecl=xmldecl, svgns=svgns, title=title,
              desc=desc, svgclass=svgclass, lineclass=lineclass,
              omitsize=omitsize, encoding=encoding, svgid=svgid, unit=unit,
              svgversion=svgversion, nl=nl)
    return 'data:image/svg+xml{0},{1}' \
                .format(';charset=' + encoding if not omit_charset else '',
                        # Replace " quotes with ' and URL encode the result
                        # See also https://codepen.io/tigt/post/optimizing-svgs-in-data-uris
                        encode(_replace_quotes(buff.getvalue())))
Project: slack-shogi    Author: setokinto
def slack(text):
    if settings.WEBHOOK_URL:
        payload = ("payload={\"text\": \"" + parse.quote(text) +
                   "\", \"username\": \"Mr.deploy\"}").encode("utf-8")
        request.urlopen(url=settings.WEBHOOK_URL, data=payload)
Project: oscars2016    Author: 0x0ece
def _quote(value, safe, prefix=None):
    if prefix is not None:
        return quote(str(value)[:prefix], safe)
    return quote(str(value), safe)
Project: post-review    Author: ericforbes
def _url_encoded_path(self):
        # https://docs.gitlab.com/ce/api/README.html#namespaced-path-encoding
        return quote('%s/%s' % (self.namespace, self.project), safe='')
Project: CorpBot.py    Author: corpnewt
def define(self, ctx, *, word : str):
        """Gives the definition of the word passed."""

        # Check if we're suppressing @here and @everyone mentions
        if self.settings.getServerStat(ctx.message.server, "SuppressMentions").lower() == "yes":
            suppress = True
        else:
            suppress = False

        if not word:
            msg = 'Usage: `{}define [word]`'.format(ctx.prefix)
            await self.bot.send_message(ctx.message.channel, msg)
            return
        url = "http://api.urbandictionary.com/v0/define?term={}".format(quote(word))
        msg = 'I couldn\'t find a definition for "{}"...'.format(word) 
        r = requests.get(url, headers = {'User-agent': self.ua})
        theJSON = r.json()["list"]
        if len(theJSON):
            # Got it - let's build our response
            ourWord = theJSON[0]
            msg = '__**{}:**__\n\n{}'.format(string.capwords(ourWord["word"]), ourWord["definition"])
            if ourWord["example"]:
                msg = '{}\n\n__Example(s):__\n\n*{}*'.format(msg, ourWord["example"])

        # await self.bot.send_message(ctx.message.channel, msg)
        # Check for suppress
        if suppress:
            msg = Nullify.clean(msg)
        await Message.say(self.bot, msg, ctx.message.channel, ctx.message.author)