我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用urllib.parse.parse_qs()。
def process_POST_request(request): dict_ = urlparse.parse_qs(request.text) def htmlify(thing): try: html = dict_[thing][0] except KeyError as e: html = '' return '<html>' + html + '</html>' uri = dict_['uri'][0] head = htmlify('head') body = htmlify('body') try: text = dict_['data'][0] except KeyError as e: text = '' headsoup = BeautifulSoup(head, 'lxml') bodysoup = BeautifulSoup(body, 'lxml') target_uri = getUri(uri, headsoup, bodysoup) doi = getDoi(headsoup, bodysoup) return target_uri, doi, head, body, text
def test_move_plan(bot, testdata): with controlled_responses(testdata['requests'], server) as rsps: for x in range(0, len(testdata['post_result'])): rsps.add_post( 'http://host/build/admin/ajax/reorderBuild.action', 200, {'status': 'OK'}) msg = get_message() bot.move_plan(msg, 'BAMA-DEV') offset = len(testdata['requests']) for index, result in enumerate(testdata['post_result']): expDict = parse_qs( result, keep_blank_values=True, strict_parsing=True) resDict = parse_qs( rsps.calls[index + offset].request.body, keep_blank_values=True, strict_parsing=True) assert expDict == resDict [msg.send_webapi.assert_any_call(x, attachments=None, as_user=True) for x in testdata['result']]
def test_move_deployment(bot, testdata): with controlled_responses(testdata['requests'], server) as rsps: rsps.add_post( 'http://host/build/admin/ajax/reorderBuild.action', 200, {'status': 'OK'}) msg = get_message() bot.move_deployment(msg, '123') expDict = parse_qs( testdata['post_result'][0], keep_blank_values=True, strict_parsing=True) resDict = parse_qs( rsps.calls[1].request.body, keep_blank_values=True, strict_parsing=True) assert expDict == resDict [msg.send_webapi.assert_any_call(x, attachments=None, as_user=True) for x in testdata['result']]
def serial_class_for_url(url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != 'alt': raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,)) class_name = 'Serial' try: for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'class': class_name = values[0] else: raise ValueError('unknown option: %r' % (option,)) except ValueError as e: raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e) return (''.join([parts.netloc, parts.path]), getattr(serial, class_name)) # - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != "socket": raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,)) try: # process options now, directly altering self for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'logging': logging.basicConfig() # XXX is that good to call it here? self.logger = logging.getLogger('pySerial.socket') self.logger.setLevel(LOGGER_LEVELS[values[0]]) self.logger.debug('enabled logging') else: raise ValueError('unknown option: %r' % (option,)) # get host and port host, port = parts.hostname, parts.port if not 0 <= port < 65536: raise ValueError("port not in range 0...65535") except ValueError as e: raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e) return (host, port) # - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != "loop": raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,)) try: # process options now, directly altering self for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'logging': logging.basicConfig() # XXX is that good to call it here? self.logger = logging.getLogger('pySerial.loop') self.logger.setLevel(LOGGER_LEVELS[values[0]]) self.logger.debug('enabled logging') else: raise ValueError('unknown option: %r' % (option,)) except ValueError as e: raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e) # - - - - - - - - - - - - - - - - - - - - - - - -
def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != 'spy': raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,)) # process options now, directly altering self formatter = FormatHexdump color = False output = sys.stderr try: for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'file': output = open(values[0], 'w') elif option == 'color': color = True elif option == 'raw': formatter = FormatRaw elif option == 'all': self.show_all = True else: raise ValueError('unknown option: %r' % (option,)) except ValueError as e: raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e) self.formatter = formatter(output, color) return ''.join([parts.netloc, parts.path])
def query_params(self, value=None): """ Return or set a dictionary of query params :param dict value: new dictionary of values """ if value is not None: return URL._mutate(self, query=unicode_urlencode(value, doseq=True)) query = '' if self._tuple.query is None else self._tuple.query # In Python 2.6, urlparse needs a bytestring so we encode and then # decode the result. if not six.PY3: result = parse_qs(to_utf8(query), True) return dict_to_unicode(result) return parse_qs(query, True)
def test_auth_url(self): perms = ['email', 'birthday'] redirect_url = 'https://localhost/facebook/callback/' expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode( dict(client_id=self.app_id, redirect_uri=redirect_url, scope=','.join(perms))) actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms) # Since the order of the query string parameters might be # different in each URL, we cannot just compare them to each # other. expected_url_result = urlparse(expected_url) actual_url_result = urlparse(actual_url) expected_query = parse_qs(expected_url_result.query) actual_query = parse_qs(actual_url_result.query) self.assertEqual(actual_url_result.scheme, expected_url_result.scheme) self.assertEqual(actual_url_result.netloc, expected_url_result.netloc) self.assertEqual(actual_url_result.path, expected_url_result.path) self.assertEqual(actual_url_result.params, expected_url_result.params) self.assertEqual(actual_query, expected_query)
def headless_authorize(self): """ Authorize without a display using only TTY. """ url, _ = self.oauth.authorize_token_url(redirect_uri=self.redirect_uri) # Ask the user to open this url on a system with browser print('\n-------------------------------------------------------------------------') print('\t\tOpen the below URL in your browser\n') print(url) print('\n-------------------------------------------------------------------------\n') print('NOTE: After authenticating on Fitbit website, you will redirected to a URL which ') print('throws an ERROR. This is expected! Just copy the full redirected here.\n') redirected_url = input('Full redirected URL: ') params = urlparse.parse_qs(urlparse.urlparse(redirected_url).query) print(params['code'][0]) self.authenticate_code(code=params['code'][0])
def get_coupon_code(request): """ Get coupon code from an HttpRequest Args: request (django.http.request.HttpRequest): An HttpRequest Returns: str: A coupon code or None if none found """ next_url = request.GET.get("next") if not next_url: return None parsed = urlparse(next_url) path = parsed.path if path not in ("/dashboard", "/dashboard/"): return None coupons = parse_qs(parsed.query).get("coupon") if coupons: return coupons[0] return None
def url_to_path_and_args(url, no_query_string=False): if no_query_string: url = url.replace('?', '%3F').replace('#', '%23') components = urlsplit(url) path = components.path if no_query_string: path = unquote(path) # ??????? CEIBA ? %3F ?????????? # ??????? CEIBA ? %253F ?????????? # ?? ceiba_dl.Request ????????????????????? quote_test = path.replace('?', '').replace('#', '').replace(' ', '') if quote(quote_test) != quote_test: path = path.replace('?', '%3F').replace('#', '%23') args = {} else: query_string = components.query args = parse_qs(query_string, keep_blank_values=True) for key, value in args.items(): if isinstance(value, list): assert len(value) == 1 args[key] = value[0] return (path, args) # lxml ????????? None??????????
def extract_video_id(url): """ Extract the video id from a url, return video id as str. """ idregx = re.compile(r'[\w-]{11}$') url = str(url) if idregx.match(url): return url # ID of video if '://' not in url: url = '//' + url parsedurl = urlparse(url) if parsedurl.netloc in ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'gaming.youtube.com'): query = parse_qs(parsedurl.query) if 'v' in query and idregx.match(query['v'][0]): return query['v'][0] elif parsedurl.netloc in ('youtu.be', 'www.youtu.be'): vidid = parsedurl.path.split('/')[-1] if parsedurl.path else '' if idregx.match(vidid): return vidid err = "Need 11 character video id or the URL of the video. Got %s" raise ValueError(err % url)
def parseqs(data): """ parse_qs, return unicode. """ if type(data) == uni: return parse_qs(data) elif pyver == 3: data = data.decode("utf8") data = parse_qs(data) else: data = parse_qs(data) out = {} for k, v in data.items(): k = k.decode("utf8") out[k] = [x.decode("utf8") for x in v] data = out return data
def extract_playlist_id(playlist_url): # Normal playlists start with PL, Mixes start with RD + first video ID, # Liked videos start with LL, Uploads start with UU, # Favorites lists start with FL idregx = re.compile(r'((?:RD|PL|LL|UU|FL)[-_0-9a-zA-Z]+)$') playlist_id = None if idregx.match(playlist_url): playlist_id = playlist_url # ID of video if '://' not in playlist_url: playlist_url = '//' + playlist_url parsedurl = urlparse(playlist_url) if parsedurl.netloc in ('youtube.com', 'www.youtube.com'): query = parse_qs(parsedurl.query) if 'list' in query and idregx.match(query['list'][0]): playlist_id = query['list'][0] return playlist_id
def do_GET(self): # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e qs = urllib_parse.urlsplit(self.path).query url_vars = urllib_parse.parse_qs(qs) oauth_token = url_vars['oauth_token'][0] oauth_verifier = url_vars['oauth_verifier'][0] if six.PY2: self.server.oauth_token = oauth_token.decode('utf-8') self.server.oauth_verifier = oauth_verifier.decode('utf-8') else: self.server.oauth_token = oauth_token self.server.oauth_verifier = oauth_verifier assert (isinstance(self.server.oauth_token, six.string_types)) assert (isinstance(self.server.oauth_verifier, six.string_types)) self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() self.wfile.write(html.auth_okay_html)
def from_url(self, url): """extract host and port from an URL string""" parts = urlparse.urlsplit(url) if parts.scheme != "loop": raise SerialException( 'expected a string in the form ' '"loop://[?logging={debug|info|warning|error}]": not starting ' 'with loop:// ({!r})'.format(parts.scheme)) try: # process options now, directly altering self for option, values in urlparse.parse_qs(parts.query, True).items(): if option == 'logging': logging.basicConfig() # XXX is that good to call it here? self.logger = logging.getLogger('pySerial.loop') self.logger.setLevel(LOGGER_LEVELS[values[0]]) self.logger.debug('enabled logging') else: raise ValueError('unknown option: {!r}'.format(option)) except ValueError as e: raise SerialException( 'expected a string in the form ' '"loop://[?logging={debug|info|warning|error}]": {}'.format(e)) # - - - - - - - - - - - - - - - - - - - - - - - -
def form(self): if self.parsed_form is None: self.parsed_form = {} self.parsed_files = {} content_type, parameters = parse_header(self.headers.get('Content-Type')) try: if content_type is None or content_type == 'application/x-www-form-urlencoded': self.parsed_form = RequestParameters(parse_qs(self.body.decode('utf-8'))) elif content_type == 'multipart/form-data': # TODO: Stream this instead of reading to/from memory boundary = parameters['boundary'].encode('utf-8') self.parsed_form, self.parsed_files = parse_multipart_form(self.body, boundary) except Exception as e: log.exception(e) pass return self.parsed_form
def update_dict_id_field_from_response_field(cls, dict_, dict_id_field, response_obj, response_field, response_param): """ :type dict_: dict :type dict_id_field: str :type response_obj: dict :type response_field: str :type response_param: str """ url = response_obj[response_field] if url is not None: url_parsed = urlparse.urlparse(url) parameters = urlparse.parse_qs(url_parsed.query) dict_[dict_id_field] = int( parameters[response_param][cls._INDEX_FIRST] ) if cls._FIELD_COUNT in parameters and cls._FIELD_COUNT not in dict_: dict_[cls._FIELD_COUNT] = int( parameters[client.Pagination.PARAM_COUNT][cls._INDEX_FIRST] )
def parse_competition(self, response): start_url = 'http://www.soccerway.mobi/' links = response.xpath('//td[@class="score-time status"]//a/@href').extract() for l in links: #self.log('URL: {}'.format(start_url+l)) request = Request(url=start_url+l, callback=self.parse_match) request.meta['proxy'] = 'http://127.0.0.1:8118' yield request groups = response.xpath('//select[@name="group_id"]/option/@value').extract() for g in groups: request = Request(url=start_url+g, callback=self.parse_group) request.meta['proxy'] = 'http://127.0.0.1:8118' yield request competition_id = int(parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['id'][0]) if competition_id in [308, 327, 366, 570]: rounds = response.xpath('//select[@name="round_id"]/option/@value').extract() for r in rounds: request = Request(url=start_url+r, callback=self.parse_round) request.meta['proxy'] = 'http://127.0.0.1:8118' yield request
def parse_match(self, response): item = MatchInfo() item['id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a/@href').extract()[3])['id'][0] item['area'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[1] item['competition'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[2] item['home_team'] = response.xpath('//div[@class="container left"]//a/text()').extract_first() item['away_team'] = response.xpath('//div[@class="container right"]//a/text()').extract_first() item['ht_last5'] = ''.join(response.xpath('//div[@class="container left"]//a/text()').extract()[1:6]) item['at_last5'] = ''.join(response.xpath('//div[@class="container right"]//a/text()').extract()[1:6]) item['datetime'] = datetime.fromtimestamp(int(response.xpath('//div[@class="details clearfix"]/dl/dt[.="Date"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Date"]//span/@data-value').extract_first()), timezone.utc).isoformat(' ') #item['competition'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Competition"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Competition"]/a/text()').extract_first() item['game_week'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Game week"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Game week"]/text()').extract_first() item['kick_off'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Kick-off"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Kick-off"]//span/text()').extract_first() item['venue'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Venue"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Venue"]//a/text()').extract_first() item['updated'] = datetime.utcnow().isoformat(' ') yield item return item #self.log('URL: {}'.format(response.url))
def parse(self, response): items = [] area_id = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['area_id'][0] area_name = response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/text()').extract_first() links = response.xpath('//div[@class="block_competitions_list real-content clearfix "]//li/a') for l in links: item = Competition() item['id'] = parse_qs(l.xpath('./@href').extract_first())['id'][0] item['name'] = l.xpath('./text()').extract_first() item['area_id'] = area_id item['area_name'] = area_name item['updated'] = datetime.utcnow().isoformat(' ') yield item items.append(item) return items #self.log('URL: {}'.format(response.url))
def parse(self, response): item = Team() item['id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['id'][0] if not item['id']: return None item['name'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/text()').extract_first() item['area_id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//a[1]/@href').extract()[1])['area_id'][0] item['area_name'] = response.xpath('//div[@class="clearfix subnav level-1"]//a[1]/text()').extract()[1] item['website'] = response.xpath('//p[@class="center website"]/a/@href').extract_first() item['address'] = ", ".join(list(map(str.strip, response.xpath('//div[@class="clearfix"]/dl/dt[.="Address"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Address"]/text()').extract()))) item['founded'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Founded"]/following-sibling::dd/text()').extract_first() item['country'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Country"]/following-sibling::dd/text()').extract_first() item['phone'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Phone"]/following-sibling::dd/text()').extract_first().strip() item['fax'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Fax"]/following-sibling::dd/text()').extract_first().strip() item['email'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="E-mail"]/following-sibling::dd/a/text()').extract_first() yield item return item #self.log('URL: {}'.format(response.url))
def testRequestToken(self): class MockResponse(object): def __init__(self, code): self.code = code.decode() def read(self): return ('{"refresh_token": "' + self.code + '456"}').encode() def mock_urlopen(unused_url, param): return MockResponse(urlparse.parse_qs(param)[b'code'][0]) # Choose urlopen function to mock based on Python version if sys.version_info[0] < 3: urlopen_lib = 'urllib2.urlopen' else: urlopen_lib = 'urllib.request.urlopen' with mock.patch(urlopen_lib, new=mock_urlopen): auth_code = '123' refresh_token = ee.oauth.request_token(auth_code) self.assertEqual('123456', refresh_token)
def get_target(self, target_plugin, path): """ Parses the path, extracts a token, and looks up a target for that token using the token plugin. Sets target_host and target_port if successful """ # The files in targets contain the lines # in the form of token: host:port # Extract the token parameter from url args = parse_qs(urlparse(path)[4]) # 4 is the query from url if not 'token' in args or not len(args['token']): raise self.server.EClose("Token not present") token = args['token'][0].rstrip('\n') result_pair = target_plugin.lookup(token) if result_pair is not None: return result_pair else: raise self.server.EClose("Token '%s' not found" % token)
def _extend_dataservice_class(self, dataservice_class): BaseDataserviceClass = super(MockDataserviceFunctionResourceProcessor, self)._extend_dataservice_class(dataservice_class) class ExtendedDataserviceClass(BaseDataserviceClass): @classmethod def _get_filename(cls, url, params): if url.startswith( "http://online.knesset.gov.il/WsinternetSps/KnessetDataService/CommitteeScheduleData.svc/CommitteeAgendaSearch?"): qs = parse_qs(urlparse(url).query) return "CommitteeScheduleDava.svc_CommitteeAgendaSearch_CommitteeId_{}_{}_{}".format( ",".join(qs["CommitteeId"]), ",".join(qs["FromDate"]), ",".join(qs["ToDate"])) @classmethod def _get_response_content(cls, url, params, timeout, proxies, retry_num=1): filename = cls._get_filename(url, params) return get_mock_response_content(filename, url, params, timeout, proxies, retry_num) return ExtendedDataserviceClass
def get_students(self): group = self.group group_name = group[:group.find('(')].strip() group_code = group[group.find('(')+1:group.find(')')] students = [] for row in self.table.children: if type(row) == NavigableString: continue active = True link = row.find(class_='fio_3').parent if link.has_attr('style') and link['style'] == 'color:gray;': #????? ?????? - ??????? ???????? active = False student_id = parse_qs(urlparse(link['href']).query)['sid'][0] name = row.find(class_='fio_3').string.strip() record_book_id = row.find(class_='hc3').string.strip() name = " ".join(name.split()) record_book_id = " ".join(record_book_id.split()) students.append({'name': name, 'id': student_id, 'record_book': record_book_id, 'active': int(active)}) return {'group': group_name, 'code': group_code, 'students': students, 'id': self.group_id}
def test_api_get_paginated(self): def callback(request): params = parse_qs(urlparse(request.url).query) limit = params["limit"][0] if "limit" in params else 1 offset = params["offset"][0] if "offset" in params else 0 resp = """{{"stat": "ok", "offset": "{offset}", "limit": "{limit}", "total": "10","fake":["fakedata{offset}"]}}""".format( offset=offset, limit=limit) return (200, {}, resp) responses.add_callback(responses.GET, "https://fake/getFake", callback=callback) config = urconf.UptimeRobot("", url="https://fake/") result = config._api_get_paginated("getFake", {}, lambda x: x["fake"]) assert len(responses.calls) == 10 for i in (range(10)): assert "fakedata{}".format(i) in result
def filter_result(link): try: # Valid results are absolute URLs not pointing to a Google domain # like images.google.com or googleusercontent.com o = urlparse(link, 'http') if o.netloc and 'google' not in o.netloc: return link # Decode hidden URLs. if link.startswith('/url?'): link = parse_qs(o.query)['q'][0] # Valid results are absolute URLs not pointing to a Google domain o = urlparse(link, 'http') if o.netloc and 'google' not in o.netloc: return link except Exception: pass return None
def append_query_params(self, url, **kwargs): uri = urlsplit(url) query = parse_qs(uri.query) for key in kwargs: if key in query: query[key].append(kwargs[key]) else: query[key] = kwargs[key] query_string = urlencode(query, doseq=True) uri_new = uri._replace(query=query_string) return urlunsplit(uri_new)
def test_get_without_verify_token(self): session = self.client.session session[SESSKEY_OAUTH_NEXT_URI] = "/test?type=set" session.save() getData = { "code": "code_will_not_be_checked_anyway" } res = self.client.get(reverse("instagram:connect"), getData) self.assertEqual(302, res.status_code) redirect_uri = urlsplit(res['Location']) self.assertEqual("/test", redirect_uri.path) self.assertDictEqual({"status": ["error"], "type": ["set", "api"], "detail": ["verify_token_not_set"]}, parse_qs(redirect_uri.query))
def test_get_with_no_verify_token_in_session(self): session = self.client.session session[SESSKEY_OAUTH_NEXT_URI] = "/test" session.save() getData = { "code": "code_will_not_be_checked_anyway", "verify_token": "token_will_not_be_checked_anyway" } res = self.client.get(reverse("instagram:connect"), getData) self.assertEqual(302, res.status_code) redirect_uri = urlsplit(res['Location']) self.assertEqual("/test", redirect_uri.path) self.assertDictEqual({"status": ["error"], "type": ["internal"], "detail": ["no_verify_token_in_session"]}, parse_qs(redirect_uri.query))
def test_get_with_valid_code_and_invalid_verify_token(self): sessionValue = "correctvalue" session = self.client.session session[SESSKEY_OAUTH_NEXT_URI] = "/test" session[SESSKEY_OAUTH_VERIFY_TOKEN] = sessionValue session.save() getData = { "verify_token": "someothervaluethaninthesession", "code": "code_will_not_be_checked_anyway" } res = self.client.get(reverse("instagram:connect"), getData) self.assertEqual(302, res.status_code) redirect_uri = urlsplit(res['Location']) self.assertEqual("/test", redirect_uri.path) self.assertDictEqual({"status": ["error"], "type": ["api"], "detail": ["invalid_verify_token"]}, parse_qs(redirect_uri.query))
def change_email(request): """ ?????? :param request: :return: : -1 ??session?? ?????? : 0 ?????? : 1 ?????? """ user = request['session'].get('user', None) data = parse_qs(str(request.body, encoding='utf-8')) if user: try: email = data.get('email', None)[0] motor_db = motor_base.get_db() await motor_db.user.update_one({'user': user}, {'$set': {'email': email}}) LOGGER.info('??????') return json({'status': 1}) except Exception as e: LOGGER.exception(e) return json({'status': 0}) else: return json({'status': -1})
def test_obtain_access_token(self, rmock): rmock.post(requests_mock.ANY, text='{"access_token": "ANY_TOKEN"}') cmock = Mock() cmock.username = "ANY_USERNAME" cmock.auth_host = "ANY_URL.example" result = obtain_access_token(cmock, 'ANY_PASSWORD') self.assertEqual('ANY_TOKEN', result) received_post_data = parse_qs(rmock.request_history[0].text) expected_post_data = {u'username': [u'ANY_USERNAME'], u'password': [u'ANY_PASSWORD'], u'client_id': [u'jumpauth'], u'grant_type': [u'password']} self.assertEqual(received_post_data, expected_post_data)
def data_factory(app, handler): async def parse_data(request): logging.info('data_factory...') if request.method in ('POST', 'PUT'): if not request.content_type: return web.HTTPBadRequest(text='Missing Content-Type.') content_type = request.content_type.lower() if content_type.startswith('application/json'): request.__data__ = await request.json() if not isinstance(request.__data__, dict): return web.HTTPBadRequest(text='JSON body must be object.') logging.info('request json: %s' % request.__data__) elif content_type.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')): params = await request.post() request.__data__ = dict(**params) logging.info('request form: %s' % request.__data__) else: return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % content_type) elif request.method == 'GET': qs = request.query_string request.__data__ = {k: v[0] for k, v in parse.parse_qs(qs, True).items()} logging.info('request query: %s' % request.__data__) else: request.__data__ = dict() return await handler(request) return parse_data # ??????????????????Response??
def parse_body(self): if self.headers.get('Content-Type', '').startswith('application/json'): return json.loads(self.body) else: return MultiValueDict(parse_qs(self.body))
def do_GET(self): """ Process GET requests. This method will process the requests in this form: `/command?parameter1=value1&...` It will use `in_message` parameter as the input message and reply with a JSON containing `out_message` propery: `{"out_message": "hello"}` """ try: function, params = self.path.split("?") function, params = function[1:], parse_qs(params) self.send_response(200) self.end_headers() output_text = self.server.bot.process( "".join(params["in_message"]) ) output = { "out_message": output_text, "out_message_html": output_text.replace( '&', '&').replace( '<', '<').replace( '>', '>').replace( '\n', '<br />') } self.wfile.write(json.dumps(output).encode("UTF-8")) except ValueError: # if no command is specified, serve the default html filename = os.path.join( os.path.dirname(__file__), 'http', 'index.html' ) with open(filename, 'r') as template_file: self.send_response(200) self.end_headers() output = template_file.read() self.wfile.write(output.encode("UTF-8"))