我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用urllib.parse.unquote()。
def parse_qs(qs): params = {} for field in qs.split('&'): r = partition(field, '=') k = r[0] v = r[2] if '+' in k: k = k.replace('+', ' ') if '%' in k: k = unquote(k) if '+' in v: v = v.replace('+', ' ') if k in params: params[k].append('%' in v and unquote(v) or v) else: if ',' in v: params[k] = [('%' in v and unquote(x) or x) for x in v.split(',')] else: params[k] = ['%' in v and unquote(v) or v] return params
def _format_resp(resp): """ Format API response """ resp = re.search('Response=(.*)', resp).group(1) resp = resp.replace('%14', ' ') pairs = resp.split() formatted = {} for pair in pairs: pair = pair.replace('%13', ' ') pair = pair.split() key = pair[0] if key == 'ITEMS': value = dehexify(pair[-1]) else: value = urlparse.unquote(pair[-1]) formatted[key] = value return formatted
def compat_urllib_parse_unquote(string, encoding='utf-8', errors='replace'): """Replace %xx escapes by their single-character equivalent. The optional encoding and errors parameters specify how to decode percent-encoded sequences into Unicode characters, as accepted by the bytes.decode() method. By default, percent-encoded sequences are decoded with UTF-8, and invalid sequences are replaced by a placeholder character. unquote('abc%20def') -> 'abc def'. """ if '%' not in string: string.split return string if encoding is None: encoding = 'utf-8' if errors is None: errors = 'replace' bits = _asciire.split(string) res = [bits[0]] append = res.append for i in range(1, len(bits), 2): append(compat_urllib_parse_unquote_to_bytes(bits[i]).decode(encoding, errors)) append(bits[i + 1]) return ''.join(res)
def url_to_path_and_args(url, no_query_string=False): if no_query_string: url = url.replace('?', '%3F').replace('#', '%23') components = urlsplit(url) path = components.path if no_query_string: path = unquote(path) # ??????? CEIBA ? %3F ?????????? # ??????? CEIBA ? %253F ?????????? # ?? ceiba_dl.Request ????????????????????? quote_test = path.replace('?', '').replace('#', '').replace(' ', '') if quote(quote_test) != quote_test: path = path.replace('?', '%3F').replace('#', '%23') args = {} else: query_string = components.query args = parse_qs(query_string, keep_blank_values=True) for key, value in args.items(): if isinstance(value, list): assert len(value) == 1 args[key] = value[0] return (path, args) # lxml ????????? None??????????
def _cheap_response_parse(arg1, arg2): """Silly parser for 'name=value; attr=attrvalue' format, to test out response renders """ def crumble(arg): "Break down string into pieces" lines = [line for line in arg if line] done = [] for line in lines: clauses = [clause for clause in line.split(';')] import logging logging.error("clauses %r", clauses) name, value = re.split(" *= *", clauses[0], 1) value = unquote(value.strip(' "')) attrs = [re.split(" *= *", clause, 1) \ for clause in clauses[1:] if clause] attrs = [attr for attr in attrs \ if attr[0] in Cookie.attribute_names] attrs = [(k, v.strip(' "')) for k, v in attrs] done.append((name, value, tuple(attrs))) return done result1 = crumble([arg1]) result2 = crumble(arg2) return result1, result2
def do_request(self, url, params=None, timeout=None): """Performs the HTTP request, signed with OAuth. :param timeout: optional request timeout, in seconds. :type timeout: float @return: the response content """ req = self.session.post(url, data=params, auth=self.oauth, timeout=timeout or self.default_timeout) # check the response headers / status code. if req.status_code != 200: self.log.error('do_request: Status code %i received, content:', req.status_code) for part in req.text.split('&'): self.log.error(' %s', urllib_parse.unquote(part)) raise exceptions.FlickrError('do_request: Status code %s received' % req.status_code) return req.content
def stickers_api_packs_stickers(self, request, packs, stickers): #packs = parse.unquote(packs) #stickers = parse.unquote(stickers) resp = {"packs": None, "stickers": None} resp["packs"] = packs resp["stickers"] = stickers resp.update({"comment": "Get sticker info"}) resp.update({"name": resp["stickers"]}) print(resp["stickers"]) sticker = await self.db.get_stickers_by_name(resp["stickers"]) print(sticker) resp.update({"url": sticker[0][2]}) resp.update({"id": sticker[0][0]}) resp.update({"rating": sticker[0][1]}) resp.update({"pack_url": sticker[0][3]}) return json(resp)
def execute(**kwargs): command = kwargs['command' ] arguments = kwargs['arguments'] direct = kwargs['direct' ] if not direct or not arguments: return data = ' '.join(arguments[:-1]) algo = arguments[-1].lower() BASE64 = 'base64' URL = 'url' algorithms = { BASE64: b64d, URL : unquote, } if algo not in algorithms: print('Unknown algorithm: {}'.format(algo)) return print(algorithms[algo](data))
def adapter_do_GET(self, request): payload = { "sendto": request.match_info["id"], "key": request.match_info["api_key"], "content": unquote(request.match_info["message"]) } results = await self.process_request('', # IGNORED '', # IGNORED payload) if results: content_type="text/html" results = results.encode("ascii", "xmlcharrefreplace") else: content_type="text/plain" results = "OK".encode('utf-8') return web.Response(body=results, content_type=content_type)
def __get_google_auth_session(username, password): google_accounts_url = 'https://accounts.google.com' authentication_url = 'https://accounts.google.com/ServiceLoginAuth' session = requests.session() login_html = session.get(google_accounts_url) soup_login = BeautifulSoup(login_html.content, 'html.parser').find('form').find_all('input') payload = {} for u in soup_login: if u.has_attr('value'): payload[u['name']] = u['value'] payload['Email'] = username payload['Passwd'] = password auto = login_html.headers.get('X-Auto-Login') follow_up = unquote(unquote(auto)).split('continue=')[-1] payload['continue'] = follow_up session.post(authentication_url, data=payload) return session
def away_links(request, html): """ ???????? ??? ??????? ?????? ? html-???? ?? ?????? ????? ? ?????????? """ site = get_current_site(request) soup = Soup(html, 'html5lib') for tag in soup.findAll('a'): if tag.get('href'): parsed = parse.urlparse(tag['href']) if '' not in (parsed.scheme, parsed.netloc) \ and not parsed.query \ and not is_same_domain(parsed.netloc, site.domain): tag['target'] = '_blank' tag['href'] = resolve_url('away') + '?url=' + parsed.geturl() if tag.string: tag.string = parse.unquote(tag.string) return soup.body.decode_contents()
def start(bot, update, args, update_queue): msg = update.message if len(args) == 1 and args[0] == 'help_inline': msg.reply_text(BASE_START_TEXT.format(user_name=msg.from_user.first_name, bot_name=bot.name) + INLINE_HELP_TEXT.format(bot_name=bot.name), disable_web_page_preview=True, parse_mode=ParseMode.HTML) return elif len(args) > 0: # Webogram doesn't urldecode/unquote args = unquote(' '.join(args)).split(' ') if len(args) == 2 and args[0] == 'cmd': msg.text = args[1] update_queue.put(update) return msg.reply_text(BASE_START_TEXT.format(user_name=msg.from_user.first_name, bot_name=bot.name) + START_TEXT, disable_web_page_preview=True)
def test_unquoting(self): # Make sure unquoting of all ASCII values works escape_list = [] for num in range(128): given = hexescape(chr(num)) expect = chr(num) result = urllib_parse.unquote(given) self.assertEqual(expect, result, "using unquote(): %r != %r" % (expect, result)) result = urllib_parse.unquote_plus(given) self.assertEqual(expect, result, "using unquote_plus(): %r != %r" % (expect, result)) escape_list.append(given) escape_string = ''.join(escape_list) del escape_list result = urllib_parse.unquote(escape_string) self.assertEqual(result.count('%'), 1, "using unquote(): not all characters escaped: " "%s" % result) self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, None) self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, ()) with support.check_warnings(('', BytesWarning), quiet=True): self.assertRaises((TypeError, AttributeError), urllib_parse.unquote, bytes(b''))
def get_context_data(self, **kwargs): context = super(SubmissionRedirectView,self).get_context_data(**kwargs) try: redirect_url = unquote(self.request.GET.get('redirect_url','')) if not redirect_url: redirect_url = Page.objects.get(pk=getConstant('general__defaultAdminSuccessPage')).get_absolute_url(settings.LANGUAGE_CODE) except ObjectDoesNotExist: redirect_url = '/' context.update({ 'redirect_url': redirect_url, 'seconds': self.request.GET.get('seconds',5), }) return context ################################################ # For Viewing Invoices and sending notifications
def download_file(self, url: str, filepath: str, fname="", progf=False): """Download a file from `url` to `filepath/name`""" r = self.session.get(url, stream=True) dlen = r.headers.get("content-length") step = (100 / int(dlen)) prog = 0 if not fname: fname = unquote(Path(r.url).name) with open(filepath+"/"+fname, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: prog += len(chunk) if progf: progf(int(step * prog)) f.write(chunk) if progf: progf(0) return filepath+"/"+fname
def owllook_delete_bookmark(request): """ ???? :param request: :return: : -1 ??session?? ?????? : 0 ?????? : 1 ?????? """ user = request['session'].get('user', None) data = parse_qs(str(request.body, encoding='utf-8')) bookmarkurl = data.get('bookmarkurl', '') if user and bookmarkurl: bookmark = unquote(bookmarkurl[0]) try: motor_db = motor_base.get_db() await motor_db.user_message.update_one({'user': user}, {'$pull': {'bookmarks': {"bookmark": bookmark}}}) LOGGER.info('??????') return json({'status': 1}) except Exception as e: LOGGER.exception(e) return json({'status': 0}) else: return json({'status': -1})
def owl_bd_novels(request, name): """ ???????? :param request: :param name: ??? :return: ?????? """ name = unquote(name) novels_name = 'intitle:{name} ?? ??'.format(name=name) try: res = await cache_owllook_baidu_novels_result(novels_name) parse_result = [] if res: parse_result = [i for i in res if i] UniResponse.SUCCESS.update({ResponseField.DATA: parse_result, ResponseField.FINISH_AT: get_time()}) return response_handle(request, UniResponse.SUCCESS, 200) except Exception as e: LOGGER.exception(e) return response_handle(request, UniResponse.SERVER_UNKNOWN_ERR, 500)
def get_music(artist, album, title): '''Get a track tags or download it''' page_format = request.args.get('format', 'html') artist = unquote(artist) album = unquote(album) title = unquote(title) collection = app.config['COLLECTION'] mf = MusicFilter(artists=[artist], albums=[album], titles=[title]) musics = webfilter(partial(collection.filter, cursor_factory=RealDictCursor), mf) if len(musics) != 1: return ('Music not found', 404) music = musics[0] if page_format == 'html': return render_template("music.html", music=music) elif page_format == 'json': return dumps(music, sort_keys=True, indent=4, separators=(',', ': ')) return ('Invalid format, available: json,html', 400)
def __init__(self, line): """Parse a lookup request from Squid into its constituent parts""" parts = line.decode().replace('\n', '').split(' ') # See if we're using concurrency; if so the first token is the integer channel ID try: self._channel = int(parts[0]) parts.pop(0) except ValueError: pass # First non-channel argument must be the client IP address # Failure to parse the client address is handled later on # by detecting the object's client property being None. addr = parts.pop(0) if addr != '-': try: self._client = ipaddress.ip_address(addr) except ValueError: pass # Everything else is ACL arguments self._acl = [unquote(p) for p in parts]
def test_file_head_response(file_name, static_file_directory): app = Sanic('test_file_helper') @app.route('/files/<filename>', methods=['GET', 'HEAD']) async def file_route(request, filename): file_path = os.path.join(static_file_directory, filename) file_path = os.path.abspath(unquote(file_path)) stats = await async_os.stat(file_path) headers = dict() headers['Accept-Ranges'] = 'bytes' headers['Content-Length'] = str(stats.st_size) if request.method == "HEAD": return HTTPResponse( headers=headers, content_type=guess_type(file_path)[0] or 'text/plain') else: return file(file_path, headers=headers, mime_type=guess_type(file_path)[0] or 'text/plain') request, response = app.test_client.head('/files/{}'.format(file_name)) assert response.status == 200 assert 'Accept-Ranges' in response.headers assert 'Content-Length' in response.headers assert int(response.headers[ 'Content-Length']) == len( get_file_content(static_file_directory, file_name))
def decode_local_uri(uri, transcoder, probe, preferred_transcoder): url = unquote(urlparse(uri).path) mime = get_mimetype(url, probe) transcode = False if transcoder: transcode = True for k in supported_formats.keys(): if mime == supported_formats[k][0]: transcode = False metadata = None thumb = None if os.path.exists(url): metadata, thumb, image_mime = get_metadata(url, mime, preferred_transcoder) return (url, True, mime, transcode and transcoder, metadata, thumb, image_mime) else: return None
def run(self, paths = []): items = [] for item in SideBarSelection(paths).getSelectedItems(): if item.isUnderCurrentProject(): txt = item.url('url_production') try: txt = urlunquote(txt.encode('utf8')).decode('utf8') except TypeError: txt = urlunquote(txt) items.append(txt) if len(items) > 0: sublime.set_clipboard("\n".join(items)); if len(items) > 1 : sublime.status_message("Items URL copied") else : sublime.status_message("Item URL copied")
def confirm(actor: Actor, form_action_value: str) -> Response: """Confirm the email address provided by the Supplier. :param actor: a namedtuple with Actor details :param form_action_value: form action from SSO Confirm your email page :return: response object """ session = actor.session # in order to be redirected to the correct URL we have `unquote` # the form_action_value url = "{}{}".format( get_absolute_url("sso:landing"), unquote(form_action_value)) headers = {"Referer": url} data = {"csrfmiddlewaretoken": actor.csrfmiddlewaretoken} return make_request( Method.POST, url, session=session, headers=headers, data=data)
def parse_content_metadata(response): """ Given a Response object from Requests, return the following information about it: * The file name * The content type, as a string * The content length, as an integer number of bytes """ file_name = None content_disposition = response.headers["Content-Disposition"] if content_disposition: result = CONTENT_DISPOSITION_RE.search(content_disposition) if result: file_name = unquote(result.group('filename')) if not file_name: file_name = unquote(os.path.basename(response.url)) content_type = response.headers["Content-Type"] content_length = response.headers["Content-Length"] if content_length: content_length = int(content_length) return file_name, content_type, content_length
def _parse_qsl(qs): r = [] for pair in qs.replace(';', '&').split('&'): if not pair: continue nv = pair.split('=', 1) if len(nv) != 2: nv.append('') key = urlunquote(nv[0].replace('+', ' ')) value = urlunquote(nv[1].replace('+', ' ')) r.append((key, value)) return r
def __init__(self, item): type, url = self._get_type_and_url(item) self.id = item['id'] self.subreddit = item['subreddit'] self.title = unquote(item['title']) self.score = int(item['score']) self.url = url self.comments = 'https://redd.it/' + item['id'] self.created_at = int(item['created_utc']) self.type = type
def __call__(self, environ, start_response): for key in 'REQUEST_URL', 'REQUEST_URI', 'UNENCODED_URL': if key not in environ: continue request_uri = unquote(environ[key]) script_name = unquote(environ.get('SCRIPT_NAME', '')) if request_uri.startswith(script_name): environ['PATH_INFO'] = request_uri[len(script_name):] \ .split('?', 1)[0] break return self.app(environ, start_response)
def is_valid_url(self, url): """check url is valid or not @return: True or False """ r1 = urlparse(unquote(str(url))).scheme r2 = urlparse(unquote(str(url))).netloc is_exit = str(r1) == 'http' or str(r1) == 'https' is_git = str(r2) == 'github.com' or str(r2) == 'www.github.com' if is_exit and is_git: return True else: self.__msg_list.append(self.__URL_IS_NOT_VALID) return False
def is_header(self, url): """check url is header or not @return: True or False """ result = urlparse(unquote(str(url))).path if len(str(result)) > 4: if str(result)[-2:] == '.h' or str(result)[-4:] == '.hpp': return True else: self.__msg_list.append(self.__URL_IS_NOT_VALID) else: self.__msg_list.append(self.__URL_IS_NOT_VALID) return False
def insert_db(self, data): """query insert db @return: query """ url = unquote(str(data.get_url())) name = str(data.get_name()) creator = str(data.get_creator()) description = str(data.get_description()) q1 = "INSERT INTO libraries(url, name, creator, description) " q2 = "SELECT '{}', '{}', '{}', '{}'".format(url, name, creator, description) q3 = "WHERE NOT EXISTS (SELECT 1 FROM libraries WHERE name = '{}');".format(name) query = "{} {} {}".format(q1, q2, q3) return query
def _download_filename(cls, response_url): return urlunquote(response_url.split("/")[-1])
def unicode_unquote(string): if string is None: return None if six.PY3: return unquote(string) return to_unicode(unquote(to_utf8(string)))
def corpora_list(): page = 1 query = None category = None doc = None if 'page' in request.args: page = request.args['page'] if 'query' in request.args: query = request.args['query'] if 'category' in request.args: category = request.args['category'] if 'doc' in request.args: doc = unquote(request.args['doc']) return Response(json.dumps(get_sentences(page=page, query=query, category=category, document=doc)), mimetype='application/json')
def compat_urllib_parse_unquote_plus(string, encoding='utf-8', errors='replace'): """Like unquote(), but also replace plus signs by spaces, as required for unquoting HTML form values. unquote_plus('%7e/abc+def') -> '~/abc def' """ string = string.replace('+', ' ') return compat_urllib_parse_unquote(string, encoding, errors)
def _parse_qsl(qs, keep_blank_values=False, strict_parsing=False, encoding='utf-8', errors='replace'): qs, _coerce_result = qs, compat_str pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] r = [] for name_value in pairs: if not name_value and not strict_parsing: continue nv = name_value.split('=', 1) if len(nv) != 2: if strict_parsing: raise ValueError("bad query field: %r" % (name_value,)) # Handle case of a control-name with no equal sign if keep_blank_values: nv.append('') else: continue if len(nv[1]) or keep_blank_values: name = nv[0].replace('+', ' ') name = compat_urllib_parse_unquote( name, encoding=encoding, errors=errors) name = _coerce_result(name) value = nv[1].replace('+', ' ') value = compat_urllib_parse_unquote( value, encoding=encoding, errors=errors) value = _coerce_result(value) r.append((name, value)) return r
def update_item(self, item, status=ItemRecordStatuses.OK, parent_path=None): """ :param onedrivee.api.items.OneDriveItem item: :param str status: One value of enum ItemRecordStatuses. :param str parent_path: If item does not have a parent reference, fallback to this path. """ parent_ref = item.parent_reference try: # the remote url is encoding with ASCII, we should convert to unicode # note: item needs encoded url to download file content parent_path = url_parse.unquote(parent_ref.path) except Exception: pass if item.is_folder: crc32_hash = None sha1_hash = None else: file_facet = item.file_props #it seems some json response objects don't hava hash metadata if file_facet.hashes is not None: crc32_hash = file_facet.hashes.crc32 sha1_hash = file_facet.hashes.sha1 else: item_local_path = self.remote_path_to_local_path(parent_path + "/" + item.name) crc32_hash = hasher.crc32_value(item_local_path) sha1_hash = hasher.hash_value(item_local_path) created_time_str = datetime_to_str(item.created_time) modified_time_str = datetime_to_str(item.modified_time) self.lock.acquire_write() self._cursor.execute( 'INSERT OR REPLACE INTO items (item_id, type, item_name, parent_id, parent_path, etag, ' 'ctag, size, created_time, modified_time, status, crc32_hash, sha1_hash)' ' VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', (item.id, item.type, item.name, parent_ref.id, parent_path, item.e_tag, item.c_tag, item.size, created_time_str, modified_time_str, status, crc32_hash, sha1_hash)) self._conn.commit() self.lock.release_write()
def get_full_path(self, path_info): """ Get local filename path from path_info. """ path_info = utils.decode_path_info(path_info) path_info = posixpath.normpath(urlunquote(path_info)) path = os.path.normpath(self.root + path_info) if (self.default_extension and not os.path.exists(path) and os.path.splitext(path)[1] == '' and os.path.isfile(path + self.default_extension)): path += self.default_extension return path
def list_directory(self, path, environ, start_response): """ Return a directory listing. """ try: names = os.listdir(path) except os.error: # pragma: no cover return self.error(404, environ, start_response) names.sort(key=lambda a: a.lower()) items = [] for name in names: fullname = os.path.join(path, name) displayname = linkname = name # Append / for directories or @ for symbolic links if os.path.isdir(fullname): displayname = name + "/" linkname = name + "/" if os.path.islink(fullname): displayname = name + "@" # Note: a link to a directory displays with @ and links with / items.append('<li><a href="{}">{}</a></li>'.format( urlquote(linkname), html_escape(displayname) )) f = io.BytesIO() f.write(self.directory_template.format( displaypath=html_escape(urlunquote(wsgiref.util.request_uri(environ))), items=os.linesep.join(items) ).encode(self.encoding)) length = f.tell() f.seek(0) headers = [ ('Content-Length', str(length)), ('Content-type', 'text/html; charset={}'.format(self.encoding)) ] start_response(self.get_status(200), headers) file_wrapper = environ.get('wsgi.file_wrapper', wsgiref.util.FileWrapper) return file_wrapper(f)
def command_on_VTE(command): """ command_on_VTE: VTE Üzerindeki Komutlar Nesne konumlar? listesinden tek tek gelen konumlar? iki ö?eli listeye çevirir. Gelen konum örn:(file:///etc/fux-release) ?eklindedir. Bu konumu ilk çift ters Bölü i?aretinden bölerek ikiye ay?r?r. örn:(['file:','/etc/fux-release']). Bu Listenin 1. eleman? olan as?l konumdaki Türkçe karakterler, urllib ile utf-8 Karakter kodlamas?na çevirilerek, içinde /home veya /trash de?eri olup olmad???na Bak?l?r. E?er bu de?erler içinde varsa önceden tan?ml? ev ve çöp dizinlerinin Gerçek konumlar? ekrana bas?l?r. /home ve /trash ibareleri konum içinde yok ise Konum oldu?u gibi terminal ekran?na bas?l?r ve her halükarda konumlar? ay?rmak için sonuna iki bo?luk eklenir. """ files = command.split("//") all_files = unquote(unquote(files[1])) + " " length = len(all_files) + 1 if all_files in "/home": all_files = os.environ["HOME"] content = terminal.feed_child(all_files, length) elif all_files in "/trash": all_files = os.environ["HOME"] + "/.local/share/Trash/files" content = terminal.feed_child(all_files, length) else: content = terminal.feed_child(all_files, length)