The following 50 code examples, extracted from open-source Python projects, illustrate how to use urllib.parse.quote_plus().
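For orientation before the examples: quote_plus() percent-encodes a string for use in a URL query component, encoding spaces as "+" (quote() instead uses "%20" and treats "/" as safe by default, which suits path components). A minimal sketch:

from urllib.parse import quote, quote_plus, unquote_plus

# Spaces become "+" and reserved characters such as "/" and "&" are
# percent-encoded, which makes quote_plus() suitable for query values.
assert quote_plus("a b/c&d") == "a+b%2Fc%26d"

# quote() leaves "/" unescaped by default and encodes spaces as %20.
assert quote("a b/c") == "a%20b/c"

# unquote_plus() reverses the transformation.
assert unquote_plus("a+b%2Fc%26d") == "a b/c&d"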
async def test_username_query_sql_inject_attampt(self):
    username = "bobsmith"
    inject_attempt = quote_plus("x'; delete from users; select * from users")
    async with self.pool.acquire() as con:
        await con.execute("INSERT INTO users (username, toshi_id) VALUES ($1, $2)",
                          username, TEST_ADDRESS)
    resp = await self.fetch("/search/user?query={}".format(inject_attempt), method="GET")
    self.assertEqual(resp.code, 200)
    body = json_decode(resp.body)
    self.assertEqual(len(body['results']), 0)
    async with self.pool.acquire() as con:
        row = await con.fetchrow("SELECT COUNT(*) AS count FROM users")
    self.assertEqual(row['count'], 1)
def do_command(self, verb, args):
    conn = http_client.HTTPConnection(self.host, self.port, timeout=self.http_timeout)
    try:
        body = 'cmd=' + urllib_parse.quote_plus(unicode(verb).encode('utf-8'))
        for i in range(len(args)):
            body += '&' + unicode(i + 1) + '=' + \
                urllib_parse.quote_plus(unicode(args[i]).encode('utf-8'))
        if (None != self.sessionId):
            body += "&sessionId=" + unicode(self.sessionId)
        headers = {
            "Content-Type": "application/x-www-form-urlencoded; charset=utf-8"
        }
        conn.request("POST", "/selenium-server/driver/", body, headers)
        response = conn.getresponse()
        data = unicode(response.read(), "UTF-8")
        if (not data.startswith('OK')):
            raise Exception(data)
        return data
    finally:
        conn.close()
def strencode(data):
    if not data:
        return data
    if data.find('%') != -1 or (data.find('+') != -1 and data.find(' ') == -1):
        return data
    d = []
    for x in data.split('&'):
        try:
            k, v = x.split('=', 1)
        except ValueError:
            d.append((x, None))  # no '=' present; keep the raw token
        else:
            v = quote_plus(v)
            d.append((k, v))
    d2 = []
    for k, v in d:
        if v is None:
            d2.append(k)
        else:
            d2.append('%s=%s' % (k, v))
    return '&'.join(d2)
def strdecode(data, value_quote=0):
    if not data:
        return []
    do_quote = 1
    if data.find('%') > -1 or data.find('+') > -1:
        do_quote = 0
    if not value_quote:
        do_quote = 0
    d = []
    for x in data.split(';'):
        try:
            k, v = x.split('=', 1)
        except ValueError:
            pass
        else:
            if do_quote:
                v = quote_plus(v.strip())
            d.append((k.strip(), v.strip()))
    return d
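A minimal illustration of how this pair behaves, assuming the two definitions above are in scope (note strencode splits on '&' while strdecode splits on ';', cookie-style):

print(strencode('a=b c&d'))        # 'a=b+c&d'  -- only values get quote_plus()'d
print(strdecode('a=b c; d=e', 1))  # [('a', 'b+c'), ('d', 'e')]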
def present():
    url = 'http://api.openweathermap.org/data/2.5/weather?q=daejeon,kr&units=metric'
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'
    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")
    WeatherData = json.loads(response_body)  # parse the JSON response body
    weather = WeatherData['weather'][0]
    weather = weather['description']
    temp_min = WeatherData['main']['temp_min']
    temp_max = WeatherData['main']['temp_max']
    humidity = WeatherData['main']['humidity']
    temp = WeatherData['main']['temp']
    present_weather = [weather, temp, temp_max, temp_min, humidity]
    return present_weather
def week():
    url = "http://api.openweathermap.org/data/2.5/forecast?q=daejeon,kr&units=metric"
    service_key = '709f54e9062fdbadbe73863ff0ac30b5'
    queryParams = '&' + urlencode({quote_plus('APPID'): service_key})
    request = Request(url + queryParams)
    request.get_method = lambda: 'GET'
    response_body = (urlopen(request).read()).decode("utf-8")
    WeatherData = json.loads(response_body)
    day1 = WeatherData["list"][5]
    day2 = WeatherData["list"][12]
    day3 = WeatherData["list"][19]
    day4 = WeatherData["list"][26]
    day5 = WeatherData['list'][34]
    day1 = [day1['main']['temp'], day1['weather'][0]["description"]]
    day2 = [day2['main']['temp'], day2['weather'][0]["description"]]
    day3 = [day3['main']['temp'], day3['weather'][0]["description"]]
    day4 = [day4['main']['temp'], day4['weather'][0]["description"]]
    day5 = [day5['main']['temp'], day5['weather'][0]["description"]]
    days = [day1, day2, day3, day4, day5]
    return days
def handle_unexpected_errors(f):
    """Decorator that catches unexpected errors."""
    @wraps(f)
    def call_f(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            errorf = StringIO()
            print_exc(file=errorf)
            error = errorf.getvalue()
            click.echo(
                "Looks like there's a bug in our code. Sorry about that!"
                " Here's the traceback:\n" + error)
            if click.confirm(
                    "Would you like to file an issue in our issue tracker?",
                    default=True, abort=True):
                url = "https://github.com/datawire/pib/issues/new?body="
                body = quote_plus(BUG_REPORT_TEMPLATE.format(
                    os.getcwd(), __version__, python_version,
                    run_result(["uname", "-a"]), error))
                webbrowser.open_new(url + body)
    return call_f
def get_websearch(self, query):
    """
    HTTP GET of a websearch, then add any embedded links.
    :param query:
    :return:
    """
    self.select_random_search_engine()
    url = uprs.urlunparse(uprs.urlparse(self.SafeSearch.search_url)._replace(
        query='{}={}{}&{}'.format(
            self.SafeSearch.query_parameter, uprs.quote_plus(query),
            self.SafeSearch.additional_parameters, self.SafeSearch.safe_parameter)))
    if self.verbose:
        self.print_url(url)

    @self.phantomjs_timeout
    def phantomjs_get():
        self.driver.get(url)  # selenium driver
    phantomjs_get()

    @self.phantomjs_short_timeout
    def phantomjs_page_source():
        self.data_usage += len(self.driver.page_source)
    phantomjs_page_source()

    new_links = self.websearch_links()
    if self.link_count() < self.max_links_cached:
        self.add_url_links(new_links, url)
def calc_signature(self):
    hash_params = []
    for fieldname in self.SIGNATURE_FIELDS:
        value = self._get_value(fieldname)
        if value is None:
            value = ''
        hash_params.append(str(value))
    hash_params.append(self.PASSWD)
    # extra
    for key in sorted(conf.EXTRA_PARAMS):
        value = self._get_value(key)
        if value is None:
            value = ''
        value = quote_plus(str(value))
        hash_params.append('%s=%s' % (key, value))
    hash_data = ':'.join(map(str, hash_params))
    hash_value = md5(hash_data.encode()).hexdigest().upper()
    return hash_value
def commandLookup(self, schid, targetMode, toID, fromID, params=""):
    try:
        from PythonQt.QtNetwork import QNetworkAccessManager, QNetworkRequest
        from PythonQt.QtCore import QUrl
        from urllib.parse import quote_plus
        lookupAPI = "https://api.opencnam.com/v3/phone/"
        lookupSID = "ACda22b69608b743328772059d32b63f26"
        lookupAuthToken = "AUc9d9217f20194053bf2989c7cb75a368"
        if params.startswith("00"):
            params = params.replace("00", "+", 1)
        params = quote_plus(params)
        url = "{0}{1}?format=json&casing=title&service_level=plus&geo=rate&account_sid={2}&auth_token={3}".format(
            lookupAPI, params, lookupSID, lookupAuthToken)
        if self.cfg.getboolean("general", "debug"):
            ts3lib.printMessageToCurrentTab("Requesting: {0}".format(url))
        self.nwmc = QNetworkAccessManager()
        self.nwmc.connect("finished(QNetworkReply*)", self.lookupReply)
        self.cmdevent = {"event": "", "returnCode": "", "schid": schid,
                         "targetMode": targetMode, "toID": toID,
                         "fromID": fromID, "params": params}
        self.nwmc.get(QNetworkRequest(QUrl(url)))
    except:
        from traceback import format_exc
        ts3lib.logMessage(format_exc(), ts3defines.LogLevel.LogLevel_ERROR, "pyTSon", 0)
def commandWhois(self, schid, targetMode, toID, fromID, params=""):
    try:
        from PythonQt.QtNetwork import QNetworkAccessManager, QNetworkRequest
        from PythonQt.QtCore import QUrl
        from urllib.parse import quote_plus
        params = quote_plus(params)
        url = "https://jsonwhois.com/api/v1/whois?domain={0}".format(params)
        token = "fe1abe2646bdc7fac3d36a688d1685fc"
        if self.cfg.getboolean("general", "debug"):
            ts3lib.printMessageToCurrentTab("Requesting: {0}".format(url))
        request = QNetworkRequest()
        request.setHeader(QNetworkRequest.ContentTypeHeader, "application/json")
        request.setRawHeader("Authorization", "Token token={0}".format(token))
        request.setUrl(QUrl(url))
        self.nwmc = QNetworkAccessManager()
        self.nwmc.connect("finished(QNetworkReply*)", self.whoisReply)
        self.cmdevent = {"event": "", "returnCode": "", "schid": schid,
                         "targetMode": targetMode, "toID": toID,
                         "fromID": fromID, "params": params}
        self.nwmc.get(request)
    except:
        from traceback import format_exc
        ts3lib.logMessage(format_exc(), ts3defines.LogLevel.LogLevel_ERROR, "pyTSon", 0)
def test_no_signing_federation():
    # UNINETT RP
    uninett_rp = 'https://foodle.uninett.no'
    _path = os.path.join('ms', quote_plus(OA['sunet']))
    signer = Signer(None, _path, 'register')
    _kj = build_keyjar(KEYDEFS)[1]
    rp_fed_ent = FederationEntity(None, keyjar=_kj, iss=uninett_rp,
                                  signer=signer, fo_bundle=fo_keybundle)
    rp = Client(federation_entity=rp_fed_ent, fo_priority=list(FO.values()))
    rp.federation = FO['swamid']
    req = rp.federated_client_registration_request(
        redirect_uris='https://foodle.uninett.no/authz',
        scope=['openid', 'email', 'phone']
    )
    assert set(req['metadata_statements'].keys()) == {FO['swamid']}
def test_no_signing_provider():
    # UNINETT RP
    uninett_rp = 'https://foodle.uninett.no'
    _path = os.path.join('ms', quote_plus(OA['sunet']))
    signer = Signer(None, _path, 'register')
    _kj = build_keyjar(KEYDEFS)[1]
    rp_fed_ent = FederationEntity(None, keyjar=_kj, iss=uninett_rp,
                                  signer=signer, fo_bundle=fo_keybundle)
    rp = Client(federation_entity=rp_fed_ent, fo_priority=list(FO.values()))
    rp.provider_federations = [LessOrEqual(iss=x) for x in
                               [FO['swamid'], FO['feide']]]
    req = rp.federated_client_registration_request(
        redirect_uris='https://foodle.uninett.no/authz',
        scope=['openid', 'email', 'phone']
    )
    assert set(req['metadata_statements'].keys()) == {FO['swamid'], FO['feide']}
def test_pack_metadata_statement():
    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})
    _keyjar = build_keyjar(KEYDEFS)[1]
    op = Operator(keyjar=_keyjar, jwks_bundle=jb, iss='https://example.com/')
    req = MetadataStatement(issuer='https://example.org/op')
    sms = op.pack_metadata_statement(req)
    assert sms  # Should be a signed JWT
    _jwt = factory(sms)
    assert _jwt
    assert _jwt.jwt.headers['alg'] == 'RS256'
    _body = json.loads(as_unicode(_jwt.jwt.part[1]))
    assert _body['iss'] == op.iss
    assert _body['issuer'] == 'https://example.org/op'
    # verify signature
    r = _jwt.verify_compact(sms, _keyjar.get_signing_key())
    assert r
def test_unpack_metadata_statement_uri():
    s = signer[OA['sunet']]
    req = MetadataStatement(issuer='https://example.org/op')
    # Not intermediate
    ms = s.create_signed_metadata_statement(req, 'discovery', single=True)
    jb = FSJWKSBundle('', None, 'fo_jwks',
                      key_conv={'to': quote_plus, 'from': unquote_plus})
    mds = MetaDataStore('msd')
    op = Operator(jwks_bundle=jb)
    op.httpcli = MockHTTPClient(mds)
    res = op.unpack_metadata_statement(jwt_ms=ms)
    assert len(res.parsed_statement) == 3
    loel = op.evaluate_metadata_statement(res.result)
    assert len(loel) == 3
    assert set([l.fo for l in loel]) == {'https://swamid.sunet.se',
                                         'https://edugain.com',
                                         'https://www.feide.no'}
def trade_app_pay(self, out_trade_no, total_amount, subject):
    """App payment.

    :out_trade_no: str, merchant order number
    :total_amount: str, total amount
    :subject: str, order subject
    :returns: str, the signed order string
    """
    self.__parameters['notify_url'] = self.__notify_url
    self.gen_parameters(
        'alipay.trade.app.pay',
        {
            'subject': subject,
            'out_trade_no': out_trade_no,
            'total_amount': total_amount,
            'product_code': 'QUICK_MSECURITY_PAY'
        }
    )
    sign = self.__ali_sign(self.gen_str())
    return self.gen_str(1) + '&sign=' + quote_plus(sign)
def trade_refund(self, out_trade_no, refund_amount):
    """Refund a completed trade.

    :out_trade_no: str, merchant order number
    :refund_amount: str, refund amount
    :returns: tuple (url, data, method)
    """
    self.gen_parameters(
        'alipay.trade.refund',
        {
            'out_trade_no': out_trade_no,
            'refund_amount': refund_amount,
            # 'refund_reason': refund_reason,
        }
    )
    sign = self.__ali_sign(self.gen_str())
    data = self.gen_str(1) + '&sign=' + quote_plus(sign)
    return self.__ali_url, data, 'GET'
def _get_geocoding(self, key, location):
    """Lookup the Google geocoding API information for `key`"""
    url = self._location_query_base % quote_plus(key)
    data = self._read_from_url(url)
    response = json.loads(data)
    if response['status'] == 'OK':
        formatted_address = response['results'][0]['formatted_address']
        pos = formatted_address.find(',')
        if pos == -1:
            location.name = formatted_address
            location.region = ''
        else:
            location.name = formatted_address[:pos].strip()
            location.region = formatted_address[pos + 1:].strip()
        l = response['results'][0]['geometry']['location']
        location.latitude = float(l['lat'])
        location.longitude = float(l['lng'])
    else:
        raise AstralError('GoogleGeocoder: Unable to locate %s' % key)
async def search(query, num=3):
    url_t = url_home + 'search?hl=en&q=%(query)s&num=%(num)d&btnG=Google+Search&tbs=0&safe=off&tbm='
    query = quote_plus(query)
    ret = []
    html = await get_page(url_t % vars())
    hashes = set()
    soup = BeautifulSoup(html, 'html.parser')
    anchors = soup.find(id='search').findAll('a')
    for a in anchors:
        # Get the URL from the anchor tag.
        try:
            link = a['href']
        except KeyError:
            continue
        # Filter invalid links and links pointing to Google itself.
        link = await filter_result(link)
        if not link:
            continue
        # Discard repeated results.
        h = hash(link)
        if h in hashes:
            continue
        hashes.add(h)
        ret.append(link)
    return ret
def test_quoting_space(self):
    # Make sure quote() and quote_plus() handle spaces as specified in
    # their unique way
    result = urllib_parse.quote(' ')
    self.assertEqual(result, hexescape(' '),
                     "using quote(): %r != %r" % (result, hexescape(' ')))
    result = urllib_parse.quote_plus(' ')
    self.assertEqual(result, '+',
                     "using quote_plus(): %r != +" % result)

    given = "a b cd e f"
    expect = given.replace(' ', hexescape(' '))
    result = urllib_parse.quote(given)
    self.assertEqual(expect, result,
                     "using quote(): %r != %r" % (expect, result))
    expect = given.replace(' ', '+')
    result = urllib_parse.quote_plus(given)
    self.assertEqual(expect, result,
                     "using quote_plus(): %r != %r" % (expect, result))
def _post(self, url, params=None):
    """
    Send a POST message using some predetermined headers and params that
    are always necessary when talking to SO. Using this instead of
    requests.post will save you the effort of adding the fkey, cookie,
    etc. yourself every time.
    """
    logger.debug("Posting to {}".format(url))
    if params is None:
        params = {}
    params["fkey"] = self.fkey
    s = "&".join("{}={}".format(name, quote_plus(str(value)))
                 for name, value in params.items())
    header = {
        "Content-Length": str(len(s)),
        "Content-Type": "application/x-www-form-urlencoded",
        "Cookie": self.cookie
    }
    return requests.post(url, headers=header, data=s)
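A side note on the manual join above: urlencode() applies quote_plus() to both keys and values by default, so the body construction could usually be delegated to the stdlib (a sketch with illustrative values; the fkey/cookie handling stays the same):

from urllib.parse import urlencode

params = {"fkey": "abc123", "text": "hello world"}
s = urlencode(params)  # 'fkey=abc123&text=hello+world'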
def webwxsync():
    global SyncKey
    url = base_uri + '/webwxsync?lang=zh_CN&skey=%s&sid=%s&pass_ticket=%s' % (
        BaseRequest['Skey'], BaseRequest['Sid'], quote_plus(pass_ticket))
    params = {
        'BaseRequest': BaseRequest,
        'SyncKey': SyncKey,
        'rr': ~int(time.time()),
    }
    request = getRequest(url=url, data=json.dumps(params))
    request.add_header('ContentType', 'application/json; charset=UTF-8')
    response = wdf_urllib.urlopen(request)
    data = response.read().decode('utf-8', 'replace')
    # print(data)
    dic = json.loads(data)
    SyncKey = dic['SyncKey']
    state = responseState('webwxsync', dic['BaseResponse'])
    return state
def list_show_uri(self, obj_types, verb):
    obj_type = self.args[0]
    assert_usage(obj_type in obj_types,
                 "Don't know how to {0} {1}".format(verb, obj_type))
    obj_info = obj_types[obj_type]
    uri = "/%s" % obj_type
    query = []
    if obj_info['vhost'] and self.options.vhost:
        uri += "/%s" % quote_plus(self.options.vhost)
    cols = self.args[1:]
    if cols == [] and 'cols' in obj_info and self.use_cols():
        cols = obj_info['cols']
    if cols != []:
        query.append("columns=" + ",".join(cols))
    sort = self.options.sort
    if sort:
        query.append("sort=" + sort)
    if self.options.sort_reverse:
        query.append("sort_reverse=true")
    query = "&".join(query)
    if query != "":
        uri += "?" + query
    return (uri, obj_info, cols)
async def fetch_image(self, ctx, channel, randomize: bool = False, tags: list = []):
    guild = ctx.message.guild
    search = "https://konachan.com/post.json?limit=1&tags="
    tag_search = "{} ".format(" ".join(tags))
    if randomize:
        tag_search += " order:random"
    search += parse.quote_plus(tag_search)
    message = await channel.send("Fetching kona image...")
    async with aiohttp.ClientSession() as session:
        async with session.get(search) as r:
            website = await r.json()
    if website != []:
        imageURL = "https:{}".format(website[0].get("file_url")).replace(' ', '+')
        await message.edit(content="Requested by {}\n{}".format(
            ctx.message.author.mention, imageURL))
    else:
        await message.delete()
def authorization_url(self):
    qd = {'response_type': 'code',
          'client_id': self.key,
          'scope': (' '.join(self.permissions)).strip(),
          'state': self.state or self._make_new_state(),
          'redirect_uri': self.redirect_uri}
    # urlencode uses quote_plus when encoding the query string, so we
    # encode the query string on our own here with quote() instead.
    qsl = []
    for k, v in list(qd.items()):
        qsl.append('%s=%s' % (quote(k), quote(v)))
    return urljoin(self.AUTHORIZATION_URL, '?' + '&'.join(qsl),
                   allow_fragments=True)
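Worth noting: since Python 3.5, urlencode() accepts a quote_via argument, so a manual loop like the one above can be expressed with the stdlib directly (a sketch with illustrative parameter values):

from urllib.parse import urlencode, quote

qd = {'response_type': 'code', 'scope': 'r_basicprofile r_emailaddress'}
# quote_via=quote encodes spaces as "%20" rather than the default
# quote_plus() "+", matching the hand-rolled encoding above.
qs = urlencode(qd, quote_via=quote)
# 'response_type=code&scope=r_basicprofile%20r_emailaddress'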
def get_profile(self, member_id=None, member_url=None, selectors=None,
                params=None, headers=None, member_email=None):
    if member_id:
        if type(member_id) is list:
            # Batch request, ids as CSV.
            url = '%s::(%s)' % (ENDPOINTS.PEOPLE, ','.join(member_id))
        else:
            url = '%s/id=%s' % (ENDPOINTS.PEOPLE, str(member_id))
    elif member_url:
        url = '%s/url=%s' % (ENDPOINTS.PEOPLE, quote_plus(member_url))
    elif member_email:
        url = '%s/email=%s' % (ENDPOINTS.PEOPLE, quote_plus(member_email))
    else:
        url = '%s/~' % ENDPOINTS.PEOPLE
    if selectors:
        url = '%s:(%s)' % (url, LinkedInSelector.parse(selectors))
    if params is None:
        params = dict()
    params.update({'format': 'json'})
    response = self.make_request('GET', url, params=params, headers=headers)
    raise_for_error(response)
    return response.json()
def get_memberships(self, member_id=None, member_url=None, group_id=None,
                    selectors=None, params=None, headers=None):
    if member_id:
        url = '%s/id=%s/group-memberships' % (ENDPOINTS.PEOPLE, str(member_id))
    elif member_url:
        url = '%s/url=%s/group-memberships' % (ENDPOINTS.PEOPLE,
                                               quote_plus(member_url))
    else:
        url = '%s/~/group-memberships' % ENDPOINTS.PEOPLE
    if group_id:
        url = '%s/%s' % (url, str(group_id))
    if selectors:
        url = '%s:(%s)' % (url, LinkedInSelector.parse(selectors))
    response = self.make_request('GET', url, params=params, headers=headers)
    raise_for_error(response)
    return response.json()
def local_sub(self, filename, mimetype):
    """serve a local subtitle file"""
    if os.path.isfile(filename):
        filename = os.path.abspath(filename)
    else:
        return None
    # Find the local IP to bind to by opening a UDP "connection" to a
    # public address and reading back the socket's own address.
    webserver_ip = [(s.connect(('8.8.8.8', 53)), s.getsockname()[0], s.close())
                    for s in [socket.socket(socket.AF_INET, socket.SOCK_DGRAM)]][0][1]
    req_handler = local_server.SubtitleRequestHandler
    # create a webserver to handle a single request on a free port
    # or a specific port if passed in the parameter
    port = 0
    self.subtitleserver = http.server.HTTPServer((webserver_ip, port), req_handler)
    self.subtitlethread = threading.Thread(target=self.subtitleserver.handle_request)
    self.subtitlethread.start()
    # quote_plus() with safe="/" keeps path separators intact in the URL
    url = "http://%s:%s%s" % (webserver_ip, str(self.subtitleserver.server_port),
                              quote_plus(filename, "/"))
    return url
def base(self) -> GitLabCommit:
    """
    Retrieves the base commit as a GitLabCommit object.

    >>> from os import environ
    >>> pr = GitLabMergeRequest(
    ...     GitLabOAuthToken(environ['GITLAB_TEST_TOKEN']),
    ...     'gitmate-test-user/test', 2
    ... )
    >>> pr.base.sha
    '198dd16f8249ea98ed41876efe27d068b69fa215'

    :return: A GitLabCommit object.
    """
    return GitLabCommit(self._token, self._repository, sha=None,
                        branch=quote_plus(self.base_branch_name))
def head(self) -> GitLabCommit:
    """
    Retrieves the head commit as a GitLabCommit object.

    >>> from os import environ
    >>> pr = GitLabMergeRequest(
    ...     GitLabOAuthToken(environ['GITLAB_TEST_TOKEN']),
    ...     'gitmate-test-user/test', 2
    ... )
    >>> pr.head.sha
    '99f484ae167dcfcc35008ba3b5b564443d425ee0'

    :return: A GitLabCommit object.
    """
    return GitLabCommit(self._token, self.source_repository.full_name,
                        sha=None, branch=quote_plus(self.head_branch_name))
def __init__(self, token: Union[GitLabOAuthToken, GitLabPrivateToken],
             repository: Union[str, int]):
    """
    Creates a new GitLabRepository object with the given credentials.

    :param token: A Token object to be used for authentication.
    :param repository: Full name or unique identifier of the repository,
                       e.g. ``sils/baritone``.
    """
    self._token = token
    self._repository = repository
    try:
        repository = int(repository)
        self._repository = None
        self._url = '/projects/{}'.format(repository)
    except ValueError:
        self._url = '/projects/' + quote_plus(repository)
def create_merge_request(self, title: str, base: str, head: str,
                         body: Optional[str]=None,
                         target_project_id: Optional[int]=None,
                         target_project: Optional[str]=None):
    """
    Create a new merge request in Repository
    """
    url = self._url + '/merge_requests'
    data = {
        'title': title,
        'target_branch': base,
        'source_branch': head,
        'id': quote_plus(self.full_name),
        'target_project_id': target_project_id
    }
    json = post(self._token, url=url, data=data)
    from IGitt.GitLab.GitLabMergeRequest import GitLabMergeRequest
    return GitLabMergeRequest.from_data(json, self._token,
                                        repository=target_project,
                                        number=json['iid'])
def __init__(self, token: Union[GitLabOAuthToken, GitLabPrivateToken],
             repository: str, sha: Optional[str],
             branch: Optional[str]=None):
    """
    Creates a new GitLabCommit object.

    :param token: A Token object to be used for authentication.
    :param repository: The full repository name.
    :param sha: The full commit SHA, if None given provide a branch.
    :param branch: A branch name if SHA is unavailable. Note that lazy
                   loading won't work in that case.
    """
    assert sha or branch, "Either full SHA or branch name has to be given!"
    self._token = token
    self._repository = repository
    self._sha = sha
    self._branch = branch
    self._url = '/projects/{id}/repository/commits/{sha}'.format(
        id=quote_plus(repository), sha=sha if sha else branch)
def get_statuses(self) -> Set[CommitStatus]:
    """
    Retrieves all commit statuses.

    :return: A (frozen)set of CommitStatus objects.
    :raises RuntimeError: If something goes wrong (network, auth...).
    """
    # Rebuild the URL with the full SHA because GitLab won't accept a
    # branch name here.
    url = '/projects/{repo}/repository/commits/{sha}/statuses'.format(
        repo=quote_plus(self._repository), sha=self.sha)
    statuses = get(self._token, url)
    # Only the first status of each context is the one we want
    result = set()
    contexts = set()
    for status in statuses:
        if status['name'] not in contexts:
            result.add(CommitStatus(
                INV_GL_STATE_TRANSLATION[status['status']],
                status['description'], status['name'],
                status['target_url']))
            contexts.add(status['name'])
    return result
def get_derived_terms_from_wiktionnary(descriptor):
    """
    :param descriptor:
    :return: a dict language -> set of str representing the derived terms
             in that language
    """
    result = {}
    try:
        response = urllib.request.urlopen(
            'https://en.wiktionary.org/w/index.php?title='
            + quote_plus(descriptor.rstrip('\n\r')) + '&printable=yes')
    except IOError:
        return result
    else:
        html = response.read()
        soup = BeautifulSoup(html, 'html.parser')
        for language in ("English", "French", "Portuguese", "Spanish"):
            titles_list = [el.attrs["title"]
                           for el in soup.select('ul li span a[href$="#%s"]' % language)
                           if "title" in el.attrs]
            if titles_list:  # checks if the list is empty
                result[language.lower()] = set(titles_list)
        return result
def download(self, record):
    results = []
    n_id = record['url']
    # params for main grab
    params = {}
    namefields = ['id', 'first-name', 'last-name', 'maiden-name',
                  'formatted-name', 'phonetic-first-name',
                  'phonetic-last-name', 'formatted-phonetic-name']
    occfields = ['headline', 'industry', 'positions']
    locfields = ['location:(name,country:(code))']
    activfields = ['current-share']
    degreefields = ['num-connections', 'num-connections-capped']
    descrifields = ['summary', 'specialties']
    visualfields = ['picture-url']
    selectors = (namefields + occfields + locfields + activfields +
                 degreefields + descrifields + visualfields)
    results.append(self.get_bundled(
        self.apiroot + quote_plus(n_id) + ':(' + ','.join(selectors) + ')',
        params))
    return results
def signature(params):
    """Sign the request parameters (Aliyun-style HMAC-SHA1 signature)."""
    params.update({
        'Format': 'json',
        'Version': '2015-01-09',
        'AccessKeyId': ID,
        'Timestamp': datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ'),
        'SignatureMethod': 'HMAC-SHA1',
        'SignatureNonce': uuid.uuid4(),
        'SignatureVersion': "1.0",
    })
    # Python 2 urllib; in Python 3 these helpers live in urllib.parse.
    query = urllib.urlencode(sorted(params.items()))
    log.debug(query)
    sign = API_METHOD + "&" + \
        urllib.quote_plus("/") + "&" + urllib.quote(query, safe='')
    log.debug("signString: %s", sign)
    sign = hmac.new((TOKEN + "&").encode('utf-8'), sign.encode('utf-8'),
                    hashlib.sha1).digest()
    sign = base64.b64encode(sign).strip()
    params["Signature"] = sign  # sign.decode('utf-8').encode("base64").strip()
    return params
def __init_from_existing(self):
    query = "/data?xp={}".format(quote_plus(self.xp_name))
    r = requests.get(self.client.url + query)
    if not r.ok:
        msg = "Something went wrong. Server sent: {}."
        raise ValueError(msg.format(r.text))
    # Retrieve the current step for existing metrics
    content = json.loads(r.text)
    self.__update_steps(content["scalars"], self.scalar_steps,
                        self.get_scalar_values)
    self.__update_steps(content["histograms"], self.hist_steps,
                        self.get_histogram_values)
def to_zip(self, filename=None):
    query = "/backup?xp={}".format(quote_plus(self.xp_name))
    r = requests.get(self.client.url + query)
    if not r.ok:
        msg = "Something went wrong. Server sent: {}."
        raise ValueError(msg.format(r.text))
    if not filename:
        filename = "backup_" + self.xp_name + "_" + str(time.time())
    out = open(filename + ".zip", "wb")
    out.write(r.content)
    out.close()
    return filename + ".zip"

# Helper methods
def linking_websites(self, owner, repo):
    """
    Finds the repo's popularity on the internet

    :param owner: The username of a project's owner
    :param repo: The name of the repository
    :return: DataFrame with the URLs of websites that link to the repo
             and their rank
    """
    # Find websites that link to that repo
    repo_url = "https://github.com/{owner}/{repo}".format(owner=owner, repo=repo)
    query = '<a+href%3D"{repourl}"'.format(repourl=url.quote_plus(repo_url))
    req = 'https://publicwww.com/websites/{query}/?export=csv&apikey={apikey}'
    req = req.format(query=query, apikey=self.__api_key)
    result = pd.read_csv(req, delimiter=';', header=None, names=['url', 'rank'])
    return result
def _translate_single_text(self, text, target_language, source_lauguage):
    assert _is_bytes(text)

    def split_text(text):
        start = 0
        text = quote_plus(text)
        length = len(text)
        while (length - start) > self._MAX_LENGTH_PER_QUERY:
            for seperator in self._SEPERATORS:
                index = text.rfind(seperator, start,
                                   start + self._MAX_LENGTH_PER_QUERY)
                if index != -1:
                    break
            else:
                raise Error('input too large')
            end = index + len(seperator)
            yield unquote_plus(text[start:end])
            start = end
        yield unquote_plus(text[start:])

    def make_task(text):
        return lambda: self._basic_translate(text, target_language,
                                             source_lauguage)[0]

    results = list(self._execute(make_task(i) for i in split_text(text)))
    return tuple(''.join(i[n] for i in results)
                 for n in range(len(self._writing)))
def _generate_deviation(deviations):
    speech_reply = []
    card_reply = []
    if deviations and tts_host:
        for d in deviations:
            deviation = quote_plus(d['Deviation']['Text'].encode('utf-8'))
            speech_reply.append('<s>%s <audio src="%s%s"/></s>' % (
                d['StopInfo']['TransportMode'].capitalize(), tts_host, deviation))
            card_reply.append('%s - %s' % (
                d['StopInfo']['TransportMode'].capitalize(),
                d['Deviation']['Text']))
    elif deviations and not tts_host:
        speech_reply.append('<s>There are some deviations right now.</s>')
        for d in deviations:
            deviation = quote_plus(d['Deviation']['Text'].encode('utf-8'))
            card_reply.append('%s - %s' % (
                d['StopInfo']['TransportMode'].capitalize(),
                d['Deviation']['Text']))
    else:
        speech_reply.append(u'<s>There are no known deviations right now</s>')
        card_reply = speech_reply
    speech_text = ''.join(speech_reply)
    card_text = '\n'.join(card_reply)
    return speech_text, card_text
def url_escape(value, plus=True):
    """Returns a URL-encoded version of the given value.

    If ``plus`` is true (the default), spaces will be represented as "+"
    instead of "%20".  This is appropriate for query strings but not for
    the path component of a URL.  Note that this default is the reverse
    of Python's urllib module.

    .. versionadded:: 3.1
        The ``plus`` argument
    """
    quote = urllib_parse.quote_plus if plus else urllib_parse.quote
    return quote(utf8(value))

# python 3 changed things around enough that we need two separate
# implementations of url_unescape. We also need our own implementation
# of parse_qs since python 3's version insists on decoding everything.
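This looks like Tornado's tornado.escape.url_escape helper (the ``versionadded:: 3.1`` note matches); assuming that context, the plus flag simply picks between the two quoting styles:

from tornado.escape import url_escape

url_escape("a b")              # 'a+b'   (query-string style, the default)
url_escape("a b", plus=False)  # 'a%20b' (path-component style)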
def grab_albumart(search=''):
    search = qp(search)
    site = ("https://www.google.com/search?site=&tbm=isch&source=hp&biw=1112&bih=613&q="
            + search
            + "&oq=backst&gs_l=img.3.0.0l10.1011.3209.0.4292.8.7.1.0.0.0.246.770.0j3j1.4.0..3..0...1.1.64.img..3.5.772.KyXkrVfTLT4#tbm=isch&q=back+street+boys+I+want+it+that+way")
    hdr = {'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
           'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
           'Accept-Charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.3',
           'Accept-Encoding': 'none',
           'Accept-Language': 'en-US,en;q=0.8',
           'Connection': 'keep-alive'}
    req = makeRequest(site, hdr)
    content = str(req.content)
    end = content.find('jpg')
    start = content[:end].rfind('http')
    return content[start:end + 3]