Python urllib.parse 模块,quote_plus() 实例源码

我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 urllib.parse.quote_plus()。

项目:toshi-id-service    作者:toshiapp    | 项目源码 | 文件源码
async def test_username_query_sql_inject_attampt(self):
        """Check that a SQL-injection payload in the search query is harmless.

        One user row is inserted, then ``/search/user`` is queried with a
        URL-quoted payload that tries to delete the ``users`` table contents.
        The search must answer 200 with no results, and the table must still
        hold exactly the one row afterwards.

        The body uses ``await`` / ``async with``, so this has to be a
        coroutine function; a plain ``def`` does not compile.
        """
        username = "bobsmith"
        inject_attempt = quote_plus("x'; delete from users; select * from users")

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)", username, TEST_ADDRESS)

        resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET")
        self.assertEqual(resp.code, 200)
        body = json_decode(resp.body)
        self.assertEqual(len(body['results']), 0)

        # The injected "delete from users" must not have been executed.
        async with self.pool.acquire() as con:
            row = await con.fetchrow("SELECT COUNT(*) AS count FROM users")

        self.assertEqual(row['count'], 1)
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """Send one command to the Selenium server and return its reply.

        The command is POSTed form-encoded to ``/selenium-server/driver/`` as
        ``cmd=<verb>&1=<arg1>&2=<arg2>...``, followed by ``sessionId`` when
        one is set on the instance.

        :param verb: command name; converted with ``unicode`` and UTF-8
                     encoded before quoting.
        :param args: iterable of command arguments, sent as fields 1..n.
        :returns: the decoded response body.
        :raises Exception: carrying the response body when the reply does not
                           start with ``OK``.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            fields = ['cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))]
            # Argument fields are numbered from 1 in the request body.
            for position, arg in enumerate(args, 1):
                fields.append(unicode(position) + '=' +
                              urllib_parse.quote_plus(unicode(arg).encode('utf-8')))
            if self.sessionId is not None:
                fields.append("sessionId=" + unicode(self.sessionId))
            body = '&'.join(fields)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            response = conn.getresponse()
            data = unicode(response.read(), "UTF-8")
            if not data.startswith('OK'):
                raise Exception(data)
            return data
        finally:
            # Always release the connection, including on the error path.
            conn.close()
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def strencode(data):
    """URL-encode the values of an ``&``-separated ``key=value`` string.

    Falsy input is returned unchanged.  Input that already looks encoded
    (contains ``%``, or contains ``+`` but no raw space) is also returned
    unchanged.  Otherwise every value is passed through ``quote_plus``; keys
    are left as they are.  A token without ``=`` is kept verbatim.

    :param data: query/form string such as ``"q=hello world&flag"``.
    :returns: the encoded string, e.g. ``"q=hello+world&flag"``.
    """
    if not data:
        return data

    # Heuristic for "already encoded": leave such input untouched.
    if '%' in data or ('+' in data and ' ' not in data):
        return data

    encoded = []
    for token in data.split('&'):
        key, sep, value = token.partition('=')
        if sep:
            encoded.append('%s=%s' % (key, quote_plus(value)))
        else:
            # No '=' in this token: emit the token itself.  (The previous
            # code emitted the key left over from an earlier iteration, or
            # failed with UnboundLocalError when this was the first token.)
            encoded.append(token)

    return '&'.join(encoded)
项目:aquests    作者:hansroh    | 项目源码 | 文件源码
def strdecode (data, value_quote = 0):
    """Split a ``;``-separated ``key=value`` string into ``(key, value)`` pairs.

    Keys and values are stripped of surrounding whitespace.  Chunks without
    ``=`` are ignored.  Values are passed through ``quote_plus`` only when
    ``value_quote`` is truthy and ``data`` contains neither ``%`` nor ``+``.

    :param data: string such as ``"a=1; b=2"``; falsy input yields ``[]``.
    :param value_quote: truthy to request quoting of the values.
    :returns: list of ``(key, value)`` tuples in input order.
    """
    if not data:
        return []

    looks_encoded = '%' in data or '+' in data
    needs_quote = bool(value_quote) and not looks_encoded

    pairs = []
    for chunk in data.split(';'):
        if '=' not in chunk:
            continue
        name, value = chunk.split('=', 1)
        if needs_quote:
            value = quote_plus(value.strip())
        pairs.append((name.strip(), value.strip()))
    return pairs
项目:DSMChatbot    作者:devArtoria    | 项目源码 | 文件源码
def present():
    """Fetch the current weather for Daejeon (KR) from OpenWeatherMap.

    :returns: list ``[description, temp, temp_max, temp_min, humidity]``,
              where ``description`` comes from the first ``weather`` entry
              and the other values from the ``main`` section of the reply.
    """
    url = 'http://api.openweathermap.org/data/2.5/weather?q=daejeon,kr&units=metric'
    # NOTE(review): the API key is hard-coded in source; consider loading it
    # from configuration instead.
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'

    query_params = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + query_params)
    request.get_method = lambda: 'GET'
    # Close the HTTP response deterministically instead of leaking it.
    with urlopen(request) as response:
        response_body = response.read().decode("utf-8")
    weather_data = json.loads(response_body)

    # Pull the individual readings out of the decoded JSON document.
    main = weather_data['main']
    weather = weather_data['weather'][0]['description']
    temp_min = main['temp_min']
    temp_max = main['temp_max']
    humidity = main['humidity']
    temp = main['temp']

    return [weather, temp, temp_max, temp_min, humidity]
项目:DSMChatbot    作者:devArtoria    | 项目源码 | 文件源码
def week():
    """Fetch forecast samples for Daejeon (KR) from OpenWeatherMap.

    :returns: list of five ``[temp, description]`` pairs, taken from the
              entries at indices 5, 12, 19, 26 and 34 of the reply's
              ``list`` array.
    """
    url = "http://api.openweathermap.org/data/2.5/forecast?q=daejeon,kr&units=metric"
    # NOTE(review): the API key is hard-coded in source; consider loading it
    # from configuration instead.
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'

    query_params = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + query_params)
    request.get_method = lambda: 'GET'
    # Close the HTTP response deterministically instead of leaking it.
    with urlopen(request) as response:
        response_body = response.read().decode("utf-8")
    forecast = json.loads(response_body)["list"]

    # NOTE(review): presumably one sample per day of the forecast - confirm
    # these indices against the API's sampling interval.
    picked = [forecast[index] for index in (5, 12, 19, 26, 34)]

    return [[entry['main']['temp'], entry['weather'][0]["description"]]
            for entry in picked]
项目:flasky    作者:RoseOu    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """Send one command to the Selenium server and return its reply.

        The command is POSTed form-encoded to ``/selenium-server/driver/`` as
        ``cmd=<verb>&1=<arg1>&2=<arg2>...``, followed by ``sessionId`` when
        one is set on the instance.

        :param verb: command name; converted with ``unicode`` and UTF-8
                     encoded before quoting.
        :param args: iterable of command arguments, sent as fields 1..n.
        :returns: the decoded response body.
        :raises Exception: carrying the response body when the reply does not
                           start with ``OK``.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            fields = ['cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))]
            # Argument fields are numbered from 1 in the request body.
            for position, arg in enumerate(args, 1):
                fields.append(unicode(position) + '=' +
                              urllib_parse.quote_plus(unicode(arg).encode('utf-8')))
            if self.sessionId is not None:
                fields.append("sessionId=" + unicode(self.sessionId))
            body = '&'.join(fields)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            response = conn.getresponse()
            data = unicode(response.read(), "UTF-8")
            if not data.startswith('OK'):
                raise Exception(data)
            return data
        finally:
            # Always release the connection, including on the error path.
            conn.close()
项目:pib    作者:datawire    | 项目源码 | 文件源码
def handle_unexpected_errors(f):
    """Decorator that catches unexpected errors.

    The wrapped callable's result is returned unchanged when it succeeds.
    When it raises ``Exception``, the traceback is echoed to the user, who is
    then asked whether to file an issue; on confirmation a browser window is
    opened on the project's "new issue" page with a pre-filled body.  In the
    error case the wrapper returns ``None``.
    """

    @wraps(f)
    def call_f(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            # Capture the active traceback as text for display and reporting.
            buffer = StringIO()
            print_exc(file=buffer)
            traceback_text = buffer.getvalue()

            apology = ("Looks like there's a bug in our code. Sorry about that!"
                       " Here's the traceback:\n")
            click.echo(apology + traceback_text)

            wants_issue = click.confirm(
                "Would you like to file an issue in our issue tracker?",
                default=True, abort=True)
            if wants_issue:
                report = BUG_REPORT_TEMPLATE.format(
                    os.getcwd(), __version__, python_version,
                    run_result(["uname", "-a"]), traceback_text)
                issue_url = "https://github.com/datawire/pib/issues/new?body="
                webbrowser.open_new(issue_url + quote_plus(report))

    return call_f
项目:isp-data-pollution    作者:essandess    | 项目源码 | 文件源码
def get_websearch(self,query):
        """
        HTTP GET of a websearch, then add any embedded links.

        A search engine is selected first; the search URL is that engine's
        ``search_url`` with its query string replaced by the quoted query
        plus the engine's additional and safe-search parameters.

        :param query: search text; URL-quoted with ``quote_plus`` here.
        :return: None
        """
        self.select_random_search_engine()
        engine = self.SafeSearch
        parsed = uprs.urlparse(engine.search_url)
        query_string = '{}={}{}&{}'.format(
            engine.query_parameter, uprs.quote_plus(query),
            engine.additional_parameters, engine.safe_parameter)
        url = uprs.urlunparse(parsed._replace(query=query_string))
        if self.verbose:
            self.print_url(url)

        @self.phantomjs_timeout
        def phantomjs_get():
            self.driver.get(url)  # selenium driver
        phantomjs_get()

        @self.phantomjs_short_timeout
        def phantomjs_page_source():
            # Account for the size of the fetched page.
            self.data_usage += len(self.driver.page_source)
        phantomjs_page_source()

        new_links = self.websearch_links()
        # Only cache the new links while the cache is below its limit.
        if self.link_count() < self.max_links_cached:
            self.add_url_links(new_links,url)
项目:tumanov_castleoaks    作者:Roamdev    | 项目源码 | 文件源码
def calc_signature(self):
        """Compute the upper-case MD5 signature over the signed fields.

        The hashed string is the ``:``-joined sequence of: the value of every
        field in ``SIGNATURE_FIELDS`` (``None`` becomes ``''``), then
        ``PASSWD``, then one ``key=value`` item per key of
        ``conf.EXTRA_PARAMS`` in sorted order with the value URL-quoted.

        :returns: hexadecimal MD5 digest, upper-cased.
        """
        def text_of(fieldname):
            # Missing values take part in the hash as an empty string.
            value = self._get_value(fieldname)
            return str('' if value is None else value)

        hash_params = [text_of(fieldname) for fieldname in self.SIGNATURE_FIELDS]
        hash_params.append(self.PASSWD)

        # extra
        hash_params.extend(
            '%s=%s' % (key, quote_plus(text_of(key)))
            for key in sorted(conf.EXTRA_PARAMS)
        )

        hash_data = ':'.join(str(item) for item in hash_params)
        return md5(hash_data.encode()).hexdigest().upper()
项目:pyTSon_plugins    作者:Bluscream    | 项目源码 | 文件源码
def commandLookup(self, schid, targetMode, toID, fromID, params=""):
        """Start an asynchronous OpenCNAM lookup for the phone number in ``params``.

        A leading ``00`` is rewritten to ``+`` before the number is
        URL-quoted.  The reply is delivered to ``self.lookupReply``; the call
        context is stored in ``self.cmdevent``.  Any error is logged, not
        raised.
        """
        try:
            from PythonQt.QtNetwork import QNetworkAccessManager, QNetworkRequest
            from urllib.parse import quote_plus
            # NOTE(review): API credentials are hard-coded in source.
            api_base = "https://api.opencnam.com/v3/phone/"
            account_sid = "ACda22b69608b743328772059d32b63f26"
            auth_token = "AUc9d9217f20194053bf2989c7cb75a368"
            if params.startswith("00"):
                # International prefix "00" -> "+".
                params = "+" + params[2:]
            params = quote_plus(params)
            url = "{0}{1}?format=json&casing=title&service_level=plus&geo=rate&account_sid={2}&auth_token={3}".format(api_base, params, account_sid, auth_token)
            if self.cfg.getboolean("general", "debug"):
                ts3lib.printMessageToCurrentTab("Requesting: {0}".format(url))
            self.nwmc = QNetworkAccessManager()
            self.nwmc.connect("finished(QNetworkReply*)", self.lookupReply)
            self.cmdevent = {
                "event": "", "returnCode": "", "schid": schid,
                "targetMode": targetMode, "toID": toID, "fromID": fromID,
                "params": params,
            }
            self.nwmc.get(QNetworkRequest(QUrl(url)))
        except:
            # Best-effort: report every failure to the client log.
            from traceback import format_exc
            ts3lib.logMessage(format_exc(), ts3defines.LogLevel.LogLevel_ERROR, "pyTSon", 0)
项目:pyTSon_plugins    作者:Bluscream    | 项目源码 | 文件源码
def commandWhois(self, schid, targetMode, toID, fromID, params=""):
        """Start an asynchronous jsonwhois.com lookup for the domain in ``params``.

        The reply is delivered to ``self.whoisReply``; the call context is
        stored in ``self.cmdevent``.  Any error is logged, not raised.
        """
        try:
            from PythonQt.QtNetwork import QNetworkAccessManager, QNetworkRequest
            from urllib.parse import quote_plus
            params = quote_plus(params)
            url = "https://jsonwhois.com/api/v1/whois?domain={0}".format(params)
            # NOTE(review): the API token is hard-coded in source.
            token = "fe1abe2646bdc7fac3d36a688d1685fc"
            if self.cfg.getboolean("general", "debug"):
                ts3lib.printMessageToCurrentTab("Requesting: {0}".format(url))
            request = QNetworkRequest()
            request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
            request.setRawHeader("Authorization", "Token token={0}".format(token))
            request.setUrl(QUrl(url))
            self.nwmc = QNetworkAccessManager()
            self.nwmc.connect("finished(QNetworkReply*)", self.whoisReply)
            self.cmdevent = {
                "event": "", "returnCode": "", "schid": schid,
                "targetMode": targetMode, "toID": toID, "fromID": fromID,
                "params": params,
            }
            self.nwmc.get(request)
        except:
            # Best-effort: report every failure to the client log.
            from traceback import format_exc
            ts3lib.logMessage(format_exc(), ts3defines.LogLevel.LogLevel_ERROR, "pyTSon", 0)
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_no_signing_federation():
    """With ``rp.federation`` set, the request carries one statement keyed by that federation."""
    # UNINETT RP
    rp_issuer = 'https://foodle.uninett.no'
    ms_dir = os.path.join('ms', quote_plus(OA['sunet']))

    ms_signer = Signer(None, ms_dir, 'register')
    keyjar = build_keyjar(KEYDEFS)[1]
    entity = FederationEntity(None, keyjar=keyjar, iss=rp_issuer,
                              signer=ms_signer, fo_bundle=fo_keybundle)

    rp = Client(federation_entity=entity, fo_priority=list(FO.values()))
    rp.federation = FO['swamid']

    req = rp.federated_client_registration_request(
        redirect_uris='https://foodle.uninett.no/authz',
        scope=['openid', 'email', 'phone']
    )

    statement_keys = set(req['metadata_statements'].keys())
    assert statement_keys == {FO['swamid']}
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_no_signing_provider():
    """With two provider federations set, the request carries one statement per federation."""
    # UNINETT RP
    rp_issuer = 'https://foodle.uninett.no'
    ms_dir = os.path.join('ms', quote_plus(OA['sunet']))

    ms_signer = Signer(None, ms_dir, 'register')
    keyjar = build_keyjar(KEYDEFS)[1]
    entity = FederationEntity(None, keyjar=keyjar, iss=rp_issuer,
                              signer=ms_signer, fo_bundle=fo_keybundle)

    rp = Client(federation_entity=entity, fo_priority=list(FO.values()))
    provider_fos = [FO['swamid'], FO['feide']]
    rp.provider_federations = [LessOrEqual(iss=x) for x in provider_fos]

    req = rp.federated_client_registration_request(
        redirect_uris='https://foodle.uninett.no/authz',
        scope=['openid', 'email', 'phone']
    )

    statement_keys = set(req['metadata_statements'].keys())
    assert statement_keys == {FO['swamid'], FO['feide']}
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_pack_metadata_statement():
    """Packing a metadata statement yields an RS256-signed JWT with the expected claims."""
    bundle = FSJWKSBundle('', None, 'fo_jwks',
                          key_conv={'to': quote_plus, 'from': unquote_plus})
    keyjar = build_keyjar(KEYDEFS)[1]
    op = Operator(keyjar=keyjar, jwks_bundle=bundle, iss='https://example.com/')
    statement = MetadataStatement(issuer='https://example.org/op')
    sms = op.pack_metadata_statement(statement)
    assert sms  # Should be a signed JWT

    parsed = factory(sms)
    assert parsed
    assert parsed.jwt.headers['alg'] == 'RS256'

    payload = json.loads(as_unicode(parsed.jwt.part[1]))
    assert payload['iss'] == op.iss
    assert payload['issuer'] == 'https://example.org/op'

    # verify signature
    assert parsed.verify_compact(sms, keyjar.get_signing_key())
项目:fedoidc    作者:OpenIDC    | 项目源码 | 文件源码
def test_unpack_metadata_statement_uri():
    """Unpacking a signed statement resolves to three federation operators."""
    sunet_signer = signer[OA['sunet']]
    statement = MetadataStatement(issuer='https://example.org/op')
    # Not intermediate
    ms = sunet_signer.create_signed_metadata_statement(
        statement, 'discovery', single=True)

    bundle = FSJWKSBundle('', None, 'fo_jwks',
                          key_conv={'to': quote_plus, 'from': unquote_plus})

    store = MetaDataStore('msd')
    op = Operator(jwks_bundle=bundle)
    op.httpcli = MockHTTPClient(store)

    res = op.unpack_metadata_statement(jwt_ms=ms)
    assert len(res.parsed_statement) == 3

    loel = op.evaluate_metadata_statement(res.result)
    assert len(loel) == 3
    expected_fos = {'https://swamid.sunet.se',
                    'https://edugain.com',
                    'https://www.feide.no'}
    assert {item.fo for item in loel} == expected_fos
项目:Pay    作者:iakisey    | 项目源码 | 文件源码
def trade_app_pay(self, out_trade_no, total_amount, subject):
        """Build the signed request string for ``alipay.trade.app.pay``.

        NOTE(review): the original (non-English) parameter descriptions were
        lost to an encoding error; the wording below is inferred from the
        parameter names - confirm.

        :out_trade_no: str  order number (presumably the merchant's)
        :total_amount: str  amount to charge
        :subject:      str  order title
        :returns:      str  request string with the URL-quoted signature
                            appended as ``&sign=...``

        """
        self.__parameters['notify_url'] = self.__notify_url
        biz_content = {
            'subject': subject,
            'out_trade_no': out_trade_no,
            'total_amount': total_amount,
            'product_code': 'QUICK_MSECURITY_PAY'
        }
        self.gen_parameters('alipay.trade.app.pay', biz_content)
        # The signature is computed over gen_str(); the returned payload is
        # gen_str(1) with that signature appended.
        signature = self.__ali_sign(self.gen_str())
        payload = self.gen_str(1)
        return payload + '&sign=' + quote_plus(signature)
项目:Pay    作者:iakisey    | 项目源码 | 文件源码
def trade_refund(self, out_trade_no, refund_amount):
        """Build the signed request for ``alipay.trade.refund``.

        NOTE(review): the original (non-English) parameter descriptions were
        lost to an encoding error; the wording below is inferred from the
        parameter names - confirm.

        :out_trade_no:  str    order number of the trade to refund
        :refund_amount: str    amount to refund
        :returns:       tuple  (url, data, method)

        """
        biz_content = {
            'out_trade_no': out_trade_no,
            'refund_amount': refund_amount,
            # 'refund_reason': refund_reason,
        }
        self.gen_parameters('alipay.trade.refund', biz_content)
        # The signature is computed over gen_str(); the request data is
        # gen_str(1) with that signature appended.
        signature = self.__ali_sign(self.gen_str())
        payload = self.gen_str(1)
        data = payload + '&sign=' + quote_plus(signature)
        return self.__ali_url, data, 'GET'
项目:respeaker_virtualenv    作者:respeaker    | 项目源码 | 文件源码
def _get_geocoding(self, key, location):
        """Lookup the Google geocoding API information for `key`

        Fills ``location.name``, ``location.region``, ``location.latitude``
        and ``location.longitude`` from the first result.  The formatted
        address is split at its first comma into name and region; without a
        comma the whole address is the name and the region is empty.

        :raises AstralError: when the response status is not ``OK``.
        """
        url = self._location_query_base % quote_plus(key)
        response = json.loads(self._read_from_url(url))
        if response['status'] != 'OK':
            raise AstralError('GoogleGeocoder: Unable to locate %s' % key)

        top_hit = response['results'][0]
        address = top_hit['formatted_address']
        name, comma, region = address.partition(',')
        if comma:
            location.name = name.strip()
            location.region = region.strip()
        else:
            location.name = address
            location.region = ''

        coords = top_hit['geometry']['location']
        location.latitude = float(coords['lat'])
        location.longitude = float(coords['lng'])
项目:goldmine    作者:Armored-Dragon    | 项目源码 | 文件源码
async def search(query, num=3):
    """Run a Google search and return the result links.

    The body awaits ``get_page`` and ``filter_result``, so this has to be a
    coroutine function; a plain ``def`` does not compile.

    :param query: search text; URL-quoted with ``quote_plus`` here.
    :param num: value of the ``num`` URL parameter (number of results
                requested).
    :returns: list of links in page order, without duplicates and without
              links rejected by ``filter_result``; empty when the page has
              no element with id ``search``.
    """
    url_t = url_home + 'search?hl=en&q=%(query)s&num=%(num)d&btnG=Google+Search&tbs=0&safe=off&tbm='
    html = await get_page(url_t % {'query': quote_plus(query), 'num': num})
    soup = BeautifulSoup(html, 'html.parser')

    container = soup.find(id='search')
    if container is None:
        # No results section on the page: nothing to extract.
        return []

    ret = []
    seen = set()
    for a in container.findAll('a'):
        # Get the URL from the anchor tag.
        try:
            link = a['href']
        except KeyError:
            continue
        # Filter invalid links and links pointing to Google itself.
        link = await filter_result(link)
        if not link:
            continue
        # Discard repeated results.  Compare the links themselves, not their
        # hashes, so that a hash collision cannot drop a distinct link.
        if link in seen:
            continue
        seen.add(link)
        ret.append(link)
    return ret
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_quoting_space(self):
        """quote() must hex-escape a space; quote_plus() must turn it into '+'."""
        # Make sure quote() and quote_plus() handle spaces as specified in
        # their unique way
        escaped_space = hexescape(' ')

        quoted = urllib_parse.quote(' ')
        self.assertEqual(quoted, escaped_space,
                         "using quote(): %r != %r" % (quoted, escaped_space))
        plus_quoted = urllib_parse.quote_plus(' ')
        self.assertEqual(plus_quoted, '+',
                         "using quote_plus(): %r != +" % plus_quoted)

        # The same rules must hold for spaces inside a longer string.
        given = "a b cd e f"
        expect = given.replace(' ', escaped_space)
        quoted = urllib_parse.quote(given)
        self.assertEqual(expect, quoted,
                         "using quote(): %r != %r" % (expect, quoted))
        expect = given.replace(' ', '+')
        plus_quoted = urllib_parse.quote_plus(given)
        self.assertEqual(expect, plus_quoted,
                         "using quote_plus(): %r != %r" % (expect, plus_quoted))
项目:rabbit    作者:sopython    | 项目源码 | 文件源码
def _post(self, url, params=None):
        """
        Send a POST message using some predetermined headers and params that are always necessary when talking to SO.
        using this instead of requests.post will save you the effort of adding the fkey, cookie, etc yourself every time.

        :param url: address to post to.
        :param params: optional mapping of form fields; it is not modified.
        :returns: the object returned by ``requests.post``.
        """
        logger.debug("Posting to {}".format(url))
        # Work on a copy so that adding "fkey" does not leak into the
        # caller's dict.
        form = dict(params) if params is not None else {}
        form["fkey"] = self.fkey
        # Values are URL-quoted; field names are sent as given.
        s = "&".join("{}={}".format(name, quote_plus(str(value))) for name, value in form.items())
        header = {
            "Content-Length": str(len(s)),
            "Content-Type": "application/x-www-form-urlencoded",
            "Cookie": self.cookie
        }
        return requests.post(url, headers=header, data=s)
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def webwxsync():
    """POST a ``webwxsync`` request and store the returned sync key.

    The module-level ``SyncKey`` is replaced by the ``SyncKey`` value of the
    reply.

    :returns: the result of ``responseState`` for the reply's
              ``BaseResponse``.
    """
    global SyncKey

    query = 'lang=zh_CN&skey=%s&sid=%s&pass_ticket=%s' % (
        BaseRequest['Skey'], BaseRequest['Sid'], quote_plus(pass_ticket))
    url = base_uri + '/webwxsync?' + query

    payload = json.dumps({
        'BaseRequest': BaseRequest,
        'SyncKey': SyncKey,
        'rr': ~int(time.time()),
    })

    request = getRequest(url=url, data=payload)
    request.add_header('ContentType', 'application/json; charset=UTF-8')
    response = wdf_urllib.urlopen(request)
    # Undecodable bytes are replaced rather than raising.
    reply = json.loads(response.read().decode('utf-8', 'replace'))

    SyncKey = reply['SyncKey']

    return responseState('webwxsync', reply['BaseResponse'])
项目:_    作者:zengchunyun    | 项目源码 | 文件源码
def list_show_uri(self, obj_types, verb):
        """Build the URI for listing/showing the object type named in ``self.args[0]``.

        :param obj_types: mapping of object type name to its info dict (the
                          keys ``vhost`` and, optionally, ``cols`` are read).
        :param verb: verb used in the usage error message.
        :returns: tuple ``(uri, obj_info, cols)``.
        """
        obj_type = self.args[0]
        assert_usage(obj_type in obj_types,
                     "Don't know how to {0} {1}".format(verb, obj_type))
        obj_info = obj_types[obj_type]

        uri = "/%s" % obj_type
        if obj_info['vhost'] and self.options.vhost:
            uri += "/%s" % quote_plus(self.options.vhost)

        # Columns given on the command line win; otherwise fall back to the
        # type's default columns when column selection is in use.
        cols = self.args[1:]
        if cols == [] and 'cols' in obj_info and self.use_cols():
            cols = obj_info['cols']

        query_parts = []
        if cols != []:
            query_parts.append("columns=" + ",".join(cols))
        sort = self.options.sort
        if sort:
            query_parts.append("sort=" + sort)
        if self.options.sort_reverse:
            query_parts.append("sort_reverse=true")

        if query_parts:
            uri += "?" + "&".join(query_parts)
        return (uri, obj_info, cols)
项目:ShuoshuoMonitor    作者:aploium    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """Send one command to the Selenium server and return its reply.

        The command is POSTed form-encoded to ``/selenium-server/driver/`` as
        ``cmd=<verb>&1=<arg1>&2=<arg2>...``, followed by ``sessionId`` when
        one is set on the instance.

        :param verb: command name; converted with ``unicode`` and UTF-8
                     encoded before quoting.
        :param args: iterable of command arguments, sent as fields 1..n.
        :returns: the decoded response body.
        :raises Exception: carrying the response body when the reply does not
                           start with ``OK``.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            fields = ['cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))]
            # Argument fields are numbered from 1 in the request body.
            for position, arg in enumerate(args, 1):
                fields.append(unicode(position) + '=' +
                              urllib_parse.quote_plus(unicode(arg).encode('utf-8')))
            if self.sessionId is not None:
                fields.append("sessionId=" + unicode(self.sessionId))
            body = '&'.join(fields)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            response = conn.getresponse()
            data = unicode(response.read(), "UTF-8")
            if not data.startswith('OK'):
                raise Exception(data)
            return data
        finally:
            # Always release the connection, including on the error path.
            conn.close()
项目:nya-chan-bot    作者:OSAlt    | 项目源码 | 文件源码
async def fetch_image(self, ctx, channel, randomize: bool = False, tags: list = None):
        """Fetch one image from konachan.com and post its URL in ``channel``.

        The body awaits network and chat calls, so this has to be a coroutine
        function; a plain ``def`` does not compile.

        A placeholder message is sent first.  It is edited to show the
        requester and the image URL when the search returns a result, and
        deleted otherwise.

        :param ctx: invocation context; ``ctx.message.author`` is mentioned
                    in the reply.
        :param channel: channel the messages are sent to.
        :param randomize: when true, ``order:random`` is added to the tags.
        :param tags: search tags; ``None`` (the default) means no tags.
        """
        # None replaces a mutable default list shared between calls.
        tag_search = "{} ".format(" ".join(tags or []))
        if randomize:
            tag_search += " order:random"
        search = ("https://konachan.com/post.json?limit=1&tags="
                  + parse.quote_plus(tag_search))
        message = await channel.send("Fetching kona image...")
        async with aiohttp.ClientSession() as session:
            async with session.get(search) as r:
                website = await r.json()
        if website:
            imageURL = "https:{}".format(website[0].get("file_url")).replace(' ', '+')
            await message.edit(content="Requested by {}\n{}".format(ctx.message.author.mention, imageURL))
        else:
            # Nothing found: remove the placeholder message again.
            await message.delete()
项目:python3-linkedin    作者:DEKHTIARJonathan    | 项目源码 | 文件源码
def authorization_url(self):
        """Return the authorization URL with the OAuth query string attached.

        The query carries ``response_type``, ``client_id``, ``scope`` (the
        permissions joined by spaces), ``state`` (a new one is made when none
        is set) and ``redirect_uri``.
        """
        qd = {'response_type': 'code',
              'client_id': self.key,
              'scope': (' '.join(self.permissions)).strip(),
              'state': self.state or self._make_new_state(),
              'redirect_uri': self.redirect_uri}
        # The query string is built by hand with quote() instead of
        # urlencode(), which would apply quote_plus to every item.
        query = '&'.join('%s=%s' % (quote(name), quote(value))
                         for name, value in qd.items())

        return urljoin(self.AUTHORIZATION_URL, '?' + query,
                       allow_fragments=True)
项目:python3-linkedin    作者:DEKHTIARJonathan    | 项目源码 | 文件源码
def get_profile(self, member_id=None, member_url=None, selectors=None, params=None, headers=None, member_email=None):
        """Fetch a member profile as decoded JSON.

        The member is selected by the first truthy argument among
        ``member_id``, ``member_url`` and ``member_email``; with none given
        the ``~`` endpoint is requested.

        :param member_id: a single id, or a list of ids for a batch request.
        :param member_url: profile URL; URL-quoted here.
        :param selectors: optional field selectors, rendered with
                          ``LinkedInSelector.parse``.
        :param params: optional query parameters; the mapping is not
                       modified, ``format=json`` is added to a copy.
        :param headers: optional request headers.
        :param member_email: e-mail address; URL-quoted here.
        :returns: the decoded JSON body of the response.
        """
        if member_id:
            if isinstance(member_id, list):
                # Batch request, ids as CSV.
                url = '%s::(%s)' % (ENDPOINTS.PEOPLE,
                                    ','.join(member_id))
            else:
                url = '%s/id=%s' % (ENDPOINTS.PEOPLE, str(member_id))
        elif member_url:
            url = '%s/url=%s' % (ENDPOINTS.PEOPLE, quote_plus(member_url))
        elif member_email:
            url = '%s/email=%s' % (ENDPOINTS.PEOPLE, quote_plus(member_email))
        else:
            url = '%s/~' % ENDPOINTS.PEOPLE
        if selectors:
            url = '%s:(%s)' % (url, LinkedInSelector.parse(selectors))

        # Copy so that adding "format" does not leak into the caller's dict.
        params = dict(params) if params is not None else {}
        params['format'] = 'json'

        response = self.make_request('GET', url, params=params, headers=headers)
        raise_for_error(response)
        return response.json()
项目:python3-linkedin    作者:DEKHTIARJonathan    | 项目源码 | 文件源码
def get_memberships(self, member_id=None, member_url=None, group_id=None,
                        selectors=None, params=None, headers=None):
        """Fetch group memberships as decoded JSON.

        The member is selected by ``member_id`` if truthy, else by
        ``member_url`` (URL-quoted); with neither the ``~`` endpoint is used.
        ``group_id`` and ``selectors``, when given, are appended to the URL.

        :returns: the decoded JSON body of the response.
        """
        if member_id:
            member_part = 'id=%s' % str(member_id)
        elif member_url:
            member_part = 'url=%s' % quote_plus(member_url)
        else:
            member_part = '~'
        url = '%s/%s/group-memberships' % (ENDPOINTS.PEOPLE, member_part)

        if group_id:
            url = '%s/%s' % (url, str(group_id))

        if selectors:
            url = '%s:(%s)' % (url, LinkedInSelector.parse(selectors))

        response = self.make_request('GET', url, params=params, headers=headers)
        raise_for_error(response)
        return response.json()
项目:chromecast-player    作者:wa4557    | 项目源码 | 文件源码
def local_sub(self, filename, mimetype):
        """serve a local subtitle file

        Starts an HTTP server on a free port in a background thread; it
        handles a single request.

        :param filename: path of the subtitle file.
        :param mimetype: accepted for interface compatibility; not used here.
        :returns: URL under which the file is served, or ``None`` when
                  ``filename`` is not an existing file.
        """
        if not os.path.isfile(filename):
            return None
        filename = os.path.abspath(filename)

        # Connecting a datagram socket selects the outgoing interface; its
        # local address is the IP to advertise.  The ``with`` block closes
        # the socket even when connect() fails.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 53))
            webserver_ip = probe.getsockname()[0]

        req_handler = local_server.SubtitleRequestHandler

        # Port 0 lets the system pick a free port.
        port = 0

        self.subtitleserver = http.server.HTTPServer((webserver_ip, port), req_handler)
        self.subtitlethread = threading.Thread(target=self.subtitleserver.handle_request)
        self.subtitlethread.start()

        # "/" is kept unquoted so the absolute path stays a URL path.
        url = "http://%s:%s%s" % (webserver_ip, str(self.subtitleserver.server_port), quote_plus(filename, "/"))
        return url
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def base(self) -> GitLabCommit:
        """
        Retrieves the base commit as a GitLabCommit object.

        >>> from os import environ
        >>> pr = GitLabMergeRequest(
        ...     GitLabOAuthToken(environ['GITLAB_TEST_TOKEN']),
        ...     'gitmate-test-user/test', 2
        ... )
        >>> pr.base.sha
        '198dd16f8249ea98ed41876efe27d068b69fa215'

        :return: A GitLabCommit object.
        """
        # No SHA is passed; the commit is addressed by the quoted name of
        # the base branch.
        branch_ref = quote_plus(self.base_branch_name)
        return GitLabCommit(self._token, self._repository,
                            sha=None, branch=branch_ref)
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def head(self) -> GitLabCommit:
        """
        Retrieves the head commit as a GitLabCommit object.

        >>> from os import environ
        >>> pr = GitLabMergeRequest(
        ...     GitLabOAuthToken(environ['GITLAB_TEST_TOKEN']),
        ...     'gitmate-test-user/test', 2
        ... )
        >>> pr.head.sha
        '99f484ae167dcfcc35008ba3b5b564443d425ee0'

        :return: A GitLabCommit object.
        """
        # No SHA is passed; the commit is addressed by the quoted name of
        # the head branch, in the source repository.
        branch_ref = quote_plus(self.head_branch_name)
        return GitLabCommit(self._token, self.source_repository.full_name,
                            sha=None, branch=branch_ref)
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def __init__(self, token: Union[GitLabOAuthToken, GitLabPrivateToken],
                 repository: Union[str, int]):
        """
        Creates a new GitLabRepository object with the given credentials.

        When ``repository`` converts to an integer it is used as the project
        id and the stored repository name is cleared; otherwise the name is
        URL-quoted into the project URL.

        :param token: A Token object to be used for authentication.
        :param repository: Full name or unique identifier of the repository,
                           e.g. ``sils/baritone``.
        """
        self._token = token
        self._repository = repository
        try:
            project_id = int(repository)
        except ValueError:
            # Not numeric: address the project by its quoted full name.
            self._url = '/projects/' + quote_plus(repository)
        else:
            self._repository = None
            self._url = '/projects/{}'.format(project_id)
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def create_merge_request(self, title:str, base:str, head:str,
                             body: Optional[str]=None,
                             target_project_id: Optional[int]=None,
                             target_project: Optional[str]=None):
        """
        Create a new merge request in Repository

        :param title: Title of the merge request.
        :param base: Name of the target branch.
        :param head: Name of the source branch.
        :param body: Accepted but not sent with the request.
        :param target_project_id: Sent as ``target_project_id``.
        :param target_project: Repository passed on to the returned object.
        :return: A GitLabMergeRequest built from the response data.
        """
        payload = {
            'title' : title,
            'target_branch' : base,
            'source_branch' : head,
            'id' : quote_plus(self.full_name),
            'target_project_id' : target_project_id
        }
        created = post(self._token, url=self._url + '/merge_requests',
                       data=payload)

        # Imported here rather than at module level.
        from IGitt.GitLab.GitLabMergeRequest import GitLabMergeRequest
        return GitLabMergeRequest.from_data(created, self._token,
                                            repository=target_project,
                                            number=created['iid'])
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def __init__(self,
                 token: Union[GitLabOAuthToken, GitLabPrivateToken],
                 repository: str,
                 sha: Optional[str],
                 branch: Optional[str]=None):
        """
        Creates a new GitLabCommit object.

        The commit URL uses the SHA when one is given and the branch name
        otherwise.

        :param token: A Token object to be used for authentication.
        :param repository: The full repository name.
        :param sha: The full commit SHA, if None given provide a branch.
        :param branch: A branch name if SHA is unavailable. Note that lazy
                       loading won't work in that case.
        """
        assert sha or branch, "Either full SHA or branch name has to be given!"
        self._token, self._repository = token, repository
        self._sha, self._branch = sha, branch

        commit_ref = sha or branch
        self._url = '/projects/{id}/repository/commits/{sha}'.format(
            id=quote_plus(repository), sha=commit_ref)
项目:IGitt    作者:GitMateIO    | 项目源码 | 文件源码
def get_statuses(self) -> Set[CommitStatus]:
        """
        Retrieves the all commit statuses.

        :return: A (frozen)set of CommitStatus objects.
        :raises RuntimeError: If something goes wrong (network, auth...).
        """
        # rebuild the url with full sha because gitlab doesn't work that way
        url = '/projects/{repo}/repository/commits/{sha}/statuses'.format(
            repo=quote_plus(self._repository), sha=self.sha)

        # Only the first of each context is the one we want
        seen_contexts = set()
        result = set()
        for status in get(self._token, url):
            context = status['name']
            if context in seen_contexts:
                continue
            result.add(CommitStatus(
                INV_GL_STATE_TRANSLATION[status['status']],
                status['description'], context,
                status['target_url']))
            seen_contexts.add(context)

        return result
项目:ieml    作者:IEMLdev    | 项目源码 | 文件源码
def get_derived_terms_from_wiktionnary(descriptor):
    """
    Collect link titles per language from the English Wiktionary page of
    ``descriptor``.

    :param descriptor: page title to look up; trailing newlines and carriage
                       returns are removed and the rest is URL-quoted.
    :return: a dict language -> set of str representing the derived terms in
             that language (languages: english, french, portuguese, spanish).
             Empty when the page cannot be opened.
    """
    result = {}

    page_url = ('https://en.wiktionary.org/w/index.php?title='
                + quote_plus(descriptor.rstrip('\n\r')) + '&printable=yes')
    try:
        response = urllib.request.urlopen(page_url)
    except IOError:
        return result

    # Close the HTTP response deterministically instead of leaking it.
    with response:
        html = response.read()
    soup = BeautifulSoup(html, 'html.parser')

    for language in ("English", "French", "Portuguese", "Spanish"):
        # Links whose href ends in "#<language>" are taken to belong to
        # that language.
        titles = [el.attrs["title"]
                  for el in soup.select('ul li span a[href$="#%s"]' % language)
                  if "title" in el.attrs]
        if titles:  # skip languages without any title
            result[language.lower()] = set(titles)
    return result
项目:astral    作者:sffjunkie    | 项目源码 | 文件源码
def _get_geocoding(self, key, location):
        """Look up `key` with the geocoding service and fill in `location`.

        The first result's formatted address is split at its first comma into
        ``location.name`` and ``location.region`` (both stripped); without a
        comma the whole address becomes the name and the region is empty.
        ``location.latitude`` and ``location.longitude`` are set from the
        first result's geometry.

        Raises AstralError when the response status is not ``'OK'``.
        """

        url = self._location_query_base % quote_plus(key)
        response = json.loads(self._read_from_url(url))
        if response['status'] != 'OK':
            raise AstralError('GoogleGeocoder: Unable to locate %s' % key)

        formatted_address = response['results'][0]['formatted_address']
        name, comma, remainder = formatted_address.partition(',')
        if comma:
            location.name = name.strip()
            location.region = remainder.strip()
        else:
            location.name = formatted_address
            location.region = ''

        coordinates = response['results'][0]['geometry']['location']
        location.latitude = float(coordinates['lat'])
        location.longitude = float(coordinates['lng'])
项目:social-vuln-scanner    作者:Betawolf    | 项目源码 | 文件源码
def download(self, record):
    """Fetch the bundled profile fields for the record's URL.

    The request target is ``self.apiroot`` followed by the URL-quoted
    ``record['url']`` and a ``:(field,field,...)`` selector list. The call is
    made through ``self.get_bundled`` with an empty parameter dict.

    Returns a one-element list holding whatever ``get_bundled`` returned.
    """
    profile_id = record['url']
    field_groups = (
        # name fields
        ['id', 'first-name', 'last-name', 'maiden-name',
         'formatted-name', 'phonetic-first-name', 'phonetic-last-name',
         'formatted-phonetic-name'],
        # occupation fields
        ['headline', 'industry', 'positions'],
        # location fields
        ['location:(name,country:(code))'],
        # activity fields
        ['current-share'],
        # degree fields
        ['num-connections', 'num-connections-capped'],
        # description fields
        ['summary', 'specialties'],
        # visual fields
        ['picture-url'],
    )
    selectors = [field for group in field_groups for field in group]
    target = self.apiroot + quote_plus(profile_id) + ':(' + ','.join(selectors) + ')'
    # params for main grab: none are needed
    return [self.get_bundled(target, {})]
项目:DDNS    作者:NewFuture    | 项目源码 | 文件源码
def signature(params):
    """
    Add the common request fields and a signature to ``params``.

    The dict is updated in place with the fixed fields below, then the
    sorted, URL-encoded parameters are combined with ``API_METHOD`` into the
    string to sign. That string is signed with HMAC-SHA1 using
    ``TOKEN + "&"`` as the key, and the base64-encoded digest is stored under
    ``"Signature"``.

    :param params: dict of request parameters (mutated)
    :return: the same dict, including the ``Signature`` entry
    """
    common_fields = {
        'Format': 'json',
        'Version': '2015-01-09',
        'AccessKeyId': ID,
        'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
        'SignatureMethod': 'HMAC-SHA1',
        'SignatureNonce': uuid.uuid4(),
        'SignatureVersion': "1.0",
    }
    params.update(common_fields)

    canonical_query = urllib.urlencode(sorted(params.items()))
    log.debug(canonical_query)

    string_to_sign = API_METHOD + "&" + urllib.quote_plus("/") + "&" \
        + urllib.quote(canonical_query, safe='')
    log.debug("signString: %s", string_to_sign)

    key = (TOKEN + "&").encode('utf-8')
    digest = hmac.new(key, string_to_sign.encode('utf-8'),
                      hashlib.sha1).digest()
    params["Signature"] = base64.b64encode(digest).strip()
    return params
项目:crayon    作者:torrvision    | 项目源码 | 文件源码
def __init_from_existing(self):
        """Load the step counters of an experiment that already exists.

        Requests ``/data?xp=<name>`` from the client's server and passes the
        ``"scalars"`` and ``"histograms"`` entries of the JSON reply to
        ``__update_steps``.

        Raises ValueError (carrying the server's reply text) when the
        request is not successful.
        """
        path = "/data?xp={}".format(quote_plus(self.xp_name))
        reply = requests.get(self.client.url + path)
        if not reply.ok:
            raise ValueError(
                "Something went wrong. Server sent: {}.".format(reply.text))

        # Retrieve the current step for existing metrics
        content = json.loads(reply.text)
        self.__update_steps(
            content["scalars"], self.scalar_steps, self.get_scalar_values)
        self.__update_steps(
            content["histograms"], self.hist_steps, self.get_histogram_values)
项目:crayon    作者:torrvision    | 项目源码 | 文件源码
def to_zip(self, filename=None):
        """Download a backup of this experiment and save it as a zip file.

        Requests ``/backup?xp=<name>`` from the client's server and writes
        the reply body to ``<filename>.zip``.

        :param filename: output path without the ``.zip`` extension; when
                         empty or None, ``backup_<xp_name>_<timestamp>`` is
                         used
        :return: the path of the written file, including ``.zip``
        :raises ValueError: when the server reply is not successful; the
                            message carries the server's reply text
        """
        query = "/backup?xp={}".format(quote_plus(self.xp_name))
        r = requests.get(self.client.url + query)

        if not r.ok:
            msg = "Something went wrong. Server sent: {}."
            raise ValueError(msg.format(r.text))

        if not filename:
            filename = "backup_" + self.xp_name + "_" + str(time.time())
        archive_path = filename + ".zip"
        # The context manager closes the file even if the write fails.
        with open(archive_path, "wb") as out:
            out.write(r.content)
        return archive_path

    # Helper methods
项目:amazon_order_history_scraper    作者:drewctate    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """POST a command to the Selenium server driver and return its reply.

        The form-encoded body carries ``cmd=<verb>``, the arguments as
        ``1=<arg>``, ``2=<arg>``, ... and, when a session id is set,
        ``sessionId=<id>``. The connection is always closed afterwards.

        Raises Exception (with the reply text) when the reply does not start
        with ``OK``.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            body = 'cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))
            # Arguments are numbered from 1 in the request body.
            for position, arg in enumerate(args, 1):
                encoded_arg = urllib_parse.quote_plus(unicode(arg).encode('utf-8'))
                body += '&' + unicode(position) + '=' + encoded_arg
            if self.sessionId is not None:
                body += "&sessionId=" + unicode(self.sessionId)
            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            reply = unicode(conn.getresponse().read(), "UTF-8")
            if not reply.startswith('OK'):
                raise Exception(reply)
            return reply
        finally:
            conn.close()
项目:webapp    作者:superchilli    | 项目源码 | 文件源码
def do_command(self, verb, args):
        """Send one command to the Selenium server driver endpoint.

        ``verb`` goes into the ``cmd`` field, each argument into a field
        named after its 1-based position, and ``self.sessionId`` (if not
        None) into ``sessionId``. The reply text is returned when it starts
        with ``OK``; otherwise an Exception carrying the reply is raised.
        The HTTP connection is closed in every case.
        """
        conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
        try:
            fields = ['cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))]
            for number, argument in enumerate(args, 1):
                fields.append(
                    unicode(number) + '=' +
                    urllib_parse.quote_plus(unicode(argument).encode('utf-8')))
            if self.sessionId is not None:
                fields.append("sessionId=" + unicode(self.sessionId))
            body = '&'.join(fields)

            headers = {
                "Content-Type":
                "application/x-www-form-urlencoded; charset=utf-8"
            }
            conn.request("POST", "/selenium-server/driver/", body, headers)

            response = conn.getresponse()
            data = unicode(response.read(), "UTF-8")
            if data.startswith('OK'):
                return data
            raise Exception(data)
        finally:
            conn.close()
项目:ghdata    作者:OSSHealth    | 项目源码 | 文件源码
def linking_websites(self, owner, repo):
        """
        Finds the repo's popularity on the internet

        Queries publicwww.com for pages containing a link to the
        repository's GitHub URL and reads the CSV export.

        :param owner: The username of a project's owner
        :param repo: The name of the repository
        :return: DataFrame with the columns 'url' and 'rank', one row per
                 line of the CSV export
        """

        # Find websites that link to that repo
        repo_url = "https://github.com/{owner}/{repo}".format(owner=owner, repo=repo)
        query = '<a+href%3D"{repourl}"'.format(repourl=url.quote_plus(repo_url))
        # str.format returns a new string, so the result must be kept;
        # otherwise the unformatted template would be requested.
        req = 'https://publicwww.com/websites/{query}/?export=csv&apikey={apikey}'.format(
            query=query, apikey=self.__api_key)
        result = pd.read_csv(req, delimiter=';', header=None, names=['url', 'rank'])
        return result
项目:CVProject    作者:hieuxinhe94    | 项目源码 | 文件源码
def _translate_single_text(self, text, target_language, source_lauguage):
        """Translate `text` piece by piece and join the partial results.

        The text is URL-quoted and cut into chunks whose quoted length does
        not exceed ``self._MAX_LENGTH_PER_QUERY``. Each cut is placed right
        after the last occurrence, inside the current window, of the first
        entry of ``self._SEPERATORS`` that occurs there. Every chunk is
        unquoted again and translated via ``self._basic_translate``, with
        the calls dispatched through ``self._execute``.

        Returns a tuple with one string per index in
        ``range(len(self._writing))``, each the concatenation of that index
        taken from every chunk's result.

        Raises Error('input too large') when a window holds no separator.
        """
        assert _is_bytes(text)

        def split_text(text):
            encoded = quote_plus(text)
            total = len(encoded)
            cursor = 0
            while (total - cursor) > self._MAX_LENGTH_PER_QUERY:
                for seperator in self._SEPERATORS:
                    cut = encoded.rfind(
                        seperator, cursor, cursor + self._MAX_LENGTH_PER_QUERY)
                    if cut != -1:
                        break
                else:
                    # No separator found anywhere in the window.
                    raise Error('input too large')
                chunk_end = cut + len(seperator)
                yield unquote_plus(encoded[cursor:chunk_end])
                cursor = chunk_end

            # Whatever remains fits into a single query.
            yield unquote_plus(encoded[cursor:])

        def make_task(chunk):
            # Bind the chunk now so every task translates its own piece.
            return lambda: self._basic_translate(chunk, target_language, source_lauguage)[0]

        tasks = (make_task(chunk) for chunk in split_text(text))
        results = list(self._execute(tasks))
        return tuple(''.join(part[n] for part in results)
                     for n in range(len(self._writing)))
项目:alexa-sl    作者:clearminds    | 项目源码 | 文件源码
def _generate_deviation(deviations):
    """Build the speech and card texts describing the given deviations.

    With no deviations both texts are a fixed "no known deviations" message.
    Otherwise the card text has one ``<TransportMode> - <Text>`` line per
    deviation. When ``tts_host`` is set, the speech text has one
    ``<s>`` element per deviation with an ``<audio>`` tag whose source is
    ``tts_host`` followed by the URL-quoted deviation text; when it is not
    set, the speech text is a single generic sentence.

    :param deviations: iterable of dicts with ``['Deviation']['Text']`` and
                       ``['StopInfo']['TransportMode']``, or an empty/None
                       value
    :return: tuple ``(speech_text, card_text)``
    """
    if not deviations:
        no_deviations = u'<s>There are no known deviations right now</s>'
        return no_deviations, no_deviations

    speech_reply = []
    card_reply = []
    if not tts_host:
        speech_reply.append('<s>There are some deviations right now.</s>')

    for d in deviations:
        mode = d['StopInfo']['TransportMode'].capitalize()
        text = d['Deviation']['Text']
        if tts_host:
            # The quoted text is only needed for the audio URL, so it is
            # not computed when no TTS host is configured.
            deviation = quote_plus(text.encode('utf-8'))
            speech_reply.append('<s>%s <audio src="%s%s"/></s>' % (mode, tts_host, deviation))
        card_reply.append('%s - %s' % (mode, text))

    speech_text = ''.join(speech_reply)
    card_text = '\n'.join(card_reply)

    return speech_text, card_text
项目:aweasome_learning    作者:Knight-ZXW    | 项目源码 | 文件源码
def url_escape(value, plus=True):
    """Return ``value`` percent-encoded for use in a URL.

    With ``plus`` true (the default) spaces become "+", which suits query
    strings; with ``plus`` false they become "%20", which suits the path
    component of a URL.  Note that this default is the reverse of Python's
    urllib module.

    .. versionadded:: 3.1
        The ``plus`` argument
    """
    if plus:
        return urllib_parse.quote_plus(utf8(value))
    return urllib_parse.quote(utf8(value))


# python 3 changed things around enough that we need two separate
# implementations of url_unescape.  We also need our own implementation
# of parse_qs since python 3's version insists on decoding everything.
项目:zenchmarks    作者:squeaky-pl    | 项目源码 | 文件源码
def url_escape(value, plus=True):
    """URL-encode ``value`` after converting it with ``utf8``.

    ``plus`` selects how spaces are written: "+" when true (the default,
    appropriate for query strings) or "%20" when false (appropriate for the
    path component of a URL).  Note that this default is the reverse of
    Python's urllib module.

    .. versionadded:: 3.1
        The ``plus`` argument
    """
    if plus:
        encoder = urllib_parse.quote_plus
    else:
        encoder = urllib_parse.quote
    return encoder(utf8(value))


# python 3 changed things around enough that we need two separate
# implementations of url_unescape.  We also need our own implementation
# of parse_qs since python 3's version insists on decoding everything.
项目:subsonic    作者:lmEshoo    | 项目源码 | 文件源码
def grab_albumart(search=''):
    """Return the first ``http...jpg`` URL found in an image search page.

    The search term is URL-quoted and inserted into a Google image search
    URL, which is fetched through ``makeRequest`` with browser-like headers.
    The result is the text from the last ``http`` preceding the first
    ``jpg`` in the page content up to and including that ``jpg``.

    :param search: the search term
    :return: the extracted URL, or an empty string when the page contains
             no ``jpg`` or no ``http`` before it
    """
    search = qp(search)
    site = "https://www.google.com/search?site=&tbm=isch&source=hp&biw=1112&bih=613&q="+search+"&oq=backst&gs_l=img.3.0.0l10.1011.3209.0.4292.8.7.1.0.0.0.246.770.0j3j1.4.0..3..0...1.1.64.img..3.5.772.KyXkrVfTLT4#tbm=isch&q=back+street+boys+I+want+it+that+way"
    hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
           'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
           'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
           'Accept-Encoding': 'none',
           'Accept-Language': 'en-US,en;q=0.8',
           'Connection': 'keep-alive'}
    req = makeRequest(site, hdr)

    content = str(req.content)
    end = content.find('jpg')
    if end == -1:
        # Without this guard the -1 sentinel would be used as a slice index.
        return ''
    start = content.rfind('http', 0, end)
    if start == -1:
        return ''

    return content[start:end+3]