Python six.moves.urllib.parse 模块,parse_qs() 实例源码

我们从 Python 开源项目中，提取了以下 44 个代码示例，用于说明如何使用 six.moves.urllib.parse.parse_qs()。

项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def do_GET(self):
        """Answer a GET with a plain-text notice and record the query parameters.

        The request's query string is parsed and stored on
        ``self.server.callback_result``: a parameter given once maps to its
        single value, a parameter given several times maps to the list of
        values.
        """
        self.send_response(200)
        self.send_header('Content-Type', 'text/plain')
        self.end_headers()

        def _collapse(values):
            # One value -> the bare value, no value -> None, otherwise the list.
            if len(values) == 1:
                return values[0]
            if len(values) == 0:
                return None
            return values

        parsed_query = parse_qs(urlparse(self.path).query)
        self.server.callback_result = {
            name: _collapse(values) for name, values in parsed_query.items()
        }

        self.wfile.write(b'You can close this window now')
项目:panko    作者:openstack    | 项目源码 | 文件源码
def _parse_connection_url(url):
        """Turn a database url into a dict of connection options.

        The returned dict has the keys ``table_prefix`` (default None),
        ``table_prefix_separator`` (default ``'_'``), ``dbtype`` (the url
        scheme), ``host`` and ``port`` (default 9090).

        .. note::

          HBase Thrift does not support authentication and there is no
          database name, so we are not looking for these in the url.
        """
        parsed = netutils.urlsplit(url)
        query = urlparse.parse_qs(parsed.query)

        host, port = parsed.netloc, 9090
        if ':' in parsed.netloc:
            host, port = parsed.netloc.split(':')

        return {
            'table_prefix': query.get('table_prefix', [None])[0],
            'table_prefix_separator': query.get(
                'table_prefix_separator', ['_'])[0],
            'dbtype': parsed.scheme,
            'host': host,
            # An empty or zero port falls back to the default 9090.
            'port': int(port or 0) or 9090,
        }
项目:novajoin    作者:openstack    | 项目源码 | 文件源码
def get_body(self, request):
        """Return a ``(content_type, body)`` pair for ``request``.

        * Empty body: ``(None, params)``, where ``params`` is the parsed
          query string.
        * Content type lookup raises ``InvalidContentType`` or yields a
          falsy value: ``(None, '')``.
        * Otherwise: ``(content_type, request.body)``.
        """
        query_params = urlparse.parse_qs(request.query_string)
        if len(request.body) == 0:
            LOG.debug("Empty body provided in request")
            return None, query_params

        try:
            content_type = request.get_content_type()
        except exception.InvalidContentType:
            LOG.debug("Unrecognized Content-Type provided in request")
            return None, ''

        if content_type:
            return content_type, request.body

        LOG.debug("No Content-Type provided in request")
        return None, ''
项目:callisto-core    作者:project-callisto    | 项目源码 | 文件源码
def facebook_validation_function(url):
    """Return the Facebook profile identifier found in ``url``, or None.

    None is returned when the domain is not ``facebook.com`` (or one of its
    subdomains), when no usable identifier can be extracted, when the
    identifier ends in ``.php`` or is listed in ``generic_fb_urls``, or when
    a helper raises ``ValidationError``.

    NOTE(review): ``url_parts`` is indexed as (scheme, domain, path, query);
    this presumably mirrors a urlsplit() result -- confirm against
    ``_get_url_parts``.
    """
    try:
        url_parts = _get_url_parts(url)
        # check if acceptable domain
        domain = url_parts[1]
        if not (domain == 'facebook.com' or domain.endswith('.facebook.com')):
            return None
        path = _get_initial_path(url_parts)

        # old style numeric profiles
        if path == "profile.php":  # ex. https://www.facebook.com/profile.php?id=100010279981469
            # parse_qs omits absent keys, so default to [None]: a profile.php
            # url without an id is rejected below instead of raising TypeError.
            path = parse_qs(url_parts[3]).get('id', [None])[0]
        if path == 'people':  # ex. https://www.facebook.com/people/John-Doe/100013326345115
            segments = url_parts[2].strip('/').split('/')
            if len(segments) < 3:
                # No numeric id segment after the name: not a usable profile.
                return None
            path = segments[2].lower()

        # TODO: validate against allowed username characteristics
        # https://github.com/project-callisto/callisto-core/issues/181
        if not path or path.endswith('.php') or path in generic_fb_urls:
            return None
        return path
    except ValidationError:
        return None
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def test_authorize_view(self):
        """Check the redirect produced by the ``/oauth2authorize`` view.

        Covers the default request, a request carrying ``return_url`` and a
        request carrying an extra query parameter.
        """
        def redirect_params(redirect_url):
            # Everything after the first '?' is the query string.
            return urlparse.parse_qs(redirect_url.split('?', 1)[1])

        with self.app.test_client() as client:
            location = client.get('/oauth2authorize').headers['Location']
            params = redirect_params(location)
            state = json.loads(params['state'][0])

            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, params['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?return_url=/test')
            params = redirect_params(reply.headers['Location'])
            state = json.loads(params['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?extra_param=test')
            self.assertIn('extra_param=test', reply.headers['Location'])
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def test_authorize_view(self):
        """Check the redirect produced by the ``/oauth2authorize`` view.

        Covers the default request, a request carrying ``return_url`` and a
        request carrying an extra query parameter.
        """
        def redirect_params(redirect_url):
            # Everything after the first '?' is the query string.
            return urlparse.parse_qs(redirect_url.split('?', 1)[1])

        with self.app.test_client() as client:
            location = client.get('/oauth2authorize').headers['Location']
            params = redirect_params(location)
            state = json.loads(params['state'][0])

            self.assertIn(oauth2client.GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, params['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?return_url=/test')
            params = redirect_params(reply.headers['Location'])
            state = json.loads(params['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?extra_param=test')
            self.assertIn('extra_param=test', reply.headers['Location'])
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the ``<collection>_links`` entry whose ``rel`` is
        ``'next'`` (or ``'previous'`` when ``page_reverse`` is truthy)
        supplies the query parameters for the following request. Iteration
        stops when no such link exists or a ``KeyError`` is raised while
        reading the links.
        """
        linkrel = 'previous' if params.get('page_reverse', False) else 'next'
        while True:
            page = self.get(path, params=params)
            yield page
            try:
                for link in page['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        href_query = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(href_query)
                        break
                else:
                    # No link in the requested direction: this was the last page.
                    return
            except KeyError:
                return
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the ``<collection>_links`` entry whose ``rel`` is
        ``'next'`` (or ``'previous'`` when ``page_reverse`` is truthy)
        supplies the query parameters for the following request. Iteration
        stops when no such link exists or a ``KeyError`` is raised while
        reading the links.
        """
        linkrel = 'previous' if params.get('page_reverse', False) else 'next'
        while True:
            page = self.get(path, params=params)
            yield page
            try:
                for link in page['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        href_query = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(href_query)
                        break
                else:
                    # No link in the requested direction: this was the last page.
                    return
            except KeyError:
                return
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def _pagination(self, collection, path, **params):
        """Yield successive pages of ``collection`` fetched from ``path``.

        After each page, the ``<collection>_links`` entry whose ``rel`` is
        ``'next'`` (or ``'previous'`` when ``page_reverse`` is truthy)
        supplies the query parameters for the following request. Iteration
        stops when no such link exists or a ``KeyError`` is raised while
        reading the links.
        """
        linkrel = 'previous' if params.get('page_reverse', False) else 'next'
        while True:
            page = self.get(path, params=params)
            yield page
            try:
                for link in page['%s_links' % collection]:
                    if link['rel'] == linkrel:
                        href_query = urlparse.urlparse(link['href']).query
                        params = urlparse.parse_qs(href_query)
                        break
                else:
                    # No link in the requested direction: this was the last page.
                    return
            except KeyError:
                return
项目:eclipse2017    作者:google    | 项目源码 | 文件源码
def test_authorize_view(self):
        """Check the redirect produced by the ``/oauth2authorize`` view.

        Covers the default request, a request carrying ``return_url`` and a
        request carrying an extra query parameter.
        """
        def redirect_params(redirect_url):
            # Everything after the first '?' is the query string.
            return urlparse.parse_qs(redirect_url.split('?', 1)[1])

        with self.app.test_client() as client:
            location = client.get('/oauth2authorize').headers['Location']
            params = redirect_params(location)
            state = json.loads(params['state'][0])

            self.assertIn(GOOGLE_AUTH_URI, location)
            self.assertNotIn(self.oauth2.client_secret, location)
            self.assertIn(self.oauth2.client_id, params['client_id'])
            self.assertEqual(
                flask.session['google_oauth2_csrf_token'], state['csrf_token'])
            self.assertEqual(state['return_url'], '/')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?return_url=/test')
            params = redirect_params(reply.headers['Location'])
            state = json.loads(params['state'][0])
            self.assertEqual(state['return_url'], '/test')

        with self.app.test_client() as client:
            reply = client.get('/oauth2authorize?extra_param=test')
            self.assertIn('extra_param=test', reply.headers['Location'])
项目:aspen.py    作者:AspenWeb    | 项目源码 | 文件源码
def __init__(self, raw, errors='replace'):
        """Build the mapping from an application/x-www-form-urlencoded string.

        :param raw: the querystring as received; kept unchanged on ``self.raw``.
        :param errors: error-handling policy passed to every decoding step.

        ``self.decoded`` holds the unquoted, decoded form of the whole string.
        """
        # urllib needs bytestrings in py2 and unicode strings in py3
        if PY2:
            raw_str = raw.encode('ascii')
        else:
            raw_str = raw

        self.decoded = _decode(unquote_plus(raw_str), errors=errors)
        self.raw = raw

        parse_options = {'keep_blank_values': True, 'strict_parsing': False}
        if not PY2:
            # Python 3: parse_qs unquotes and decodes in one go.
            as_dict = parse_qs(raw_str, errors=errors, **parse_options)
        else:
            # Python 2: parse_qs unquotes but leaves bytestrings, so keys and
            # values are decoded to unicode afterwards.
            as_dict = parse_qs(raw_str, **parse_options)
            for key, values in list(as_dict.items()):
                as_dict[_decode(key, errors=errors)] = [
                    _decode(value, errors=errors) for value in values
                ]

        Mapping.__init__(self, as_dict)
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): Parameters to add; a key already present in
            the url has its value(s) replaced.

    Returns:
        (str): The url rebuilt with the merged query string; scheme, host,
            path and fragment are unchanged.

    """
    parts = urlsplit(url)

    # Start from the url's own parameters, then let the new ones override.
    merged = parse_qs(parts.query)
    merged.update(query_parameters)
    new_query = urlencode(merged, doseq=True)

    return urlunsplit(
        (parts.scheme, parts.netloc, parts.path, new_query, parts.fragment),
    )
项目:edx-enterprise    作者:edx    | 项目源码 | 文件源码
def traverse_pagination(response, endpoint):
    """
    Traverse a paginated API response.

    Extracts and concatenates "results" (list of dict) returned by DRF-powered
    APIs. Each following page is fetched by passing the query parameters of
    the current page's "next" url to ``endpoint.get``.

    Arguments:
        response (Dict): Current response dict from service API
        endpoint (slumber Resource object): slumber Resource object from edx-rest-api-client

    Returns:
        list of dict. This is a new list; the ``results`` list inside the
        ``response`` passed in is left untouched.

    """
    # Copy the first page: extending the list stored in ``response`` would
    # silently grow the caller's data. ``or []`` tolerates an explicit None.
    results = list(response.get('results') or [])

    next_page = response.get('next')
    while next_page:
        querystring = parse_qs(urlparse(next_page).query, keep_blank_values=True)
        response = endpoint.get(**querystring)
        results.extend(response.get('results') or [])
        next_page = response.get('next')

    return results
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def get_msg(hinfo, binding):
    """Pull the message out of the HTTP info ``hinfo`` for ``binding``.

    * SOAP: ``hinfo["data"]`` is returned unchanged.
    * HTTP POST: the text between ``TAG1`` and the next double quote in
      ``hinfo["data"][3]``.
    * anything else (HTTP redirect): the ``SAMLRequest`` query parameter of
      the url in the first header.
    """
    if binding == BINDING_SOAP:
        return hinfo["data"]

    if binding == BINDING_HTTP_POST:
        form = hinfo["data"][3]
        # Skip past the tag plus one more character to reach the value.
        start = form.find(TAG1) + len(TAG1) + 1
        end = form.find('"', start)
        return form[start:end]

    # BINDING_HTTP_REDIRECT
    location = urlparse(hinfo["headers"][0][1])
    return parse_qs(location.query)["SAMLRequest"][0]

# ------------------------------------------------------------------------
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def get_msg(hinfo, binding, response=False):
    """Pull the message out of the HTTP info ``hinfo`` for ``binding``.

    * SOAP: ``hinfo["data"]`` is returned unchanged.
    * HTTP POST: the text between ``TAG1`` and the next double quote in
      ``hinfo["data"][3]``.
    * HTTP artifact: the same form extraction when ``hinfo["data"]`` is
      truthy, otherwise the ``SAMLart`` query parameter of ``hinfo["url"]``.
    * anything else (HTTP redirect): the ``SAMLRequest`` query parameter of
      the url in the first header.

    ``response`` is accepted but not used here.
    """
    def _value_after_tag(form):
        # Skip past the tag plus one more character to reach the value.
        start = form.find(TAG1) + len(TAG1) + 1
        end = form.find('"', start)
        return form[start:end]

    if binding == BINDING_SOAP:
        return hinfo["data"]

    if binding == BINDING_HTTP_POST:
        return _value_after_tag(hinfo["data"][3])

    if binding == BINDING_HTTP_ARTIFACT:
        # either by POST or by redirect
        if hinfo["data"]:
            return _value_after_tag(hinfo["data"][3])
        artifact_url = urlparse(hinfo["url"])
        return parse_qs(artifact_url.query)["SAMLart"][0]

    # BINDING_HTTP_REDIRECT
    location = urlparse(hinfo["headers"][0][1])
    return parse_qs(location.query)["SAMLRequest"][0]
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def get_msg(hinfo, binding, response=False):
    """Pull the message out of the HTTP info ``hinfo`` for ``binding``.

    * SOAP: ``hinfo["data"]`` is returned unchanged.
    * HTTP POST: the text between ``TAG1`` and the next double quote in
      ``hinfo["data"][3]``.
    * URI: ``hinfo["data"]`` when ``response`` is truthy, otherwise the
      ``ID`` query parameter of ``hinfo["url"]``.
    * anything else (HTTP redirect): the ``SAMLRequest`` query parameter of
      the url in the first header.
    """
    if binding == BINDING_SOAP:
        return hinfo["data"]

    if binding == BINDING_HTTP_POST:
        form = hinfo["data"][3]
        # Skip past the tag plus one more character to reach the value.
        start = form.find(TAG1) + len(TAG1) + 1
        end = form.find('"', start)
        return form[start:end]

    if binding == BINDING_URI:
        if response:
            return hinfo["data"]
        query_string = hinfo["url"].split("?")[1]
        return parse_qs(query_string)["ID"][0]

    # BINDING_HTTP_REDIRECT
    location = urlparse(hinfo["headers"][0][1])
    return parse_qs(location.query)["SAMLRequest"][0]
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def test_parse_faulty_request_to_err_status(self):
        """An AuthnRequest with the wrong destination yields an error status.

        The request is created for ``http://www.example.com``; parsing it on
        the server is expected to raise ``OtherError``, which is converted
        into a status whose message and codes are then checked.
        """
        _req_id, authn_request = self.client.create_authn_request(
            destination="http://www.example.com")

        binding = BINDING_HTTP_REDIRECT
        http_args = self.client.apply_binding(
            binding, "%s" % authn_request, "http://www.example.com", "abcd")
        redirect_url = http_args["headers"][0][1]
        query = parse_qs(redirect_url.split('?')[1])
        print(query)

        # Stays None when parsing unexpectedly succeeds, failing the assert below.
        status = None
        try:
            self.server.parse_authn_request(query["SAMLRequest"][0], binding)
        except OtherError as oe:
            print(oe.args)
            status = s_utils.error_status_factory(oe)

        assert status
        print(status)
        assert _eq(status.keyswv(), ["status_code", "status_message"])
        assert status.status_message.text == 'Not destined for me!'
        outer_code = status.status_code
        assert _eq(outer_code.keyswv(), ["status_code", "value"])
        assert outer_code.value == samlp.STATUS_RESPONDER
        assert outer_code.status_code.value == samlp.STATUS_UNKNOWN_PRINCIPAL
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def test_parse_ok_request(self):
        """A well-formed AuthnRequest parses and yields the expected response args."""
        _req_id, authn_request = self.client.create_authn_request(
            message_id="id1", destination="http://localhost:8088/sso")

        print(authn_request)
        binding = BINDING_HTTP_REDIRECT
        http_args = self.client.apply_binding(
            binding, "%s" % authn_request, "http://www.example.com", "abcd")
        redirect_url = http_args["headers"][0][1]
        query = parse_qs(redirect_url.split('?')[1])
        print(query)

        req = self.server.parse_authn_request(query["SAMLRequest"][0], binding)
        # returns a dictionary
        print(req)
        resp_args = self.server.response_args(req.message, [BINDING_HTTP_POST])
        assert resp_args["destination"] == "http://lingon.catalogix.se:8087/"
        assert resp_args["in_response_to"] == "id1"
        policy = resp_args["name_id_policy"]
        assert _eq(policy.keyswv(), ["format", "allow_create"])
        assert policy.format == saml.NAMEID_FORMAT_TRANSIENT
        sp_entity_id = resp_args["sp_entity_id"]
        assert sp_entity_id == "urn:mace:example.com:saml:roland:sp"
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def parse_discovery_service_response(url="", query="",
            returnIDParam="entityID"):
        """
        Extract the IdP identifier from a Discovery Service response.

        :param url: the url the user was redirected back to; when given, its
            query part is used and ``query`` is ignored
        :param query: just the query part of the URL.
        :param returnIDParam: name of the query parameter that carries the
            IdP identifier. Default is 'entityID'
        :return: The IdP identifier or "" if none was given
        """
        if url:
            # Index 4 of the urlparse() result is the query component.
            query = urlparse(url)[4]

        parameters = parse_qs(query) if query else {}
        return parameters.get(returnIDParam, [""])[0]
项目:deb-python-pysaml2    作者:openstack    | 项目源码 | 文件源码
def disco(environ, start_response, _sp):
    """Handle the return from discovery: redirect to the chosen IdP.

    Reads ``entityID`` and ``sid`` from the query string, looks up where the
    user came from in ``CACHE.outstanding_queries``, builds the redirect and
    attaches a ``ve_disco`` cookie carrying the entity id.
    """
    params = parse_qs(environ["QUERY_STRING"])
    entity_id = params["entityID"][0]
    session_id = params["sid"][0]
    came_from = CACHE.outstanding_queries[session_id]

    sso = SSO(_sp, environ, start_response, cache=CACHE, **ARGS)
    resp = sso.redirect_to_auth(sso.sp, entity_id, came_from)

    # Add cookie
    disco_cookie = make_cookie("ve_disco", entity_id, "SEED_SAW")
    resp.headers.append(disco_cookie)
    return resp(environ, start_response)


# ----------------------------------------------------------------------------


# noinspection PyUnusedLocal
项目:cloak-server    作者:encryptme    | 项目源码 | 文件源码
def _post_servers(self, request):
        # type: (requests.PreparedRequest) -> requests.Response
        """Register a server from the form-encoded request body.

        Stores a new server id, a random 20-character auth token, the API
        version header, and the posted ``name`` and ``target`` on ``self``,
        then answers 201 with the server details. A missing ``name``,
        ``target``, ``email`` or ``password`` field raises ``KeyError``.
        """
        form = parse_qs(force_text(request.body))

        self.server_id = self._public_id('srv')
        token_chars = [random.choice(mixed_alphabet) for _ in xrange(20)]
        self.auth_token = ''.join(token_chars)
        self.api_version = force_text(request.headers['X-Cloak-API-Version'])
        self.name = form['name'][0]
        self.target_id = form['target'][0]

        # Make sure these exist
        for required in ('email', 'password'):
            form[required][0]

        return self._response(request, 201, {
            'server_id': self.server_id,
            'auth_token': self.auth_token,
            'server': self._server_result(),
        })
项目:cloak-server    作者:encryptme    | 项目源码 | 文件源码
def _post_server_csr(self, request):
        # type: (requests.PreparedRequest) -> requests.Response
        """Accept a CSR from an authenticated request.

        Answers 401 when ``self._authenticate(request)`` is falsy. Otherwise
        stores the posted ``csr`` and a random 16-character ``pki_tag`` on
        ``self`` and answers 202.
        """
        form = parse_qs(force_text(request.body))

        if not self._authenticate(request):
            return self._response(request, 401)

        self.csr = form['csr'][0]
        tag_chars = [random.choice(mixed_alphabet) for _ in xrange(16)]
        self.pki_tag = ''.join(tag_chars)
        return self._response(request, 202)

    #
    # Utils
    #
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def forward_request(self, method, path, data, headers):
        """Intercept selected form-encoded actions POSTed to ``/``.

        Change-set actions and ``ValidateTemplate`` are answered by the
        matching helper. ``UpdateStack`` with a ``TemplateURL`` is rewritten
        into a ``Request`` whose ``TemplateBody`` holds the downloaded
        template. Every other request, including a POST without an
        ``Action`` parameter, returns True.
        """
        if method != 'POST' or path != '/':
            return True

        req_data = urlparse.parse_qs(to_str(data))
        if not req_data:
            return True

        # parse_qs omits absent keys; default to [None] so a body without
        # Action falls through to ``return True`` instead of raising TypeError.
        action = req_data.get('Action', [None])[0]

        if action == 'CreateChangeSet':
            return create_change_set(req_data)
        elif action == 'DescribeChangeSet':
            return describe_change_set(req_data)
        elif action == 'ExecuteChangeSet':
            return execute_change_set(req_data)
        elif action == 'UpdateStack' and req_data.get('TemplateURL'):
            # Temporary fix until the moto CF backend can handle TemplateURL (currently fails)
            url = re.sub(r'https?://s3\.amazonaws\.com', aws_stack.get_local_service_url('s3'),
                req_data.get('TemplateURL')[0])
            req_data['TemplateBody'] = requests.get(url).content
            modified_data = urlparse.urlencode(req_data, doseq=True)
            return Request(data=modified_data, headers=headers, method=method)
        elif action == 'ValidateTemplate':
            return validate_template(req_data)

        return True
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def oauth_authorization_request(self, token):
        """
        Build the URL of the authorization link.

        ``token`` is either an already parsed dict or a query-string encoded
        token, which is parsed first.
        """
        parsed_token = token if isinstance(token, dict) else parse_qs(token)

        oauth_token = parsed_token.get(self.OAUTH_TOKEN_PARAMETER_NAME)[0]
        state = self.get_or_create_state()
        base_url = self.setting('MEDIAWIKI_URL')

        query = urlencode({
            'title': 'Special:Oauth/authenticate',
            self.OAUTH_TOKEN_PARAMETER_NAME: oauth_token,
            self.REDIRECT_URI_PARAMETER_NAME: self.get_redirect_uri(state)
        })
        return '{0}?{1}'.format(base_url, query)
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def access_token(self, token):
        """
        Fetch the Mediawiki access token.

        POSTs to the ``MEDIAWIKI_URL`` setting with ``self.oauth_auth(token)``
        as auth, parses the form-encoded response body and returns a dict
        with the decoded ``oauth_token`` and ``oauth_token_secret``.
        """
        request_auth = self.oauth_auth(token)

        reply = requests.post(
            url=self.setting('MEDIAWIKI_URL'),
            params={'title': 'Special:Oauth/token'},
            auth=request_auth
        )

        # The body is bytes, so parse_qs returns bytes keys and values.
        credentials = parse_qs(reply.content)
        raw_key = credentials.get(b('oauth_token'))[0]
        raw_secret = credentials.get(b('oauth_token_secret'))[0]

        return {
            'oauth_token': raw_key.decode(),
            'oauth_token_secret': raw_secret.decode()
        }
项目:collectors    作者:opentrials    | 项目源码 | 文件源码
def _process_url(page_from, page_to, url):
    """Return ``url`` if its ``page`` parameter lies in [page_from, page_to].

    Returns None when the url has no ``page`` query parameter or the page
    number falls outside the inclusive range.
    """
    # Get url page
    page_values = parse_qs(urlparse(url).query).get('page')
    if not page_values:
        return None

    # Preserve if match
    lower = int(page_from)
    upper = int(page_to)
    number = int(page_values[0])
    return url if lower <= number <= upper else None
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_servers_with_limit(self):
        """The server index honours ``limit`` and returns a matching next link."""
        listing = self.controller.index(self.req('/fake/servers?limit=3'))

        returned = listing['servers']
        self.assertEqual([server['id'] for server in returned],
                [fakes.get_fake_uuid(n) for n in range(len(returned))])

        links = listing['servers_links']
        self.assertEqual(links[0]['rel'], 'next')
        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'],
                  'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_server_details_with_limit(self):
        """The detail view honours ``limit`` and returns a matching next link."""
        listing = self.controller.detail(
            self.req('/fake/servers/detail?limit=3'))

        returned = listing['servers']
        self.assertEqual([server['id'] for server in returned],
                [fakes.get_fake_uuid(n) for n in range(len(returned))])

        links = listing['servers_links']
        self.assertEqual(links[0]['rel'], 'next')

        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'], 'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_server_details_with_limit_and_other_params(self):
        """The next link carries over limit, filter and sort parameters plus a marker."""
        request = self.req('/fake/servers/detail'
                           '?limit=3&blah=2:t'
                           '&sort_key=id1&sort_dir=asc')
        listing = self.controller.detail(request)

        returned = listing['servers']
        self.assertEqual([server['id'] for server in returned],
                [fakes.get_fake_uuid(n) for n in range(len(returned))])

        links = listing['servers_links']
        self.assertEqual(links[0]['rel'], 'next')

        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'], 'blah': ['2:t'],
                  'sort_key': ['id1'], 'sort_dir': ['asc'],
                  'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_servers_with_limit(self):
        """The server index honours ``limit`` and returns a matching next link."""
        request = fakes.HTTPRequest.blank('/fake/servers?limit=3')
        listing = self.controller.index(request)

        returned = listing['servers']
        self.assertEqual([fakes.get_fake_uuid(n) for n in range(len(returned))],
                         [server['id'] for server in returned])

        links = listing['servers_links']
        self.assertEqual('next', links[0]['rel'])
        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'],
                  'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_server_details_with_limit(self):
        """The detail view honours ``limit`` and returns a matching next link."""
        request = fakes.HTTPRequest.blank('/fake/servers/detail?limit=3')
        listing = self.controller.detail(request)

        returned = listing['servers']
        self.assertEqual([fakes.get_fake_uuid(n) for n in range(len(returned))],
                         [server['id'] for server in returned])

        links = listing['servers_links']
        self.assertEqual('next', links[0]['rel'])

        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'], 'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def test_get_server_details_with_limit_and_other_params(self):
        """The next link carries over limit, filter and sort parameters plus a marker."""
        request = fakes.HTTPRequest.blank('/fake/servers/detail'
                                          '?limit=3&blah=2:t'
                                          '&sort_key=id1&sort_dir=asc')
        listing = self.controller.detail(request)

        returned = listing['servers']
        self.assertEqual([fakes.get_fake_uuid(n) for n in range(len(returned))],
                         [server['id'] for server in returned])

        links = listing['servers_links']
        self.assertEqual('next', links[0]['rel'])
        # The next link must repeat the limit, filter and sort information of
        # the original request and add a marker, so that a caller can follow
        # it as-is without re-inserting those parameters by hand.
        next_href = urlparse.urlparse(links[0]['href'])
        self.assertEqual('/v2/fake/servers/detail', next_href.path)
        next_params = urlparse.parse_qs(next_href.query)
        wanted = {'limit': ['3'], 'blah': ['2:t'],
                  'sort_key': ['id1'], 'sort_dir': ['asc'],
                  'marker': [fakes.get_fake_uuid(2)]}
        self.assertThat(next_params, matchers.DictMatches(wanted))
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def delete(self, req, id):
        """Delete the quotas of project ``id``, optionally for a single user.

        Responds 202 on success. Raises ``HTTPNotFound`` when the
        ``os-extended-quotas`` extension is not loaded, or when a ``user_id``
        query parameter is given without ``os-user-quotas`` loaded; raises
        ``HTTPForbidden`` when an authorization check raises ``Forbidden``.
        """
        if not self.ext_mgr.is_loaded('os-extended-quotas'):
            raise webob.exc.HTTPNotFound()

        context = req.environ['nova.context']
        authorize_delete(context)
        query = urlparse.parse_qs(req.environ.get('QUERY_STRING', ''))
        user_id = query.get('user_id', [None])[0]
        if user_id and not self.ext_mgr.is_loaded('os-user-quotas'):
            raise webob.exc.HTTPNotFound()

        try:
            nova.context.authorize_project_context(context, id)
            # NOTE(alex_xu): back-compatible with db layer hard-code admin
            # permission checks. This has to be left only for API v2.0
            # because this version has to be stable even if it means that
            # only admins can call this method while the policy could be
            # changed.
            nova.context.require_admin_context(context)
            if user_id:
                QUOTAS.destroy_all_by_project_and_user(context,
                                                       id, user_id)
            else:
                QUOTAS.destroy_all_by_project(context, id)
            return webob.Response(status_int=202)
        except exception.Forbidden:
            raise webob.exc.HTTPForbidden()
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_simple_notification(self):
        """Notifying posts the expected form fields to the Pushover API."""
        responses.add('POST', 'https://api.pushover.net/1/messages.json', body=SUCCESS)
        self.plugin.set_option('userkey', 'abcdef', self.project)
        self.plugin.set_option('apikey', 'ghijkl', self.project)

        group = self.create_group(message='Hello world', culprit='foo.bar')
        event = self.create_event(
            group=group,
            message='Hello world',
            tags={'level': 'warning'},
        )

        rule = Rule.objects.create(project=self.project, label='my rule')
        notification = Notification(event=event, rule=rule)

        with self.options({'system.url-prefix': 'http://example.com'}):
            self.plugin.notify(notification)

        # The first recorded call is the POST sent by the plugin.
        sent_body = responses.calls[0].request.body
        payload = parse_qs(sent_body)
        expected = {
            'message': ['{}\n\nTags: level=warning'.format(event.get_legacy_message())],
            'title': ['Bar: Hello world'],
            'url': ['http://example.com/baz/bar/issues/{}/'.format(group.id)],
            'url_title': ['Issue Details'],
            'priority': ['0'],
            'user': ['abcdef'],
            'token': ['ghijkl'],
        }
        assert payload == expected
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_notification_without_culprit(self):
        """With ``exclude_culprit`` set, the Slack payload has only the Project field."""
        responses.add('POST', 'http://example.com/slack')
        self.plugin.set_option('webhook', 'http://example.com/slack', self.project)
        self.plugin.set_option('exclude_culprit', True, self.project)

        group = self.create_group(message='Hello world', culprit='foo.bar')
        event = self.create_event(group=group, message='Hello world', tags={'level': 'warning'})

        rule = Rule.objects.create(project=self.project, label='my rule')
        notification = Notification(event=event, rule=rule)

        with self.options({'system.url-prefix': 'http://example.com'}):
            self.plugin.notify(notification)

        # The webhook body is form-encoded with a JSON document under 'payload'.
        sent_body = responses.calls[0].request.body
        payload = json.loads(parse_qs(sent_body)['payload'][0])
        expected = {
            'username': 'Sentry',
            'attachments': [
                {
                    'color': '#f18500',
                    'fields': [
                        {
                            'short': True,
                            'value': 'foo Bar',
                            'title': 'Project'
                        },
                    ],
                    'fallback': '[foo Bar] Hello world',
                    'title': 'Hello world',
                    'title_link': 'http://example.com/baz/bar/issues/1/?referrer=slack',
                },
            ],
        }
        assert payload == expected
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def test_notification_without_project(self):
        """Check the webhook payload when the ``exclude_project`` option is set.

        The single attachment is expected to carry only the ``Culprit`` field.
        """
        webhook_url = 'http://example.com/slack'
        responses.add('POST', webhook_url)
        self.plugin.set_option('webhook', webhook_url, self.project)
        self.plugin.set_option('exclude_project', True, self.project)

        group = self.create_group(message='Hello world', culprit='foo.bar')
        event = self.create_event(group=group, message='Hello world', tags={'level': 'warning'})

        rule = Rule.objects.create(project=self.project, label='my rule')

        notification = Notification(event=event, rule=rule)

        with self.options({'system.url-prefix': 'http://example.com'}):
            self.plugin.notify(notification)

        # The request body is form-encoded; the JSON document sits in ``payload``.
        request = responses.calls[0].request
        payload = json.loads(parse_qs(request.body)['payload'][0])
        assert payload == {
            'username': 'Sentry',
            'attachments': [
                {
                    'color': '#f18500',
                    'fields': [
                        {
                            'short': False,
                            'value': 'foo.bar',
                            'title': 'Culprit',
                        },
                    ],
                    'fallback': '[foo Bar] Hello world',
                    'title': 'Hello world',
                    # Use the real group id instead of a literal "1" so the
                    # test does not depend on database sequence state.
                    'title_link': 'http://example.com/baz/bar/issues/{}/?referrer=slack'.format(
                        group.id),
                },
            ],
        }
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def query(self):
        """Return the parsed URL's query string as a dict of value lists.

        Parameters that appear without a value are kept rather than dropped.
        """
        raw_query = self.parsed_url.query
        return parse.parse_qs(raw_query, keep_blank_values=True)
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def query(self):
        """Parse the query component of ``self.parsed_url`` into a dict.

        Each key maps to the list of its values; blank values are retained.
        """
        query_string = self.parsed_url.query
        parsed = parse.parse_qs(query_string, keep_blank_values=True)
        return parsed
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def query(self):
        """Return query parameters of ``self.parsed_url``, blank values included.

        The result maps each parameter name to a list of its values.
        """
        options = {'keep_blank_values': True}
        return parse.parse_qs(self.parsed_url.query, **options)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_reference_name(self):
        """A reference name with start/end yields exactly three query values."""
        ticket_url = protocol.ticket_request_url(
            "http://example.co.uk/path/to/resource",
            reference_name="1", start=2, end=100)
        parts = urlparse(ticket_url)
        # The base URL components must pass through untouched.
        self.assertEqual(parts.scheme, "http")
        self.assertEqual(parts.netloc, "example.co.uk")
        self.assertEqual(parts.path, "/path/to/resource")
        params = parse_qs(parts.query)
        expected_params = (
            ("referenceName", ["1"]),
            ("start", ["2"]),
            ("end", ["100"]),
        )
        for key, values in expected_params:
            self.assertEqual(params[key], values)
        self.assertEqual(len(params), 3)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_reference_md5(self):
        """A reference MD5 alone yields a single ``referenceMD5`` query value."""
        checksum = "b9185d4fade27aa27e17f25fafec695f"
        parts = urlparse(protocol.ticket_request_url(
            "https://example.com/resource", reference_md5=checksum))
        # The base URL components must pass through untouched.
        self.assertEqual(parts.scheme, "https")
        self.assertEqual(parts.netloc, "example.com")
        self.assertEqual(parts.path, "/resource")
        params = parse_qs(parts.query)
        self.assertEqual(params["referenceMD5"], [checksum])
        self.assertEqual(len(params), 1)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_format(self):
        """The requested data format appears upper-cased as ``format``."""
        base_url = "http://example.co.uk/path/to/resource"
        for requested in ("cram", "CRAM", "BAM"):
            ticket_url = protocol.ticket_request_url(
                base_url, data_format=requested)
            parts = urlparse(ticket_url)
            self.assertEqual(parts.scheme, "http")
            self.assertEqual(parts.netloc, "example.co.uk")
            self.assertEqual(parts.path, "/path/to/resource")
            params = parse_qs(parts.query)
            self.assertEqual(params["format"], [requested.upper()])
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def test_embedded_query_strings(self):
        """Query parameters already present in the base URL are kept."""
        base_url = "http://a.com/stuff?a=a&b=b"
        embedded = (("a", ["a"]), ("b", ["b"]))

        # Without extra arguments only the embedded parameters are checked.
        params = parse_qs(urlparse(protocol.ticket_request_url(base_url)).query)
        for key, values in embedded:
            self.assertEqual(params[key], values)

        # With a reference name the embedded parameters must still be there.
        ticket_url = protocol.ticket_request_url(base_url, reference_name="123")
        params = parse_qs(urlparse(ticket_url).query)
        for key, values in embedded + (("referenceName", ["123"]),):
            self.assertEqual(params[key], values)
项目:htsget    作者:jeromekelleher    | 项目源码 | 文件源码
def ticket_request_url(
        url, fmt=None, reference_name=None, reference_md5=None,
        start=None, end=None, fields=None, tags=None, notags=None,
        data_format=None):
    """Return ``url`` with the given request parameters merged into its query.

    Query parameters already embedded in ``url`` are preserved, including
    those with an empty value. A parameter supplied here replaces any
    embedded parameter of the same name.

    :param url: Base URL, optionally carrying its own query string.
    :param fmt: Accepted but currently ignored.
    :param reference_name: Sent as ``referenceName`` when not None.
    :param reference_md5: Sent as ``referenceMD5`` when not None.
    :param start: Sent as ``start`` after conversion with ``int``.
    :param end: Sent as ``end`` after conversion with ``int``.
    :param fields: Accepted but currently ignored.
    :param tags: Accepted but currently ignored.
    :param notags: Accepted but currently ignored.
    :param data_format: Upper-cased and sent as ``format`` when not None.
    :return: The rebuilt URL as a string.
    """
    parsed_url = urlparse(url)
    # keep_blank_values=True so that embedded parameters such as "?a=&b=b"
    # survive the parse/encode round trip instead of being silently dropped.
    get_vars = parse_qs(parsed_url.query, keep_blank_values=True)
    # TODO error checking
    if reference_name is not None:
        get_vars["referenceName"] = reference_name
    if reference_md5 is not None:
        get_vars["referenceMD5"] = reference_md5
    if start is not None:
        get_vars["start"] = int(start)
    if end is not None:
        get_vars["end"] = int(end)
    if data_format is not None:
        get_vars["format"] = data_format.upper()
    # TODO: fields, tags and notags are not yet added to the query:
    # if fields is not None:
    #     get_vars["fields"] = ",".join(fields)
    # if tags is not None:
    #     get_vars["tags"] = ",".join(tags)
    # if notags is not None:
    #     get_vars["notags"] = ",".join(notags)
    new_url = list(parsed_url)
    # Index 4 is the query component; doseq=True expands the value lists
    # produced by parse_qs into repeated "key=value" pairs.
    new_url[4] = urlencode(get_vars, doseq=True)
    return urlunparse(new_url)