我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用urlparse.parse_qsl()。
def read_multi(self, environ, keep_blank_values, strict_parsing):
    """Internal: read a part that is itself multipart.

    Resets ``self.list``, fills it with one ``MiniFieldStorage`` per pair
    parsed from ``self.qs_on_post`` (when that is set), then appends one
    nested storage object per part read from ``self.fp``.

    Raises:
        ValueError: if ``self.innerboundary`` is rejected by
            ``valid_boundary``.
    """
    ib = self.innerboundary
    if not valid_boundary(ib):
        # Call syntax instead of ``raise X, msg`` so it parses on Python 3 too.
        raise ValueError('Invalid boundary in multipart form: %r' % (ib,))
    self.list = []
    if self.qs_on_post:
        # The instance-level parsing flags are used here, not the arguments.
        for key, value in urlparse.parse_qsl(self.qs_on_post,
                                             self.keep_blank_values,
                                             self.strict_parsing):
            self.list.append(MiniFieldStorage(key, value))

    klass = self.FieldStorageClass or self.__class__
    part = klass(self.fp, {}, ib,
                 environ, keep_blank_values, strict_parsing)
    # Throw first part away
    while not part.done:
        headers = rfc822.Message(self.fp)
        part = klass(self.fp, headers, ib,
                     environ, keep_blank_values, strict_parsing)
        self.list.append(part)
    self.skip_lines()
def googlepass(url):
    """Resolve ``url`` through ``client.request`` and normalise its scheme.

    ``url`` may carry URL-encoded request headers after a ``|`` separator.
    Those headers are sent with the request and re-appended (again after a
    ``|``) to the result.

    Returns:
        The value returned by ``client.request(..., output='geturl')`` with
        its scheme forced to https when it contains ``requiressl=yes`` and
        to http otherwise; ``None`` if any step raises.
    """
    try:
        try:
            headers = dict(urlparse.parse_qsl(url.rsplit('|', 1)[1]))
        except Exception:
            # No '|' suffix (or an unusable url): proceed without headers.
            headers = None
        url = url.split('|')[0].replace('\\', '')
        url = client.request(url, headers=headers, output='geturl')
        if 'requiressl=yes' in url:
            url = url.replace('http://', 'https://')
        else:
            url = url.replace('https://', 'http://')
        if headers:
            url += '|%s' % urllib.urlencode(headers)
        return url
    except Exception:
        # Best effort: callers get None on failure. ``Exception`` (not a bare
        # except) so KeyboardInterrupt/SystemExit still propagate.
        return
def moonwalk(link, ref, season, episode):
    """Collect stream URLs for a moonwalk ``link``.

    When both ``season`` and ``episode`` are truthy they are merged into the
    link's query string (commas are left unescaped). The link is then passed
    to ``__get_moonwalk_translators``; every returned ``(link, info)`` pair,
    or the link itself with an empty info when none are returned, is
    expanded with ``__get_moonwalk``.

    Returns:
        A list of URLs, or an empty list if any step raises.
    """
    try:
        if season and episode:
            q = dict(urlparse.parse_qsl(urlparse.urlsplit(link).query))
            q.update({'season': season, 'episode': episode})
            q = (urllib.urlencode(q)).replace('%2C', ',')
            # Drop the old query string and attach the rebuilt one.
            link = link.replace('?' + urlparse.urlparse(link).query, '') + '?' + q
        trans = __get_moonwalk_translators(link, ref)
        trans = trans if trans else [(link, '')]
        urls = []
        for i in trans:
            urls += __get_moonwalk(i[0], ref, info=i[1])
        return urls
    except Exception:
        # Best effort: ``Exception`` rather than a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return []
def do_GET(s):
    """Handle a GET request.

    Parses the query parameters, stores them on ``s.server.query_params``
    and writes a fixed HTML page saying the flow has completed. Note that
    we can't detect if an error occurred.
    """
    s.send_response(200)
    s.send_header("Content-type", "text/html")
    s.end_headers()
    # Everything after the first '?' is treated as the query string.
    query = s.path.split('?', 1)[-1]
    query = dict(parse_qsl(query))
    s.server.query_params = query
    # Bytes literals: identical to str on Python 2, and required on Python 3
    # where the handler's wfile is a binary stream.
    s.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    s.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    s.wfile.write(b"</body></html>")
def _add_query_parameter(url, name, value): """Adds a query parameter to a url. Replaces the current value if it already exists in the URL. Args: url: string, url to add the query parameter to. name: string, query parameter name. value: string, query parameter value. Returns: Updated query parameter. Does not update the url if value is None. """ if value is None: return url else: parsed = list(urlparse.urlparse(url)) q = dict(parse_qsl(parsed[4])) q[name] = value parsed[4] = urllib.urlencode(q) return urlparse.urlunparse(parsed)
def strip_query(self, query):
    """Return ``query`` re-encoded with unwanted key/value pairs removed.

    A pair is dropped when:
      * its key (case-insensitively) starts or ends with any entry of
        ``self._strip_query_variables``;
      * ``self.re_code`` strips the key down to nothing;
      * ``self.re_code`` strips a non-empty value down to nothing.
    Surviving pairs are emitted unmodified.
    """
    kept = []
    for key, value in urlparse.parse_qsl(query, keep_blank_values=True):
        key_lc = key.lower()
        # Ignore some keys.
        if any(key_lc.endswith(marker.lower()) or key_lc.startswith(marker.lower())
               for marker in self._strip_query_variables):
            continue

        # remove k-v pair if key gets stripped away
        if not self.re_code.sub('', key):
            continue

        # keep k-v pair only if value remains
        remainder = self.re_code.sub('', value)
        if remainder or remainder == value:
            kept.append((key, value))
    return urllib.urlencode(kept)
def get_token(self, chal):
    """
    Try to complete the challenge.

    The 'realm' entry is removed from ``chal`` and used as the request URL;
    the remaining entries are merged into that URL's query string.

    :param chal: challenge mapping (modified in place: 'realm' is popped)
    :returns: token or None
    """
    realm = urlparse(chal.pop('realm'))
    merged = dict(parse_qsl(realm.query), **chal)
    target = realm._replace(query=urlencode(merged)).geturl()

    # Credentials are only sent when a username is configured.
    if self.username is None:
        auth = None
    else:
        auth = (self.username, self.password)

    response = requests.get(target, auth=auth, timeout=REQUEST_TIMEOUT)
    payload = _json_or_none(response)
    if isinstance(payload, dict):
        return payload.get('token')
    return None
def facebook_compliance_fix(session):
    """Register an 'access_token_response' hook on ``session`` and return it.

    The hook rewrites a form-encoded token response into a JSON body.
    """

    def _compliance_fix(r):
        content_type = r.headers.get('content-type', {})

        # if Facebook claims to be sending us json, let's trust them.
        if 'application/json' in content_type:
            return r

        # Facebook returns a content-type of text/plain when sending their
        # x-www-form-urlencoded responses, along with a 200. If not, let's
        # assume we're getting JSON and bail on the fix.
        if 'text/plain' not in content_type or r.status_code != 200:
            return r

        token = dict(parse_qsl(r.text, keep_blank_values=True))

        # Mirror 'expires' under the 'expires_in' key when present.
        expires = token.get('expires')
        if expires is not None:
            token['expires_in'] = expires
        token['token_type'] = 'Bearer'
        r._content = to_unicode(dumps(token)).encode('UTF-8')
        return r

    session.register_compliance_hook('access_token_response', _compliance_fix)
    return session
def get_parsed_body(request):
    """Return the request body parsed as JSON or, failing that, form data.

    Args:
        request: request object to get the body from.

    Returns:
        A dictionary of the body, or None when ``request.parsed_body`` is
        the empty string.

    Raises:
        The JSON decoding error, re-raised, when the body is neither valid
        JSON nor non-empty form data.
    """
    if request.parsed_body == '':
        return None
    try:
        return json.loads(request.body.decode('utf-8'))
    except Exception:
        # Decode formData
        form = dict(parse_qsl(request.body))
        if not form:
            # Not a form data
            raise
        return form
def do_GET(self):  # noqa
    """Answer the login redirect and hand the outcome to the server.

    Always responds 200 with an HTML page. When the query string has a
    'code' value it is passed to ``self.server.return_code``; otherwise a
    ``LocalServerError`` built from 'error_description' (falling back to
    'error') is passed instead.
    """
    self.send_response(200)
    self.send_header('Content-type', 'text/html')
    self.end_headers()

    query_params = dict(parse_qsl(urlparse(self.path).query))
    code = query_params.get('code')

    if not code:
        msg = query_params.get(
            'error_description', query_params.get('error'))
        failure_page = HTML_TEMPLATE.substitute(
            post_login_message=msg,
            login_result='Login failed')
        self.wfile.write(six.b(failure_page))
        self.server.return_code(LocalServerError(msg))
        return

    success_page = HTML_TEMPLATE.substitute(
        post_login_message=DOC_URL,
        login_result='Login successful')
    self.wfile.write(six.b(success_page))
    self.server.return_code(code)
def format_body(self, body):
    """Render a URL-encoded body as text with aligned keys.

    Each value is formatted by the formatter that
    ``self._get_value_formatter(value)`` returns. If that formatter's
    ``format_body`` raises, the raw value is shown on the key's own line.

    Returns:
        The lines joined with newlines, or "" when ``body`` contains no
        key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return ""
    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for key, value in pairs:
        formatter = self._get_value_formatter(value)
        try:
            formatted_content = formatter.format_body(value)
        except Exception:
            # Formatter failure: fall back to the raw value. ``Exception``
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            texts.append("{0}: {1}".format(key.ljust(max_length), value))
        else:
            texts.append("{0}:".format(key.ljust(max_length)))
            texts.append(formatted_content)
    return u"\n".join(texts)
def format_tui(self, body):
    """Render a URL-encoded body as a list of TUI widgets with aligned keys.

    Each value is formatted by the formatter that
    ``self._get_value_formatter(value)`` returns. If that formatter's
    ``format_tui`` raises, a single ``Text`` with the raw value is emitted.

    Returns:
        A list of ``Text`` items plus whatever the formatters returned, or
        [] when ``body`` contains no key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return []
    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for key, value in pairs:
        formatter = self._get_value_formatter(value)
        try:
            formatted_list = formatter.format_tui(value)
        except Exception:
            # Formatter failure: fall back to the raw value. ``Exception``
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            texts.append(Text("{0}: {1}".format(key.ljust(max_length), value)))
        else:
            texts.append(Text("{0}:".format(key.ljust(max_length))))
            texts = texts + formatted_list
    return texts
def format_console(self, body):  # pragma: no cover
    """Render a URL-encoded body as console text with aligned keys.

    Each value is formatted by the formatter that
    ``self._get_value_formatter(value)`` returns. If that formatter's
    ``format_console`` raises, the raw value is shown on the key's own line.

    Returns:
        The lines joined with newlines, or "" when ``body`` contains no
        key/value pairs.
    """
    pairs = urlparse.parse_qsl(body)
    if not pairs:
        return ""
    # Pad every key to the longest one so the colons line up.
    max_length = max(len(key) for key, _ in pairs)
    texts = []
    for key, value in pairs:
        formatter = self._get_value_formatter(value)
        try:
            formatted_content = formatter.format_console(value)
        except Exception:
            # Formatter failure: fall back to the raw value. ``Exception``
            # (not a bare except) lets KeyboardInterrupt/SystemExit through.
            texts.append("{0}: {1}".format(key.ljust(max_length), value))
        else:
            texts.append("{0}:".format(key.ljust(max_length)))
            texts.append(formatted_content)
    return u"\n".join(texts)
def router(params_string):
    """Dispatch a plugin call to the matching ``VRTPlayer`` method.

    ``params_string`` is a URL-encoded parameter string. Its 'action' value
    selects the method; the 'video' value is forwarded where the method
    takes one. With no parameters at all the main menu is shown. An
    unrecognised action does nothing.
    """
    addon = xbmcaddon.Addon()
    kodi_wrapper = kodiwrapper.KodiWrapper(_handle, _url, addon)
    stream_service = urltostreamservice.UrlToStreamService(
        vrtplayer.VRTPlayer._VRT_BASE,
        vrtplayer.VRTPlayer._VRTNU_BASE_URL,
        kodi_wrapper)
    vrt_player = vrtplayer.VRTPlayer(addon.getAddonInfo("path"), kodi_wrapper, stream_service)

    params = dict(parse_qsl(params_string))
    if not params:
        vrt_player.show_main_menu_items()
        return

    action = params['action']
    if action == actions.LISTING_AZ:
        vrt_player.show_az_menu_items()
    elif action == actions.LISTING_CATEGORIES:
        vrt_player.show_category_menu_items()
    elif action == actions.LISTING_LIVE:
        vrt_player.show_livestream_items()
    elif action == actions.LISTING_VIDEOS:
        vrt_player.show_videos(params['video'])
    elif action == actions.LISTING_CATEGORY_VIDEOS:
        vrt_player.show_video_category_episodes(params['video'])
    elif action == actions.PLAY:
        vrt_player.play_vrtnu_video(params['video'])
    elif action == actions.PLAY_LIVE:
        vrt_player.play_livestream(params['video'])
def do_GET(self):
    """Handle a GET request.

    Parses the query parameters, stores them on ``self.server.query_params``
    and writes a fixed HTML page saying the flow has completed. Note that
    we can't detect if an error occurred.
    """
    self.send_response(200)
    self.send_header("Content-type", "text/html")
    self.end_headers()
    # Everything after the first '?' is treated as the query string.
    query = self.path.split('?', 1)[-1]
    query = dict(urlparse.parse_qsl(query))
    self.server.query_params = query
    # Bytes literals: identical to str on Python 2, and required on Python 3
    # where the handler's wfile is a binary stream.
    self.wfile.write(b"<html><head><title>Authentication Status</title></head>")
    self.wfile.write(b"<body><p>The authentication flow has completed.</p>")
    self.wfile.write(b"</body></html>")
def _add_query_parameter(url, name, value): """Adds a query parameter to a url. Replaces the current value if it already exists in the URL. Args: url: string, url to add the query parameter to. name: string, query parameter name. value: string, query parameter value. Returns: Updated query parameter. Does not update the url if value is None. """ if value is None: return url else: parsed = list(urlparse.urlparse(url)) q = dict(urlparse.parse_qsl(parsed[4])) q[name] = value parsed[4] = urllib.urlencode(q) return urlparse.urlunparse(parsed)
def from_html(self, cr, uid, model, field, element, context=None):
    """Load the value behind the ``<img>`` found in ``element``.

    For ``/website/image`` URLs the record is looked up through
    ``self.pool`` and its field value returned. The model, id and field are
    taken from the query string when present and from the path segments
    otherwise. Other URLs go to ``load_local_url`` when the path matches
    ``self.local_url_re``, else to ``load_remote_url``.

    Raises:
        IndexError: if a value is missing from the query string and the
            path has no segment to supply it.
    """
    url = element.find('img').get('src')

    url_object = urlparse.urlsplit(url)
    if url_object.path.startswith('/website/image'):
        # url might be /website/image/<model>/<id>[_<checksum>]/<field>[/<width>x<height>]
        fragments = url_object.path.split('/')
        query = dict(urlparse.parse_qsl(url_object.query))
        # Only index into the path when the query lacks the value, so that
        # /website/image?model=..&id=..&field=.. does not raise IndexError.
        model = query['model'] if 'model' in query else fragments[3]
        oid = query['id'] if 'id' in query else fragments[4].split('_')[0]
        field = query['field'] if 'field' in query else fragments[5]
        item = self.pool[model].browse(cr, uid, int(oid), context=context)
        return item[field]

    if self.local_url_re.match(url_object.path):
        return self.load_local_url(url)

    return self.load_remote_url(url)
def router(paramstring):
    """
    Route a plugin call to the handler selected by ``paramstring``.

    :param paramstring: URL-encoded parameter string
    :return: None
    """
    # Turn the URL-encoded paramstring into a {<parameter>: <value>} dict.
    params = dict(parse_qsl(paramstring))

    if 'action' not in params:
        # Called from the Kodi UI without an action: show the category
        # list, starting at the requested offset (0 when none is given).
        if 'offset' in params:
            list_categories(int(params['offset']))
        else:
            list_categories(0)
        return

    action = params['action']
    if action == 'listing':
        # Display the list of videos in a provided category.
        list_videos(params['category'], int(params['offset']))
    elif action == 'play':
        # Play a video from a provided URL.
        play_video(params['video'])
def __init__(self):
    """Connect, load the current user and parse the plugin parameters.

    Parameters come from ``sys.argv[2]`` (its first character is skipped)
    on top of the default ``{'do': _DO_HOME}``. 'oid' defaults to the
    current user's id. When no 'content_type' is supplied it is derived
    from the current window id where a mapping exists.

    Raises:
        Exception: if ``self.__connect_()`` returns a falsy connection.
    """
    self.gui = KodiVkGUI(self)
    self.conn = self.__connect_()
    if not self.conn:
        raise Exception()
    u_info = self.conn.users.get()[0]
    self.u = User(u_info['id'], self.conn)
    self.u.set_info()
    p = {'do': _DO_HOME}
    if sys.argv[2]:
        p.update(dict(urlparse.parse_qsl(sys.argv[2][1:])))
    p['oid'] = int(p.get('oid', self.u.info['id']))
    self.params = p
    if 'content_type' not in self.params:
        cw_id = xbmcgui.getCurrentWindowId()
        if cw_id in (10006, 10024, 10025, 10028):
            self.params['content_type'] = _CTYPE_VIDEO
        #elif cw_id in (10005, 10500, 10501, 10502):
        #    self.params['content_type'] = _CTYPE_AUDIO
        elif cw_id in (10002,):
            # Previously compared the builtin ``id`` function, which can
            # never be in the tuple, so this branch was unreachable.
            self.params['content_type'] = _CTYPE_IMAGE
    self.c_type = self.params.get('content_type', None)
# assign(service, arg): obfuscated helper. The `if 0:` statements only mention
# undefined names and are never executed.
# Returns None unless service == 'www'. Otherwise it parses arg as a URL and,
# for each (key, value) pair of its query string, replaces occurrences of the
# value in arg with the key. It returns None when the resulting arg's query
# contains no '=' or when there are more than six pairs, else (True, arg).
# NOTE(review): the line structure was lost in this listing, so whether the
# final check/return sits inside or after the for-loop cannot be determined
# here -- confirm against the original file.
def assign(service, arg): if 0: II111iiii if service != '''www''': return if 0: I1IiiI * Oo0Ooo / OoO0O00.OoOoOO00.o0oOOo0O0Ooo / I1ii11iIi11i I1IiI = urlparse.urlparse(arg) o0OOO = urlparse.parse_qsl(I1IiI.query) for iIiiiI, Iii1ii1II11i in o0OOO: arg = arg.replace(Iii1ii1II11i, iIiiiI) if 0: I1iII1iiII + I1Ii111 / OOo if urlparse.urlparse(arg).query.find('''=''') == -1 or len(o0OOO) > 6: return if 0: I1II1 return (True, arg) if 0: iII1iII1i1iiI % iiIIIII1i1iI % iiI11iii111 % i1I1Ii1iI1ii if 0: I1iII1iiII / i1IIi % II111iiii - OoOoOO00
# audit(arg): obfuscated helper. The `if 0:` statements only mention undefined
# names and are never executed.
# Parses arg as a URL and rebuilds it from scheme, netloc and path, with
# decode('') in the query and fragment slots. It then walks the (key, value)
# pairs of the original query string, skipping keys that appear in a list of
# three decode()'d strings. For the others it calls debug(...) and then
# iI1(rebuilt_url, pairs, key, value); when that result is truthy, result[1]
# is passed to security_info() and the function returns.
# NOTE(review): decode, debug, iI1 and security_info are defined elsewhere;
# the exact nesting of the loop body was lost in this listing -- confirm
# against the original file.
def audit(arg): Ii1iI = arg Oo = urlparse.urlparse(Ii1iI) I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme, Oo.netloc, Oo.path, decode(''), decode(''))) Oo0Ooo = urlparse.parse_qsl(Oo.query) if 0: iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')] for O0O0OO0O0O0, iiiii in Oo0Ooo: if O0O0OO0O0O0 in oo: continue debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i) IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii) if IiII1I1i1i1ii: security_info(IiII1I1i1i1ii[1]) return if 0: OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
# audit(arg): obfuscated helper. The `if 0:` statement only mentions undefined
# names and is never executed.
# Parses arg as a URL and rebuilds it from scheme, netloc and path, with
# decode('') in the query and fragment slots. For each of two decode()'d
# values it walks the (key, value) pairs of the original query string,
# skipping keys that appear in a list of three decode()'d strings. For the
# others it calls debug(...) and then iI1(value_from_list, rebuilt_url, pairs,
# key, value); when that result is truthy, a decode()'d format string filled
# with (value_from_list, result[1]) is passed to security_info() and the
# function returns.
# NOTE(review): decode, debug, iI1 and security_info are defined elsewhere;
# the exact nesting of the loop bodies was lost in this listing -- confirm
# against the original file.
def audit(arg): ooO0oooOoO0 = arg II11i = urlparse.urlparse(ooO0oooOoO0) i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme, II11i.netloc, II11i.path, decode(''), decode(''))) Oo0Ooo = urlparse.parse_qsl(II11i.query) i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')] i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')] for I11 in i11: for O0O0OO0O0O0, iiiii in Oo0Ooo: if O0O0OO0O0O0 in i1111: continue debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O) Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii) if Oo0o0000o0o0: security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1])) return if 0: iiiii11iII1 % O0o
def _post(self, cmd, data={}):
    """POST a command to ``REST_URL`` and wrap the form-encoded reply.

    Args:
        cmd: command name; must be in ``CMD_LIST``.
        data: extra form fields. It is copied, never modified.

    Returns:
        A ``PayAppInternalResult``. ``success`` is True when the reply has
        HTTP status 200 and its 'state' field is '1'. ``error`` carries the
        reply's 'errorMessage' when the state is not '1'. ``content`` is a
        ``Struct`` of the remaining reply fields ({} for non-200 replies).
    """
    assert cmd in CMD_LIST
    assert self.user_id

    # Work on a copy: updating ``data`` in place would mutate the caller's
    # dict and, for the shared default ``{}``, leak fields between calls.
    payload = dict(data or {})
    payload.update({
        'cmd': cmd,
        'userid': self.user_id,
    })

    resp = requests.post(REST_URL, data=payload)

    success = False
    error = None
    content = {}

    if resp.status_code == 200:
        fields = dict(parse_qsl(resp.text))
        if fields['state'] == '1':
            success = True
        elif 'errorMessage' in fields:
            error = fields['errorMessage']
        content = Struct(**{k: v for k, v in fields.items()
                            if k not in ('state', 'errorMessage')})

    result = PayAppInternalResult(success=success, error=error, content=content)
    return result
def verify_callback(self, params):
    """Check callback parameters against this object's credentials.

    :param params: a ``dict``, a URL-encoded ``str`` (both are converted to
        a ``Struct``), or any object exposing ``userid``, ``linkkey`` and
        ``linkval`` attributes.
    :returns: True when ``userid``, ``linkkey`` and ``linkval`` equal
        ``self.user_id``, ``self.link_key`` and ``self.link_value``.
        Otherwise False, after printing the reason.
    """
    if type(params) is str:
        params = Struct(**dict(parse_qsl(params)))
    elif type(params) is dict:
        params = Struct(**params)

    valid = True

    try:
        # Explicit raises instead of ``assert``: assert statements are
        # removed under ``python -O``, which would make every callback
        # verify successfully.
        if not self.user_id == params.userid:
            raise AssertionError(u'??? ?? ???? ???? ????.')
        if not self.link_key == params.linkkey:
            raise AssertionError(u'?? KEY? ???? ????.')
        if not self.link_value == params.linkval:
            raise AssertionError(u'')
    except (AssertionError, AttributeError) as e:
        valid = False
        print(e)

    return valid
def resolve(url):
    """Resolve a videomega URL to the media source it points at.

    The value of the first query parameter is used as the ``ref`` of a
    ``cdn.php`` request. The last packed ``eval`` script in the response is
    unpacked with ``jsunpack`` and the last ``"src","..."`` entry in it is
    returned.

    Returns:
        The extracted source URL, or None if any step raises.
    """
    try:
        url = urlparse.urlparse(url).query
        url = urlparse.parse_qsl(url)[0][1]
        url = 'http://videomega.tv/cdn.php?ref=%s' % url
        result = client.request(url, mobile=True)
        # Raw string: same pattern as before, without the invalid '\)'
        # escape that newer Python versions warn about.
        result = re.compile(r'eval.*?{}\)\)').findall(result)[-1]
        result = jsunpack.unpack(result)
        # Expected shape: "src", "http://abo.cdn.vizplay.org/m2/769a65801d8e8a110452f2b74d4082d1.mp4?st=-A2O2o2soMR81Niiag5EyA&hash=Dw8Kth5gh-nNyMRbWLZMKA"
        url = re.compile('"src","(.*?)"').findall(result)[-1]
        return url
    except Exception:
        # Best effort: ``Exception`` rather than a bare except so that
        # KeyboardInterrupt/SystemExit are not swallowed.
        return
def oembed(url, params=""):
    """
    Render an OEmbed-compatible link as an embedded item.

    :param url: A URL of an OEmbed provider.
    :param params: URL-encoded options, forwarded to ``get_oembed_data`` as
        keyword arguments.
    :return: The 'html' entry of the OEmbed data marked safe. On a
        ``KeyError`` or ``ProviderException`` a short notice is returned
        when ``settings.DEBUG`` is set, an empty string otherwise.
    """
    # Note: this method isn't currently very efficient - the data isn't
    # cached or stored.
    options = dict(urlparse.parse_qsl(params))

    try:
        embed_data = get_oembed_data(url, **options)
        return mark_safe(embed_data['html'])
    except (KeyError, ProviderException):
        return "No OEmbed data returned" if settings.DEBUG else ""
def unmake(self, st):
    """Decode and verify a ``<mac>.<hex payload>`` string.

    The payload part is hex-decoded and passed through ``self.decode``.
    Its SHA digest, keyed by prefixing ``self.mac_key``, must equal the
    mac part.

    Returns:
        A dict of the payload's URL-encoded key/value pairs, or None when
        the string does not have exactly two dot-separated parts, decoding
        yields None, or the digest does not match.
    """
    p = st.split(".")
    if len(p) != 2:
        return None
    s = self.decode(p[1].decode('hex'))
    if s is None:
        return None
    h = SHA.new()
    h.update(self.mac_key)
    h.update(s)
    f = h.hexdigest()
    # NOTE(review): this prints the decoded payload before the digest has
    # been checked; looks like a debugging leftover -- confirm and remove.
    print(s)
    if p[0] != f:
        return None
    return dict(urlparse.parse_qsl(s))
def router(paramstring):
    """Play the requested link, or list all streams when called bare.

    ``paramstring`` is a URL-encoded parameter string; its first character
    is skipped before parsing.
    """
    params = dict(parse_qsl(paramstring[1:]))

    if not params:
        # No parameters: build the directory of playable streams.
        for stream in streams():
            thumb = stream['thumb']
            list_item = xbmcgui.ListItem(label=stream['name'], thumbnailImage=thumb)
            list_item.setProperty('fanart_image', thumb)
            list_item.setProperty('IsPlayable', 'true')
            url = '{0}?mode=play&link={1}'.format(__url__, stream['link'])
            xbmcplugin.addDirectoryItem(__handle__, url, list_item, isFolder=False)
        xbmcplugin.endOfDirectory(__handle__)
        return

    if params['mode'] == 'play':
        play_item = xbmcgui.ListItem(path=params['link'])
        xbmcplugin.setResolvedUrl(__handle__, True, listitem=play_item)

# --------------------------------------------------------------------------------
# Main
# --------------------------------------------------------------------------------
def router(paramstring):
    """Decides what to do based on script parameters"""
    check_settings()
    params = dict(parse_qsl(paramstring))

    if params:
        action = params['action']
        if action == 'play':
            play_channel(params['channel'])
        elif action == 'get_user_id':
            get_user_id()
        return

    # No parameters: nothing to route yet, show the demo channel list.
    channels = map_channels(filter_channels(get_tv_channels()))
    xbmcplugin.addDirectoryItems(plugin_handle, channels, len(channels))
    xbmcplugin.addSortMethod(
        plugin_handle, xbmcplugin.SORT_METHOD_LABEL_IGNORE_THE)
    xbmcplugin.endOfDirectory(plugin_handle)
def router(paramstring):
    """Call the handler selected by the 'action' entry of ``paramstring``.

    With no parameters at all the main menu is shown. An unrecognised
    action does nothing.
    """
    params = dict(urlparse.parse_qsl(paramstring))
    if not params:
        main_menu()
        return

    action = params['action']
    if action == 'play_event':
        play(params['channel_id'], params['airing_id'])
    elif action == 'play_channel':
        play(params['channel_id'])
    elif action == 'list_events':
        list_events(params['schedule_type'])
    elif action == 'list_events_by_date':
        list_events(params['schedule_type'], params['filter_date'])
    elif action == 'list_upcoming_days':
        list_upcoming_days()
    elif action == 'show_auth_details':
        show_auth_details()
    elif action == 'search':
        search()
    elif action == 'dialog':
        dialog(params['dialog_type'], params['heading'], params['message'])
    elif action == 'channel_to_favs':
        channel_to_favs(params['channel_name'], params['channel_id'])
def GET(self):
    """ list all rucio accounts.

    HTTP Success:
        200 OK

    HTTP Error:
        401 Unauthorized
        500 InternalError

    :param Rucio-Account: Account identifier.
    :param Rucio-Auth-Token: as an 32 character hex string.
    :returns: A list containing all account names as dict.
    """
    header('Content-Type', 'application/x-json-stream')
    # The first character of ctx.query is skipped before parsing; an empty
    # query means no filtering.
    account_filter = dict(parse_qsl(ctx.query[1:])) if ctx.query else {}

    # Stream one JSON document per line.
    for account in list_accounts(filter=account_filter):
        yield render_json(**account) + "\n"
def parse_qsl(qs, keep_blank_values=0, strict_parsing=0):
    """Deprecated alias: parse a query string via ``urlparse.parse_qsl``.

    Emits a ``PendingDeprecationWarning`` attributed to the caller, then
    returns whatever ``urlparse.parse_qsl`` returns for the same arguments.
    """
    message = "cgi.parse_qsl is deprecated, use urlparse.parse_qsl instead"
    # stacklevel 2 points the warning at the caller rather than this shim.
    warn(message, PendingDeprecationWarning, 2)
    pairs = urlparse.parse_qsl(qs, keep_blank_values, strict_parsing)
    return pairs
def read_urlencoded(self):
    """Internal: read data in query string format.

    Reads ``self.length`` characters from ``self.fp``, appends
    ``self.qs_on_post`` when set, and stores one ``MiniFieldStorage`` per
    parsed pair in ``self.list``.
    """
    raw = self.fp.read(self.length)
    if self.qs_on_post:
        raw += '&' + self.qs_on_post
    fields = []
    # Bind the list before filling it so that self.list already refers to it
    # while the pairs are being parsed.
    self.list = fields
    parsed = urlparse.parse_qsl(raw, self.keep_blank_values,
                                self.strict_parsing)
    for name, val in parsed:
        fields.append(MiniFieldStorage(name, val))
    self.skip_lines()
def get_url_query(url):
    """Return the key/value pairs in the *fragment* of ``url`` as a dict.

    The fragment (the part after '#') is parsed as a query string. When a
    key is repeated, the last occurrence wins.
    """
    fragment = urlparse(url).fragment
    # login_response_url_query can have multiple key
    return dict(parse_qsl(fragment))