我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用urlparse.parse_qs()。
def do_GET(self):
    """Dispatch GET requests: /kill shuts the test server down, /ping health-checks."""
    parsed_path = urlparse.urlparse(self.path)
    action = parsed_path.path
    # keep_blank_values=1 so parameters like "?flag=" are preserved.
    params = urlparse.parse_qs(parsed_path.query, keep_blank_values=1)
    logging.info('Action for GET method is: %s.', action)
    for param in params:
        logging.info('%s=%s', param, params[param][0])
    if action == '/kill':
        self._KillTestServer(params)
    elif action == '/ping':
        # The ping handler is used to check whether the spawner server is ready
        # to serve the requests. We don't need to test the status of the test
        # server when handling ping request.
        self._SendResponse(200, 'OK', {}, 'ready')
        logging.info('Handled ping request and sent response.')
    else:
        self._SendResponse(400, 'Unknown request', {}, '')
        logging.info('Encounter unknown request: %s.', action)
def process_POST_request(request):
    """Parse a form-encoded POST body and return (target_uri, doi, head, body, text)."""
    dict_ = urlparse.parse_qs(request.text)

    def htmlify(thing):
        # Wrap the first value for *thing* in <html> tags; missing keys become ''.
        try:
            html = dict_[thing][0]
        except KeyError:
            html = ''
        return '<html>' + html + '</html>'

    uri = dict_['uri'][0]
    head = htmlify('head')
    body = htmlify('body')
    try:
        text = dict_['data'][0]
    except KeyError:
        text = ''
    headsoup = BeautifulSoup(head, 'lxml')
    bodysoup = BeautifulSoup(body, 'lxml')
    target_uri = getUri(uri, headsoup, bodysoup)
    doi = getDoi(headsoup, bodysoup)
    return target_uri, doi, head, body, text
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Resolve a show/episode to a site URL by searching thumbnails for the title."""
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        query = urlparse.urljoin(self.base_link, self.search_link)
        query = query % urllib.quote_plus(data['tvshowtitle'])
        t = cleantitle.get(data['tvshowtitle'])
        r = client.request(query)
        r = client.parseDOM(r, 'div', attrs={'class': 'thumb'})
        r = [(client.parseDOM(i, 'a', ret='href'),
              client.parseDOM(i, 'a', ret='title'),
              re.findall('(\d{4})', i)) for i in r]
        r = [(i[0][0], i[1][0], i[2][0]) for i in r
             if len(i[0]) > 0 and len(i[1]) > 0 and len(i[2]) > 0]
        # First match whose title contains both the show name and the season label.
        url = [i[0] for i in r
               if t in cleantitle.get(i[1]) and ('Season %s' % season) in i[1]][0]
        url += '?episode=%01d' % int(episode)
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Build an episode URL from a cached per-show lookup.

    Returns the UTF-8 encoded URL, or None when the lookup fails.
    """
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        year = re.findall('(\d{4})', premiered)[0]
        season = '%01d' % int(season)
        episode = '%01d' % int(episode)
        tvshowtitle = '%s %s: Season %s' % (data['tvshowtitle'], year, season)
        # 120-minute cache of the per-show URL lookup.
        url = cache.get(self.pidtv_tvcache, 120, tvshowtitle)
        if url is None:  # fixed: identity comparison instead of `== None`
            raise Exception()
        url += '?episode=%01d' % int(episode)
        url = url.encode('utf-8')
        return url
    except Exception:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Find the site alias matching this show and season, then encode an episode payload."""
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        # SECURITY NOTE: eval() on serialized payload data is dangerous if `url`
        # can come from an untrusted source — consider ast.literal_eval.
        headers = eval(data['headers'])
        aliases = eval(data['aliases'])
        title = data['tvshowtitle'] if 'tvshowtitle' in data else data['title']
        title = cleantitle.getsearch(title)
        query = self.search_link % (urllib.quote_plus(title))
        query = urlparse.urljoin(self.base_link, query)
        r = client.request(query, headers=headers, timeout='30', mobile=True)
        match = re.compile('alias=(.+?)\'">(.+?)</a>').findall(r)
        r = [(i[0], re.findall('(.+?)\s+-\s+Season\s+(\d+)', i[1])) for i in match]
        r = [(i[0], i[1][0][0], i[1][0][1]) for i in r if len(i[1]) > 0]
        r = [i[0] for i in r
             if self.matchAlias(i[1], aliases) and int(season) == int(i[2])][0]
        url = {'type': 'tvshow', 'id': r, 'episode': episode,
               'season': season, 'headers': headers}
        url = urllib.urlencode(url)
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Build a tvsearch URL for pre-2016 shows and verify the target resolves."""
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        year = re.findall('(\d{4})', premiered)[0]
        # Site only carries shows that premiered before 2016.
        if int(year) >= 2016:
            raise Exception()
        url = re.sub('[^A-Za-z0-9]', '-', data['tvshowtitle']).lower()
        url = self.tvsearch_link % (url, data['year'],
                                    '%01d' % int(season), '%01d' % int(episode))
        r = urlparse.urljoin(self.base_link, url)
        r = client.request(r, output='geturl')
        # Sanity-check that the resolved URL still mentions the show's year.
        if not data['year'] in r:
            raise Exception()
        return url
    except:
        return
def resolve(self, url):
    """Resolve a 'url|post|headers' triple into the final playable URL."""
    try:
        netloc = urlparse.urlparse(url).netloc
        netloc = re.compile('([\w]+[.][\w]+)$').findall(netloc)[0]
        # Only hosts in the encoded allow-list need the cookie dance.
        if not netloc in base64.b64decode(self.b_link):
            return url
        u, p, h = url.split('|')
        referer = urlparse.parse_qs(h)['Referer'][0]
        #u += '&app_id=Exodus'
        c = self.request(referer, output='cookie', close=False)
        result = self.request(u, post=p, referer=referer, cookie=c)
        candidates = result.split('url=')
        candidates = [urllib.unquote_plus(i.strip()) for i in candidates]
        candidates = [i for i in candidates if i.startswith('http')]
        # The last http(s) candidate is the playable URL.
        return candidates[-1]
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Look up an episode by absolute number, trying the localized title first."""
    try:
        if not url:
            return
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        tvshowtitle = data['tvshowtitle']
        localtvshowtitle = data['localtvshowtitle']
        aliases = source_utils.aliases_to_array(eval(data['aliases']))
        year = data['year']
        # Convert (season, episode) to the show's absolute episode number.
        episode = tvmaze.tvMaze().episodeAbsoluteNumber(tvdb, int(season), int(episode))
        url = self.__search([localtvshowtitle] + aliases, year, episode)
        if not url and tvshowtitle != localtvshowtitle:
            url = self.__search([tvshowtitle] + aliases, year, episode)
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Search for a TV episode, preferring the localized show title."""
    try:
        if not url:
            return
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        # Derive the year from the premiere date rather than trusting the payload.
        data.update({'year': re.findall('(\d{4})', premiered)[0]})
        tvshowtitle = data['tvshowtitle']
        localtvshowtitle = data['localtvshowtitle']
        aliases = source_utils.aliases_to_array(eval(data['aliases']))
        url = self.__search([localtvshowtitle] + aliases, 'tv', data['year'], season, episode)
        if not url and tvshowtitle != localtvshowtitle:
            url = self.__search([tvshowtitle] + aliases, 'tv', data['year'], season, episode)
        return url
    except:
        return
def resolve(self, url):
    """Extract the stream target from a JSON 'Stream' payload (anchor href or iframe src)."""
    try:
        url = urlparse.urljoin(self.base_link, url)
        r = client.request(url, referer=self.base_link)
        r = json.loads(r)['Stream']
        r = [(dom_parser.parse_dom(r, 'a', req='href'),
              dom_parser.parse_dom(r, 'iframe', req='src'))]
        r = [i[0][0].attrs['href'] if i[0] else i[1][0].attrs['src']
             for i in r if i[0] or i[1]][0]
        if not r.startswith('http'):
            # Not a direct link: dig the first http(s) value out of the query string.
            r = urlparse.parse_qs(r)
            r = [r[i][0] if r[i] and r[i][0].startswith('http') else (i, '') for i in r][0]
        return r
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Search using '<title> SxxExx' strings, falling back to the non-localized title."""
    try:
        if not url:
            return
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        # Shared ' SxxExx' suffix appended to the title and every alias.
        tag = ' S%02dE%02d' % (int(season), int(episode))
        title = data['localtvshowtitle'] + tag
        aliases = source_utils.aliases_to_array(eval(data['aliases']))
        aliases = [i + tag for i in aliases]
        url = self.__search([title] + aliases)
        if not url and data['tvshowtitle'] != data['localtvshowtitle']:
            title = data['tvshowtitle'] + tag
            url = self.__search([title] + aliases)
        return url
    except:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Search for season/episode, preferring the localized title and premiere year."""
    try:
        if not url:
            return
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        tvshowtitle = data['tvshowtitle']
        localtvshowtitle = data['localtvshowtitle']
        aliases = source_utils.aliases_to_array(eval(data['aliases']))
        # Prefer the year from the premiere date; fall back to the payload's year.
        year = re.findall('(\d{4})', premiered)
        year = year[0] if year else data['year']
        url = self.__search([localtvshowtitle] + aliases, year, season, episode)
        if not url and tvshowtitle != localtvshowtitle:
            url = self.__search([tvshowtitle] + aliases, year, season, episode)
        return url
    except:
        return
def downloadVideo(self):
    """Download the current page's video: via PyTube for YouTube, else the raw URL."""
    url = unicode(self.tabWidget.currentWidget().url().toString())
    # For youtube videos
    if validYoutubeUrl(url):
        vid_id = parse_qs(urlparse(url).query)['v'][0]
        url = 'https://m.youtube.com/watch?v=' + vid_id
        yt = YouTube(url)  # Use PyTube module for restricted videos
        videos = yt.get_videos()
        dialog = youtube_dialog.YoutubeDialog(videos, self)
        if dialog.exec_() == 1:
            index = abs(dialog.buttonGroup.checkedId()) - 2
            vid = videos[index]
            reply = networkmanager.get(QNetworkRequest(QUrl.fromUserInput(vid.url)))
            self.handleUnsupportedContent(reply, vid.filename + '.' + vid.extension)
        return
    # For embeded HTML5 videos
    request = QNetworkRequest(self.video_URL)
    request.setRawHeader('Referer', self.video_page_url)
    reply = networkmanager.get(request)
    self.handleUnsupportedContent(reply)
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        # keep_blank_values=True so bare "?option" entries are still reported.
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    # Return the port name plus the serial class chosen by the 'class' option.
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "socket":
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()  # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.socket')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
        # get host and port
        host, port = parts.hostname, parts.port
        if not 0 <= port < 65536:
            raise ValueError("port not in range 0...65535")
    except ValueError as e:
        raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
    return (host, port)

# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != "loop":
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
    try:
        # process options now, directly altering self
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'logging':
                logging.basicConfig()  # XXX is that good to call it here?
                self.logger = logging.getLogger('pySerial.loop')
                self.logger.setLevel(LOGGER_LEVELS[values[0]])
                self.logger.debug('enabled logging')
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

# - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'spy':
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
    # process options now, directly altering self
    formatter = FormatHexdump
    color = False
    output = sys.stderr
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'file':
                # NOTE(review): file handle is intentionally left open for the
                # lifetime of the spy formatter.
                output = open(values[0], 'w')
            elif option == 'color':
                color = True
            elif option == 'raw':
                formatter = FormatRaw
            elif option == 'all':
                self.show_all = True
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
    self.formatter = formatter(output, color)
    return ''.join([parts.netloc, parts.path])
def query_params(self, value=None):
    """
    Return or set a dictionary of query params

    :param dict value: new dictionary of values
    """
    if value is not None:
        return URL._mutate(self, query=unicode_urlencode(value, doseq=True))
    query = '' if self._tuple.query is None else self._tuple.query
    # In Python 2.6, urlparse needs a bytestring so we encode and then
    # decode the result.
    if not six.PY3:
        result = parse_qs(to_utf8(query), True)
        return dict_to_unicode(result)
    return parse_qs(query, True)
def test_auth_url(self):
    """auth_url must embed client id, redirect uri and scope regardless of param order."""
    perms = ['email', 'birthday']
    redirect_url = 'https://localhost/facebook/callback/'
    expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
        dict(client_id=self.app_id, redirect_uri=redirect_url, scope=','.join(perms)))
    actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)
    # Since the order of the query string parameters might be
    # different in each URL, we cannot just compare them to each
    # other.
    expected_url_result = urlparse(expected_url)
    actual_url_result = urlparse(actual_url)
    self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
    self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
    self.assertEqual(actual_url_result.path, expected_url_result.path)
    self.assertEqual(actual_url_result.params, expected_url_result.params)
    self.assertEqual(parse_qs(actual_url_result.query),
                     parse_qs(expected_url_result.query))
def parse(self, path):
    """Route a raw query path to process/gen/unpack based on which key it carries."""
    parsed = parse_qs(path[2:])  # skip the two leading characters of the path
    if "au" in parsed.keys():
        for i in parsed:
            parsed[i] = " ".join(parsed[i])
        print(parsed)
        return self.process(parsed)
    if "mk" in parsed.keys():
        for i in parsed:
            parsed[i] = " ".join(parsed[i])
        print(parsed)
        return self.gen(parsed)
    if "up" in parsed.keys() and "passwd" in parsed.keys():
        for i in parsed:
            parsed[i] = " ".join(parsed[i])
        print(parsed)
        return self.unpack(parsed)
    return "Not able to parse input"
def get_customers_by_action_callback(request):
    """Mocked endpoint: customers for RecipientGroupID=1, ActionID=2, Date=2015-06-24."""
    params = parse_qs(urlparse(request.url).query)
    if not (params['RecipientGroupID'][0] == '1'
            and params['ActionID'][0] == '2'
            and params['Date'][0] == '2015-06-24'):
        return 404, HEADERS['text'], 'Not Found'
    if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
        # Attributes are only honored for the exact expected attribute spec.
        if not (params['CustomerAttributes'][0] == 'Alias;Country'
                and params['CustomerAttributesDelimiter'][0] == ','):
            return 404, HEADERS['text'], 'Not Found'
        resp_body = [
            {'CustomerID': '231342', 'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': '943157', 'CustomerAttribute': 'Pax65,DE'}
        ]
    else:
        resp_body = [
            {'CustomerID': '231342'},
            {'CustomerID': '943157'}
        ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_customer_actions_by_target_group_callback(request):
    """Mocked endpoint: customer actions for TargetGroupID=2 on 2015-12-24."""
    params = parse_qs(urlparse(request.url).query)
    if not (params['TargetGroupID'][0] == '2' and params['Date'][0] == '2015-12-24'):
        return 404, HEADERS['text'], 'Not Found'
    if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
        # Attributes are only honored for the exact expected attribute spec.
        if not (params['CustomerAttributes'][0] == 'Alias;Country'
                and params['CustomerAttributesDelimiter'][0] == ','):
            return 404, HEADERS['text'], 'Not Found'
        resp_body = [
            {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6, 'CustomerAttribute': 'Mighty6,ES'}
        ]
    else:
        resp_body = [
            {'CustomerID': 'A1342', 'ActionID': 49, 'ChannelID': 6},
            {'CustomerID': 'G4650', 'ActionID': 49, 'ChannelID': 6}
        ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_customer_one_time_actions_by_date_callback(request):
    """Mocked endpoint: one-time customer actions for Date=2015-06-24."""
    params = parse_qs(urlparse(request.url).query)
    if params['Date'][0] != '2015-06-24':
        return 404, HEADERS['text'], 'Not Found'
    if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
        # Attributes are only honored for the exact expected attribute spec.
        if not (params['CustomerAttributes'][0] == 'Alias;Country'
                and params['CustomerAttributesDelimiter'][0] == ','):
            return 404, HEADERS['text'], 'Not Found'
        resp_body = [
            {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Yo999,UA'},
            {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3, 'CustomerAttribute': 'Neto2,TR'}
        ]
    else:
        resp_body = [
            {'CustomerID': '8D871', 'ActionID': 19, 'ChannelID': 3},
            {'CustomerID': '8U76T', 'ActionID': 19, 'ChannelID': 3}
        ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_target_group_changers_callback(request):
    """Mocked endpoint: target-group changers for September 2015."""
    params = parse_qs(urlparse(request.url).query)
    if not (params['StartDate'][0] == '2015-09-01' and params['EndDate'][0] == '2015-09-30'):
        return 404, HEADERS['text'], 'Not Found'
    if 'CustomerAttributes' in params and 'CustomerAttributesDelimiter' in params:
        # Attributes are only honored for the exact expected attribute spec.
        if not (params['CustomerAttributes'][0] == 'Alias;Country'
                and params['CustomerAttributesDelimiter'][0] == ','):
            return 404, HEADERS['text'], 'Not Found'
        resp_body = [
            {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12, 'CustomerAttribute': 'BuddyZZ,UK'},
            {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8, 'CustomerAttribute': 'Pax65,DE'}
        ]
    else:
        resp_body = [
            {'CustomerID': '231342', 'InitialTargetGroupID': 4, 'FinalTargetGroupID': 12},
            {'CustomerID': '931342', 'InitialTargetGroupID': -1, 'FinalTargetGroupID': 8}
        ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_customer_send_details_by_campaign_callback(request):
    """Mocked endpoint: send details for CampaignID 65874, optionally with template ids."""
    params = parse_qs(urlparse(request.url).query)
    if params['CampaignID'][0] != '65874':
        return 404, HEADERS['text'], 'Not Found'
    # Template ids are included only when explicitly requested with 'True'.
    if 'IncludeTemplateIDs' in params and params['IncludeTemplateIDs'][0] == 'True':
        resp_body = [
            {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00', 'SendID': 'HG65D', 'TemplateID': 12},
            {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00', 'SendID': 'HG65E', 'TemplateID': 7}
        ]
        return 200, HEADERS['json'], json.dumps(resp_body)
    resp_body = [
        {'CustomerID': '231342', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 10:30:00', 'SendID': 'HG65D'},
        {'CustomerID': '917251', 'ChannelID': 4, 'ScheduledTime': '2015-12-30 11:45:00', 'SendID': 'HG65E'}
    ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_executed_campaign_details_callback(request):
    """Mocked endpoint: executed campaign details for Date=2015-06-19."""
    params = parse_qs(urlparse(request.url).query)
    if params['Date'][0] != '2015-06-19':
        return 404, HEADERS['text'], 'Not Found'
    resp_body = [
        {'CampaignID': 221, 'TargetGroupID': 15, 'CampaignType': 'Test/Control', 'Duration': 7,
         'LeadTime': 3, 'Notes': '', 'IsMultiChannel': 'false', 'IsRecurrence': 'false',
         'Status': 'Successful', 'Error': ''},
        {'CampaignID': 81, 'TargetGroupID': 40, 'CampaignType': 'Test/Control', 'Duration': 10,
         'LeadTime': 0, 'Notes': '', 'IsMultiChannel': 'true', 'IsRecurrence': 'true',
         'Status': 'Failed', 'Error': 'ESP unavailable'}
    ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def get_microsegment_changers_with_attributes_callback(request):
    """Mocked endpoint: microsegment changers for Jan 2016 with Alias;Country attributes."""
    params = parse_qs(urlparse(request.url).query)
    matches = (params['StartDate'][0] == '2016-01-01'
               and params['EndDate'][0] == '2016-01-31'
               and params['CustomerAttributes'][0] == 'Alias;Country'
               and params['CustomerAttributesDelimiter'][0] == ',')
    if not matches:
        return 404, HEADERS['text'], 'Not Found'
    resp_body = [
        {'CustomerID': '231342', 'InitialMicrosegmentID': 4, 'FinalMicrosegmentID': 12, 'CustomerAttributes': 'BuddyZZ,UK'},
        {'CustomerID': '231342', 'InitialMicrosegmentID': 3, 'FinalMicrosegmentID': 67, 'CustomerAttributes': 'Player99,US'}
    ]
    return 200, HEADERS['json'], json.dumps(resp_body)
def __init__(self, URL, assignment_id='', worker_id='', participant_id=''):
    """Create a bot, pulling any missing ids out of the URL's query string."""
    logger.info("Creating bot with URL: %s." % URL)
    self.URL = URL
    parts = urlparse(URL)
    query = parse_qs(parts.query)
    # Explicit arguments win; otherwise fall back to the query string (or '').
    if not assignment_id:
        assignment_id = query.get('assignment_id', [''])[0]
    if not participant_id:
        participant_id = query.get('participant_id', [''])[0]
    self.assignment_id = assignment_id
    if not worker_id:
        worker_id = query.get('worker_id', [''])[0]
    self.participant_id = participant_id
    self.worker_id = worker_id
    self.unique_id = worker_id + ':' + assignment_id
def parse_search_page(self, response):
    """Yield tweets from the current results page, then request the next scroll page."""
    # handle current page
    for item in self.parse_tweets_block(response.body):
        yield item
    # get next page
    tmp = self.reScrollCursor.search(response.body)
    if tmp:
        query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
        scroll_cursor = tmp.group(1)
        url = 'https://twitter.com/i/search/timeline?q=%s&' \
              'include_available_features=1&include_entities=1&max_position=%s' % \
              (urllib.quote_plus(query), scroll_cursor)
        yield http.Request(url, callback=self.parse_more_page)
    # TODO:
    # get refresh page
    # tmp = self.reRefreshCursor.search(response.body)
    # if tmp:
    #     query = urlparse.parse_qs(urlparse.urlparse(response.request.url).query)['q'][0]
    #     refresh_cursor=tmp.group(1)
def _parse_get(self, all_injectable=False):
    """Record GET parameters and mark tag-bearing names/values as injection points."""
    params_dict_list = urlparse.parse_qs(urlparse.urlsplit(self.url).query)
    for param, value_list in params_dict_list.items():
        self.get_params[param] = value_list
        # The tag may appear in the parameter name itself...
        if self.tag in param:
            self.injs.append({'field': 'GET', 'part': 'param', 'param': param})
        # ...or in any of its values (or everything is injectable on request).
        for idx, value in enumerate(value_list):
            if self.tag in value or all_injectable:
                self.injs.append({'field': 'GET', 'part': 'value', 'param': param,
                                  'value': value, 'idx': idx})
def extract_video_id(url):
    """ Extract the video id from a url, return video id as str. """
    idregx = re.compile(r'[\w-]{11}$')
    url = str(url)
    # A bare 11-character id is accepted as-is.
    if idregx.match(url):
        return url  # ID of video
    # Scheme-less input still needs a netloc separator for urlparse.
    if '://' not in url:
        url = '//' + url
    parsedurl = urlparse(url)
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com',
                            'm.youtube.com', 'gaming.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'v' in query and idregx.match(query['v'][0]):
            return query['v'][0]
    elif parsedurl.netloc in ('youtu.be', 'www.youtu.be'):
        vidid = parsedurl.path.split('/')[-1] if parsedurl.path else ''
        if idregx.match(vidid):
            return vidid
    err = "Need 11 character video id or the URL of the video. Got %s"
    raise ValueError(err % url)
def parseqs(data):
    """ parse_qs, return unicode. """
    if type(data) == uni:
        # Already text: parse directly.
        return parse_qs(data)
    if pyver == 3:
        # Python 3: decode the bytes to str before parsing.
        return parse_qs(data.decode("utf8"))
    # Python 2: parse the bytestring, then decode keys and values to unicode.
    parsed = parse_qs(data)
    out = {}
    for k, v in parsed.items():
        out[k.decode("utf8")] = [x.decode("utf8") for x in v]
    return out
def extract_playlist_id(playlist_url):
    """Return the playlist id from an id or a YouTube URL, or None if unrecognized."""
    # Normal playlists start with PL, Mixes start with RD + first video ID,
    # Liked videos start with LL, Uploads start with UU,
    # Favorites lists start with FL
    idregx = re.compile(r'((?:RD|PL|LL|UU|FL)[-_0-9a-zA-Z]+)$')
    playlist_id = None
    if idregx.match(playlist_url):
        playlist_id = playlist_url  # ID of video
    # Scheme-less input still needs a netloc separator for urlparse.
    if '://' not in playlist_url:
        playlist_url = '//' + playlist_url
    parsedurl = urlparse(playlist_url)
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'list' in query and idregx.match(query['list'][0]):
            playlist_id = query['list'][0]
    return playlist_id
def do_POST(self):
    """Validate credentials from the POST body and dispatch the request."""
    # http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables
    ctype, pdict = cgi.parse_header(self.headers['content-type'])
    if ctype == 'multipart/form-data':
        postvars = cgi.parse_multipart(self.rfile, pdict)
    elif ctype == 'application/x-www-form-urlencoded':
        length = int(self.headers['content-length'])
        postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
    else:
        postvars = {}
    # print(postvars)
    # Simplified membership test: `key in dict` instead of `key in list(dict.keys())`.
    if 'Username' not in postvars or 'Password' not in postvars:
        log('E', 'vali.', 'No credentials.')
        self.exit_on_error('No credentials.')
        return
    if not validate_id(postvars['Username'][0], postvars['Password'][0]):
        log('E', 'vali.', 'Wrong credentials.')
        self.exit_on_error('Wrong credentials.')
        return
    # print(postvars)
    try:
        dispatch(postvars)
        self.write_response({'Status': 'OK'})
    except Exception:  # narrowed from bare except; still a catch-all boundary
        log('E', 'hand.', 'Handler throws an exception.')
        # Fixed typo in the client-facing message: was "throws and exception".
        self.exit_on_error('Handler throws an exception.')
def content():
    """Return the posted 'states' values parsed from the raw request body."""
    parsed = urlparse.parse_qs(app.current_request.raw_body)
    # Missing key yields an empty list rather than a KeyError.
    return {'states': parsed.get('states', [])}
def parse_qs(qs, keep_blank_values=0, strict_parsing=0):
    """Parse a query given as a string argument."""
    # Deprecated shim: warn the caller, then delegate to urlparse.parse_qs.
    warn("cgi.parse_qs is deprecated, use urlparse.parse_qs instead",
         PendingDeprecationWarning, 2)
    return urlparse.parse_qs(qs, keep_blank_values, strict_parsing)
def test_obtain_access_token(self, rmock):
    """obtain_access_token posts the expected form fields and returns the token."""
    rmock.post(requests_mock.ANY, text='{"access_token": "ANY_TOKEN"}')
    cmock = Mock()
    cmock.username = "ANY_USERNAME"
    cmock.auth_host = "ANY_URL.example"
    result = obtain_access_token(cmock, 'ANY_PASSWORD')
    self.assertEqual('ANY_TOKEN', result)
    received_post_data = parse_qs(rmock.request_history[0].text)
    expected_post_data = {
        u'username': [u'ANY_USERNAME'],
        u'password': [u'ANY_PASSWORD'],
        u'client_id': [u'jumpauth'],
        u'grant_type': [u'password'],
    }
    self.assertEqual(received_post_data, expected_post_data)
def parse(url):
    """Unwrap a redirect URL: prefer its 'u' param, then 'q', else return it unchanged."""
    try:
        url = client.replaceHTMLCodes(url)
    except:
        pass
    # Try 'u' first; if it extracts, 'q' is then looked up on the new URL.
    for key in ('u', 'q'):
        try:
            url = urlparse.parse_qs(urlparse.urlparse(url).query)[key][0]
        except:
            pass
    return url
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Build the relative episode URL from the show slug embedded in the data URL."""
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        show = data['url'].split('/')[4]
        r = urlparse.urljoin(self.base_link, self.episode_link % (show, season, episode))
        # Keep only the path portion of the absolute URL.
        url = re.findall('(?://.+?|)(/.+)', r)[0]
        url = client.replaceHTMLCodes(url)
        url = url.encode('utf-8')
        return url
    except:
        pass
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Merge episode metadata into the show's query-string payload.

    Returns the re-encoded query string, or None on any failure.
    """
    try:
        if url is None:  # fixed: identity comparison instead of `== None`
            return
        url = urlparse.parse_qs(url)
        url = dict((i, url[i][0]) if url[i] else (i, '') for i in url)
        url['title'], url['premiered'], url['season'], url['episode'] = \
            title, premiered, season, episode
        return urllib.urlencode(url)
    except Exception:
        return
def episode(self, url, imdb, tvdb, title, premiered, season, episode):
    """Encode an episode payload, carrying over only the show's year from the data URL."""
    try:
        data = urlparse.parse_qs(url)
        data = dict((i, data[i][0]) if data[i] else (i, '') for i in data)
        payload = {'imdb': imdb, 'title': title, 'year': data['year'],
                   'season': season, 'episode': episode}
        return urllib.urlencode(payload)
    except:
        return