Python urllib.parse module: parse_qs() example source code

We extracted the following 48 code examples from open-source Python projects to illustrate how to use urllib.parse.parse_qs().

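Before the project examples, here is a minimal sketch of the basic behavior (the URL below is hypothetical, used only for illustration): parse_qs() maps every query key to a list of values, keep_blank_values=True preserves empty parameters, and strict_parsing=True raises ValueError on malformed query strings.

from urllib.parse import parse_qs, urlparse

# Hypothetical URL, used only to show the return format.
query = urlparse('https://example.com/watch?v=abc123&list=&v=def456').query

print(parse_qs(query))
# {'v': ['abc123', 'def456']}   -- the blank 'list' value is dropped by default

print(parse_qs(query, keep_blank_values=True))
# {'v': ['abc123', 'def456'], 'list': ['']}
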
Project: scibot | Author: SciCrunch
def process_POST_request(request):
    dict_ = urlparse.parse_qs(request.text)
    def htmlify(thing):
        try:
            html = dict_[thing][0]
        except KeyError as e:
            html = ''
        return '<html>' + html + '</html>'
    uri = dict_['uri'][0]
    head = htmlify('head')
    body = htmlify('body')
    try:
        text = dict_['data'][0]
    except KeyError as e:
        text = ''

    headsoup = BeautifulSoup(head, 'lxml')
    bodysoup = BeautifulSoup(body, 'lxml')

    target_uri = getUri(uri, headsoup, bodysoup)
    doi = getDoi(headsoup, bodysoup)
    return target_uri, doi, head, body, text
Project: AtlassianBot | Author: gpailler
def test_move_plan(bot, testdata):
    with controlled_responses(testdata['requests'], server) as rsps:
        for x in range(0, len(testdata['post_result'])):
            rsps.add_post(
                'http://host/build/admin/ajax/reorderBuild.action',
                200,
                {'status': 'OK'})

        msg = get_message()
        bot.move_plan(msg, 'BAMA-DEV')

        offset = len(testdata['requests'])
        for index, result in enumerate(testdata['post_result']):
            expDict = parse_qs(
                               result,
                               keep_blank_values=True,
                               strict_parsing=True)
            resDict = parse_qs(
                               rsps.calls[index + offset].request.body,
                               keep_blank_values=True,
                               strict_parsing=True)
            assert expDict == resDict

        [msg.send_webapi.assert_any_call(x, attachments=None, as_user=True) for x in testdata['result']]
Project: AtlassianBot | Author: gpailler
def test_move_deployment(bot, testdata):
    with controlled_responses(testdata['requests'], server) as rsps:
        rsps.add_post(
            'http://host/build/admin/ajax/reorderBuild.action',
            200,
            {'status': 'OK'})

        msg = get_message()
        bot.move_deployment(msg, '123')

        expDict = parse_qs(
            testdata['post_result'][0],
            keep_blank_values=True,
            strict_parsing=True)
        resDict = parse_qs(
            rsps.calls[1].request.body,
            keep_blank_values=True,
            strict_parsing=True)
        assert expDict == resDict

        [msg.send_webapi.assert_any_call(x, attachments=None, as_user=True) for x in testdata['result']]
Project: android3dblendermouse | Author: sketchpunk
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: android3dblendermouse | Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: android3dblendermouse | Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: android3dblendermouse | Author: sketchpunk
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: purelove | Author: hucmosin
def query_params(self, value=None):
        """
        Return or set a dictionary of query params

        :param dict value: new dictionary of values
        """
        if value is not None:
            return URL._mutate(self, query=unicode_urlencode(value, doseq=True))
        query = '' if self._tuple.query is None else self._tuple.query

        # In Python 2.6, urlparse needs a bytestring so we encode and then
        # decode the result.
        if not six.PY3:
            result = parse_qs(to_utf8(query), True)
            return dict_to_unicode(result)

        return parse_qs(query, True)
Project: PythonStudyCode | Author: TongTongX
def test_auth_url(self):
        perms = ['email', 'birthday']
        redirect_url = 'https://localhost/facebook/callback/'

        expected_url = 'https://www.facebook.com/dialog/oauth?' + urlencode(
            dict(client_id=self.app_id,
                 redirect_uri=redirect_url,
                 scope=','.join(perms)))
        actual_url = facebook.auth_url(self.app_id, redirect_url, perms=perms)

        # Since the order of the query string parameters might be
        # different in each URL, we cannot just compare them to each
        # other.
        expected_url_result = urlparse(expected_url)
        actual_url_result = urlparse(actual_url)
        expected_query = parse_qs(expected_url_result.query)
        actual_query = parse_qs(actual_url_result.query)

        self.assertEqual(actual_url_result.scheme, expected_url_result.scheme)
        self.assertEqual(actual_url_result.netloc, expected_url_result.netloc)
        self.assertEqual(actual_url_result.path, expected_url_result.path)
        self.assertEqual(actual_url_result.params, expected_url_result.params)
        self.assertEqual(actual_query, expected_query)
Project: fitbit-googlefit | Author: praveendath92
def headless_authorize(self):
        """
        Authorize without a display using only TTY.
        """
        url, _ = self.oauth.authorize_token_url(redirect_uri=self.redirect_uri)
        # Ask the user to open this url on a system with browser
        print('\n-------------------------------------------------------------------------')
        print('\t\tOpen the below URL in your browser\n')
        print(url)
        print('\n-------------------------------------------------------------------------\n')
        print('NOTE: After authenticating on the Fitbit website, you will be redirected to a URL which ')
        print('throws an ERROR. This is expected! Just copy the full redirected URL here.\n')
        redirected_url = input('Full redirected URL: ')
        params = urlparse.parse_qs(urlparse.urlparse(redirected_url).query)
        print(params['code'][0])
        self.authenticate_code(code=params['code'][0])
Project: microperi | Author: c0d3st0rm
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: microperi | Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microperi | Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microperi | Author: c0d3st0rm
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: micromasters | Author: mitodl
def get_coupon_code(request):
    """
    Get coupon code from an HttpRequest

    Args:
        request (django.http.request.HttpRequest): An HttpRequest

    Returns:
        str: A coupon code or None if none found
    """
    next_url = request.GET.get("next")
    if not next_url:
        return None
    parsed = urlparse(next_url)
    path = parsed.path
    if path not in ("/dashboard", "/dashboard/"):
        return None
    coupons = parse_qs(parsed.query).get("coupon")
    if coupons:
        return coupons[0]
    return None
Project: ceiba-dl | Author: lantw44
def url_to_path_and_args(url, no_query_string=False):
    if no_query_string:
        url = url.replace('?', '%3F').replace('#', '%23')
    components = urlsplit(url)
    path = components.path
    if no_query_string:
        path = unquote(path)
        # Some CEIBA URLs encode '?' in the path as %3F, and some double-encode
        # it as %253F. Re-quote those characters here so that ceiba_dl.Request
        # receives a path it can handle consistently.
        quote_test = path.replace('?', '').replace('#', '').replace(' ', '')
        if quote(quote_test) != quote_test:
            path = path.replace('?', '%3F').replace('#', '%23')
        args = {}
    else:
        query_string = components.query
        args = parse_qs(query_string, keep_blank_values=True)
        for key, value in args.items():
            if isinstance(value, list):
                assert len(value) == 1
                args[key] = value[0]
    return (path, args)

# lxml may return None for missing text content, so empty values need special handling
Project: youtube_downloader | Author: aksinghdce
def extract_video_id(url):
    """ Extract the video id from a url, return video id as str. """
    idregx = re.compile(r'[\w-]{11}$')
    url = str(url)

    if idregx.match(url):
        return url # ID of video

    if '://' not in url:
        url = '//' + url
    parsedurl = urlparse(url)
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com', 'm.youtube.com', 'gaming.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'v' in query and idregx.match(query['v'][0]):
            return query['v'][0]
    elif parsedurl.netloc in ('youtu.be', 'www.youtu.be'):
        vidid = parsedurl.path.split('/')[-1] if parsedurl.path else ''
        if idregx.match(vidid):
            return vidid

    err = "Need 11 character video id or the URL of the video. Got %s"
    raise ValueError(err % url)
Project: youtube_downloader | Author: aksinghdce
def parseqs(data):
    """ parse_qs, return unicode. """
    if type(data) == uni:
        return parse_qs(data)

    elif pyver == 3:
        data = data.decode("utf8")
        data = parse_qs(data)

    else:
        data = parse_qs(data)
        out = {}

        for k, v in data.items():
            k = k.decode("utf8")
            out[k] = [x.decode("utf8") for x in v]
            data = out

    return data
Project: youtube_downloader | Author: aksinghdce
def extract_playlist_id(playlist_url):
    # Normal playlists start with PL, Mixes start with RD + first video ID,
    # Liked videos start with LL, Uploads start with UU,
    # Favorites lists start with FL
    idregx = re.compile(r'((?:RD|PL|LL|UU|FL)[-_0-9a-zA-Z]+)$')

    playlist_id = None
    if idregx.match(playlist_url):
        playlist_id = playlist_url # ID of video

    if '://' not in playlist_url:
        playlist_url = '//' + playlist_url
    parsedurl = urlparse(playlist_url)
    if parsedurl.netloc in ('youtube.com', 'www.youtube.com'):
        query = parse_qs(parsedurl.query)
        if 'list' in query and idregx.match(query['list'][0]):
            playlist_id = query['list'][0]

    return playlist_id
Project: flickr_downloader | Author: Denisolt
def do_GET(self):
        # /?oauth_token=72157630789362986-5405f8542b549e95&oauth_verifier=fe4eac402339100e

        qs = urllib_parse.urlsplit(self.path).query
        url_vars = urllib_parse.parse_qs(qs)

        oauth_token = url_vars['oauth_token'][0]
        oauth_verifier = url_vars['oauth_verifier'][0]

        if six.PY2:
            self.server.oauth_token = oauth_token.decode('utf-8')
            self.server.oauth_verifier = oauth_verifier.decode('utf-8')
        else:
            self.server.oauth_token = oauth_token
            self.server.oauth_verifier = oauth_verifier

        assert (isinstance(self.server.oauth_token, six.string_types))
        assert (isinstance(self.server.oauth_verifier, six.string_types))

        self.send_response(200)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        self.wfile.write(html.auth_okay_html)
Project: gcodeplot | Author: arpruss
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: sanic-python-web-server | Author: kimjoseph95
def form(self):
        if self.parsed_form is None:
            self.parsed_form = {}
            self.parsed_files = {}
            content_type, parameters = parse_header(self.headers.get('Content-Type'))
            try:
                if content_type is None or content_type == 'application/x-www-form-urlencoded':
                    self.parsed_form = RequestParameters(parse_qs(self.body.decode('utf-8')))
                elif content_type == 'multipart/form-data':
                    # TODO: Stream this instead of reading to/from memory
                    boundary = parameters['boundary'].encode('utf-8')
                    self.parsed_form, self.parsed_files = parse_multipart_form(self.body, boundary)
            except Exception as e:
                log.exception(e)
                pass

        return self.parsed_form
Project: sdk_python | Author: bunq
def update_dict_id_field_from_response_field(cls, dict_, dict_id_field,
                                                 response_obj, response_field,
                                                 response_param):
        """
        :type dict_: dict
        :type dict_id_field: str
        :type response_obj: dict
        :type response_field: str
        :type response_param: str
        """

        url = response_obj[response_field]

        if url is not None:
            url_parsed = urlparse.urlparse(url)
            parameters = urlparse.parse_qs(url_parsed.query)
            dict_[dict_id_field] = int(
                parameters[response_param][cls._INDEX_FIRST]
            )

            if cls._FIELD_COUNT in parameters and cls._FIELD_COUNT not in dict_:
                dict_[cls._FIELD_COUNT] = int(
                    parameters[client.Pagination.PARAM_COUNT][cls._INDEX_FIRST]
                )
Project: bitio | Author: whaleygeek
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": not starting '
                'with loop:// ({!r})'.format(parts.scheme))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: {!r}'.format(option))
        except ValueError as e:
            raise SerialException(
                'expected a string in the form '
                '"loop://[?logging={debug|info|warning|error}]": {}'.format(e))

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: scrapy-soccerway | Author: tvl
def parse_competition(self, response):
        start_url = 'http://www.soccerway.mobi/'
        links = response.xpath('//td[@class="score-time status"]//a/@href').extract()
        for l in links:
            #self.log('URL: {}'.format(start_url+l))
            request = Request(url=start_url+l, callback=self.parse_match)
            request.meta['proxy'] = 'http://127.0.0.1:8118'
            yield request
        groups = response.xpath('//select[@name="group_id"]/option/@value').extract()
        for g in groups:
            request = Request(url=start_url+g, callback=self.parse_group)
            request.meta['proxy'] = 'http://127.0.0.1:8118'
            yield request
        competition_id = int(parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['id'][0])
        if competition_id in [308, 327, 366, 570]:
            rounds = response.xpath('//select[@name="round_id"]/option/@value').extract()
            for r in rounds:
                request = Request(url=start_url+r, callback=self.parse_round)
                request.meta['proxy'] = 'http://127.0.0.1:8118'
                yield request
Project: scrapy-soccerway | Author: tvl
def parse_match(self, response):
        item = MatchInfo()
        item['id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a/@href').extract()[3])['id'][0]
        item['area'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[1]
        item['competition'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[2]
        item['home_team'] = response.xpath('//div[@class="container left"]//a/text()').extract_first()
        item['away_team'] = response.xpath('//div[@class="container right"]//a/text()').extract_first()
        item['ht_last5'] = ''.join(response.xpath('//div[@class="container left"]//a/text()').extract()[1:6])
        item['at_last5'] = ''.join(response.xpath('//div[@class="container right"]//a/text()').extract()[1:6])
        item['datetime'] = datetime.fromtimestamp(int(response.xpath('//div[@class="details clearfix"]/dl/dt[.="Date"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Date"]//span/@data-value').extract_first()), timezone.utc).isoformat(' ')
        #item['competition'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Competition"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Competition"]/a/text()').extract_first()
        item['game_week'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Game week"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Game week"]/text()').extract_first()
        item['kick_off'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Kick-off"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Kick-off"]//span/text()').extract_first()
        item['venue'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Venue"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Venue"]//a/text()').extract_first()
        item['updated'] = datetime.utcnow().isoformat(' ')
        yield item
        return item
        #self.log('URL: {}'.format(response.url))
Project: scrapy-soccerway | Author: tvl
def parse(self, response):
        items = []
        area_id = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['area_id'][0]
        area_name = response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/text()').extract_first()
        links = response.xpath('//div[@class="block_competitions_list real-content clearfix "]//li/a')
        for l in links:
            item = Competition()
            item['id'] = parse_qs(l.xpath('./@href').extract_first())['id'][0]
            item['name'] = l.xpath('./text()').extract_first()
            item['area_id'] = area_id
            item['area_name'] = area_name
            item['updated'] = datetime.utcnow().isoformat(' ')
            yield item
            items.append(item)
        return items
        #self.log('URL: {}'.format(response.url))
Project: scrapy-soccerway | Author: tvl
def parse(self, response):
        item = Team()
        item['id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/@href').extract_first())['id'][0]
        if not item['id']:
            return None
        item['name'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a[2]/text()').extract_first()
        item['area_id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//a[1]/@href').extract()[1])['area_id'][0]
        item['area_name'] = response.xpath('//div[@class="clearfix subnav level-1"]//a[1]/text()').extract()[1]
        item['website'] = response.xpath('//p[@class="center website"]/a/@href').extract_first()
        item['address'] = ", ".join(list(map(str.strip, response.xpath('//div[@class="clearfix"]/dl/dt[.="Address"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Address"]/text()').extract())))
        item['founded'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Founded"]/following-sibling::dd/text()').extract_first()
        item['country'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Country"]/following-sibling::dd/text()').extract_first()
        item['phone'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Phone"]/following-sibling::dd/text()').extract_first().strip()
        item['fax'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="Fax"]/following-sibling::dd/text()').extract_first().strip()
        item['email'] = response.xpath('//div[@class="clearfix"]/dl/dt[.="E-mail"]/following-sibling::dd/a/text()').extract_first()
        yield item
        return item
        #self.log('URL: {}'.format(response.url))
Project: REMAP | Author: REMAPApp
def testRequestToken(self):

    class MockResponse(object):

      def __init__(self, code):
        self.code = code.decode()

      def read(self):
        return ('{"refresh_token": "' + self.code + '456"}').encode()

    def mock_urlopen(unused_url, param):
      return MockResponse(urlparse.parse_qs(param)[b'code'][0])

    # Choose urlopen function to mock based on Python version
    if sys.version_info[0] < 3:
      urlopen_lib = 'urllib2.urlopen'
    else:
      urlopen_lib = 'urllib.request.urlopen'

    with mock.patch(urlopen_lib, new=mock_urlopen):
      auth_code = '123'
      refresh_token = ee.oauth.request_token(auth_code)
      self.assertEqual('123456', refresh_token)
Project: microbit-serial | Author: martinohanlon
def serial_class_for_url(url):
    """extract host and port from an URL string"""
    parts = urlparse.urlsplit(url)
    if parts.scheme != 'alt':
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": not starting with alt:// (%r)' % (parts.scheme,))
    class_name = 'Serial'
    try:
        for option, values in urlparse.parse_qs(parts.query, True).items():
            if option == 'class':
                class_name = values[0]
            else:
                raise ValueError('unknown option: %r' % (option,))
    except ValueError as e:
        raise serial.SerialException('expected a string in the form "alt://port[?option[=value][&option[=value]]]": %s' % e)
    return (''.join([parts.netloc, parts.path]), getattr(serial, class_name))

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
Project: microbit-serial | Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "socket":
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": not starting with socket:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.socket')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
            # get host and port
            host, port = parts.hostname, parts.port
            if not 0 <= port < 65536:
                raise ValueError("port not in range 0...65535")
        except ValueError as e:
            raise SerialException('expected a string in the form "socket://<host>:<port>[?logging={debug|info|warning|error}]": %s' % e)
        return (host, port)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microbit-serial | Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != "loop":
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": not starting with loop:// (%r)' % (parts.scheme,))
        try:
            # process options now, directly altering self
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'logging':
                    logging.basicConfig()   # XXX is that good to call it here?
                    self.logger = logging.getLogger('pySerial.loop')
                    self.logger.setLevel(LOGGER_LEVELS[values[0]])
                    self.logger.debug('enabled logging')
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise SerialException('expected a string in the form "loop://[?logging={debug|info|warning|error}]": %s' % e)

    #  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -  -
Project: microbit-serial | Author: martinohanlon
def from_url(self, url):
        """extract host and port from an URL string"""
        parts = urlparse.urlsplit(url)
        if parts.scheme != 'spy':
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": not starting with spy:// (%r)' % (parts.scheme,))
        # process options now, directly altering self
        formatter = FormatHexdump
        color = False
        output = sys.stderr
        try:
            for option, values in urlparse.parse_qs(parts.query, True).items():
                if option == 'file':
                    output = open(values[0], 'w')
                elif option == 'color':
                    color = True
                elif option == 'raw':
                    formatter = FormatRaw
                elif option == 'all':
                    self.show_all = True
                else:
                    raise ValueError('unknown option: %r' % (option,))
        except ValueError as e:
            raise serial.SerialException('expected a string in the form "spy://port[?option[=value][&option[=value]]]": %s' % e)
        self.formatter = formatter(output, color)
        return ''.join([parts.netloc, parts.path])
Project: realtimedisplay | Author: SuperDARNCanada
def get_target(self, target_plugin, path):
        """
        Parses the path, extracts a token, and looks up a target
        for that token using the token plugin. Sets
        target_host and target_port if successful
        """
        # The files in targets contain the lines
        # in the form of token: host:port

        # Extract the token parameter from url
        args = parse_qs(urlparse(path)[4]) # 4 is the query from url

        if not 'token' in args or not len(args['token']):
            raise self.server.EClose("Token not present")

        token = args['token'][0].rstrip('\n')

        result_pair = target_plugin.lookup(token)

        if result_pair is not None:
            return result_pair
        else:
            raise self.server.EClose("Token '%s' not found" % token)
Project: knesset-data-pipelines | Author: hasadna
def _extend_dataservice_class(self, dataservice_class):
        BaseDataserviceClass = super(MockDataserviceFunctionResourceProcessor, self)._extend_dataservice_class(dataservice_class)
        class ExtendedDataserviceClass(BaseDataserviceClass):

            @classmethod
            def _get_filename(cls, url, params):
                if url.startswith(
                        "http://online.knesset.gov.il/WsinternetSps/KnessetDataService/CommitteeScheduleData.svc/CommitteeAgendaSearch?"):
                    qs = parse_qs(urlparse(url).query)
                    return "CommitteeScheduleDava.svc_CommitteeAgendaSearch_CommitteeId_{}_{}_{}".format(
                        ",".join(qs["CommitteeId"]),
                        ",".join(qs["FromDate"]),
                        ",".join(qs["ToDate"]))

            @classmethod
            def _get_response_content(cls, url, params, timeout, proxies, retry_num=1):
                filename = cls._get_filename(url, params)
                return get_mock_response_content(filename, url, params, timeout, proxies, retry_num)

        return ExtendedDataserviceClass
Project: MSTU_scraper | Author: rrr3371
def get_students(self):
        group = self.group
        group_name = group[:group.find('(')].strip()
        group_code = group[group.find('(')+1:group.find(')')]

        students = []
        for row in self.table.children:
            if type(row) == NavigableString:
                continue

            active = True
            link = row.find(class_='fio_3').parent
            if link.has_attr('style') and link['style'] == 'color:gray;':  # grayed-out name: the student is no longer active
                active = False
            student_id = parse_qs(urlparse(link['href']).query)['sid'][0]

            name = row.find(class_='fio_3').string.strip()
            record_book_id = row.find(class_='hc3').string.strip()

            name = " ".join(name.split())
            record_book_id  = " ".join(record_book_id.split())
            students.append({'name': name, 'id': student_id, 'record_book': record_book_id, 'active': int(active)})

        return {'group': group_name, 'code': group_code, 'students': students, 'id': self.group_id}
Project: urconf | Author: knyar
def test_api_get_paginated(self):
        def callback(request):
            params = parse_qs(urlparse(request.url).query)
            limit = params["limit"][0] if "limit" in params else 1
            offset = params["offset"][0] if "offset" in params else 0
            resp = """{{"stat": "ok", "offset": "{offset}", "limit": "{limit}",
                        "total": "10","fake":["fakedata{offset}"]}}""".format(
                offset=offset, limit=limit)
            return (200, {}, resp)
        responses.add_callback(responses.GET, "https://fake/getFake",
                               callback=callback)

        config = urconf.UptimeRobot("", url="https://fake/")
        result = config._api_get_paginated("getFake", {}, lambda x: x["fake"])

        assert len(responses.calls) == 10
        for i in (range(10)):
            assert "fakedata{}".format(i) in result
Project: goldmine | Author: Armored-Dragon
def filter_result(link):
    try:
        # Valid results are absolute URLs not pointing to a Google domain
        # like images.google.com or googleusercontent.com
        o = urlparse(link, 'http')
        if o.netloc and 'google' not in o.netloc:
            return link
        # Decode hidden URLs.
        if link.startswith('/url?'):
            link = parse_qs(o.query)['q'][0]
            # Valid results are absolute URLs not pointing to a Google domain
            o = urlparse(link, 'http')
            if o.netloc and 'google' not in o.netloc:
                return link
    except Exception:
        pass
    return None
Project: daisychain | Author: daisychainme
def append_query_params(self, url, **kwargs):

        uri = urlsplit(url)
        query = parse_qs(uri.query)

        for key in kwargs:
            if key in query:
                query[key].append(kwargs[key])
            else:
                query[key] = kwargs[key]

        query_string = urlencode(query, doseq=True)

        uri_new = uri._replace(query=query_string)

        return urlunsplit(uri_new)
Project: daisychain | Author: daisychainme
def test_get_without_verify_token(self):

        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test?type=set"
        session.save()

        getData = {
            "code": "code_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)

        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["set", "api"],
                              "detail": ["verify_token_not_set"]},
                             parse_qs(redirect_uri.query))
Project: daisychain | Author: daisychainme
def test_get_with_no_verify_token_in_session(self):

        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test"
        session.save()

        getData = {
            "code": "code_will_not_be_checked_anyway",
            "verify_token": "token_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)

        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["internal"],
                              "detail": ["no_verify_token_in_session"]},
                             parse_qs(redirect_uri.query))
Project: daisychain | Author: daisychainme
def test_get_with_valid_code_and_invalid_verify_token(self):

        sessionValue = "correctvalue"
        session = self.client.session
        session[SESSKEY_OAUTH_NEXT_URI] = "/test"
        session[SESSKEY_OAUTH_VERIFY_TOKEN] = sessionValue
        session.save()

        getData = {
            "verify_token": "someothervaluethaninthesession",
            "code": "code_will_not_be_checked_anyway"
        }

        res = self.client.get(reverse("instagram:connect"), getData)

        self.assertEqual(302, res.status_code)
        redirect_uri = urlsplit(res['Location'])
        self.assertEqual("/test", redirect_uri.path)
        self.assertDictEqual({"status": ["error"],
                              "type": ["api"],
                              "detail": ["invalid_verify_token"]},
                             parse_qs(redirect_uri.query))
Project: daisychain | Author: daisychainme
def append_query_params(self, url, **kwargs):

        uri = urlsplit(url)
        query = parse_qs(uri.query)

        for key in kwargs:
            if key in query:
                query[key].append(kwargs[key])
            else:
                query[key] = kwargs[key]

        query_string = urlencode(query, doseq=True)

        uri_new = uri._replace(query=query_string)

        return urlunsplit(uri_new)
Project: owllook | Author: howie6879
def change_email(request):
    """
    Change the user's email address.
    :param request:
    :return:
        :   -1  no user in the session (not logged in)
        :   0   email update failed
        :   1   email updated successfully
    """
    user = request['session'].get('user', None)
    data = parse_qs(str(request.body, encoding='utf-8'))
    if user:
        try:
            email = data.get('email', None)[0]
            motor_db = motor_base.get_db()
            await motor_db.user.update_one({'user': user},
                                           {'$set': {'email': email}})
            LOGGER.info('Email updated successfully')
            return json({'status': 1})
        except Exception as e:
            LOGGER.exception(e)
            return json({'status': 0})
    else:
        return json({'status': -1})
Project: cbas | Author: ImmobilienScout24
def test_obtain_access_token(self, rmock):
        rmock.post(requests_mock.ANY, text='{"access_token": "ANY_TOKEN"}')
        cmock = Mock()
        cmock.username = "ANY_USERNAME"
        cmock.auth_host = "ANY_URL.example"
        result = obtain_access_token(cmock, 'ANY_PASSWORD')
        self.assertEqual('ANY_TOKEN', result)
        received_post_data = parse_qs(rmock.request_history[0].text)
        expected_post_data = {u'username': [u'ANY_USERNAME'],
                              u'password': [u'ANY_PASSWORD'],
                              u'client_id': [u'jumpauth'],
                              u'grant_type': [u'password']}
        self.assertEqual(received_post_data, expected_post_data)
Project: mblog | Author: moling3650
def data_factory(app, handler):
    async def parse_data(request):
        logging.info('data_factory...')
        if request.method in ('POST', 'PUT'):
            if not request.content_type:
                return web.HTTPBadRequest(text='Missing Content-Type.')
            content_type = request.content_type.lower()
            if content_type.startswith('application/json'):
                request.__data__ = await request.json()
                if not isinstance(request.__data__, dict):
                    return web.HTTPBadRequest(text='JSON body must be object.')
                logging.info('request json: %s' % request.__data__)
            elif content_type.startswith(('application/x-www-form-urlencoded', 'multipart/form-data')):
                params = await request.post()
                request.__data__ = dict(**params)
                logging.info('request form: %s' % request.__data__)
            else:
                return web.HTTPBadRequest(text='Unsupported Content-Type: %s' % content_type)
        elif request.method == 'GET':
            qs = request.query_string
            request.__data__ = {k: v[0] for k, v in parse.parse_qs(qs, True).items()}
            logging.info('request query: %s' % request.__data__)
        else:
            request.__data__ = dict()
        return await handler(request)
    return parse_data

# Convert the handler's return value into a web.Response object before returning it.
Project: xavier | Author: bepress
def parse_body(self):
        if self.headers.get('Content-Type', '').startswith('application/json'):
            return json.loads(self.body)
        else:
            return MultiValueDict(parse_qs(self.body))
Project: eddie | Author: greenkey
def do_GET(self):
        """ Process GET requests.

            This method will process the requests in this form:

                `/command?parameter1=value1&...`

            It will use `in_message` parameter as the input message and reply
            with a JSON containing `out_message` propery:

                `{"out_message": "hello"}`
        """
        try:
            function, params = self.path.split("?")
            function, params = function[1:], parse_qs(params)
            self.send_response(200)
            self.end_headers()
            output_text = self.server.bot.process(
                "".join(params["in_message"])
            )
            output = {
                "out_message": output_text,
                "out_message_html": output_text.replace(
                    '&', '&amp;').replace(
                    '<', '&lt;').replace(
                    '>', '&gt;').replace(
                    '\n', '<br />')
            }
            self.wfile.write(json.dumps(output).encode("UTF-8"))
        except ValueError:
            # if no command is specified, serve the default html
            filename = os.path.join(
                os.path.dirname(__file__),
                'http',
                'index.html'
            )
            with open(filename, 'r') as template_file:
                self.send_response(200)
                self.end_headers()
                output = template_file.read()
            self.wfile.write(output.encode("UTF-8"))