Python six.moves.urllib.parse 模块,parse_qsl() 实例源码

我们从Python开源项目中,提取了以下30个代码示例,用于说明如何使用six.moves.urllib.parse.parse_qsl()

项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

       Tests that compare URLs need a deterministic query string, e.g.
       '/v2/tasks?sort_key=id&sort_dir=asc&limit=10' comes back as
       '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'.
    :param url: url whose query string should be reordered
    :returns url: the same url with its query keys in sorted order
    """
    pieces = urlparse.urlparse(url)

    # Second positional argument is keep_blank_values, so "?flag=" survives.
    pairs = urlparse.parse_qsl(pieces.query, True)
    # list.sort is stable: repeated keys keep their relative order.
    pairs.sort(key=lambda pair: pair[0])
    ordered_query = urlparse.urlencode(pairs, True)

    return urlparse.urlunparse(pieces._replace(query=ordered_query))
项目:dnsknife    作者:Gandi    | 项目源码 | 文件源码
def params(arg):
    """
    Build the dict of TPDA arguments described by `arg`.

    `arg` is examined in this order:
        - a string: parsed as an URL, its query string supplies the pairs
        - an object with an items() method (dicts, multidicts, ..)
        - any other iterable: used as ((name, value), (name, value), ..)

    Any other kind of `arg` yields None.
    Raises exceptions.IncompleteURI when the URL has no query string.
    """
    if isinstance(arg, str):
        query = parse.urlparse(arg).query
        if not query:
            raise exceptions.IncompleteURI
        pairs = parse.parse_qsl(query)
    elif hasattr(arg, 'items'):
        pairs = arg.items()
    elif hasattr(arg, '__iter__'):
        pairs = arg
    else:
        return None
    return Params(pairs).params()
项目:Flask-pyoidc    作者:zamzterz    | 项目源码 | 文件源码
def test_logout(self):
    """_logout() clears the session tokens and redirects to the end-session endpoint.

    The redirect must be a 303 whose query carries the session's
    end_session_state, the id token as id_token_hint and the registered
    post_logout_redirect_uri.
    """
    end_session_endpoint = 'https://provider.example.com/end_session'
    post_logout_uri = 'https://client.example.com/post_logout'
    provider_info = {'issuer': ISSUER,
                     'end_session_endpoint': end_session_endpoint}
    client_info = {'client_id': 'foo',
                   'post_logout_redirect_uris': [post_logout_uri]}
    authn = OIDCAuthentication(self.app,
                               provider_configuration_info=provider_info,
                               client_registration_info=client_info)
    id_token = IdToken(sub='sub1', nonce='nonce')

    with self.app.test_request_context('/logout'):
        # Seed the session the way a logged-in user would have it.
        flask.session['access_token'] = 'abcde'
        flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'}
        flask.session['id_token'] = id_token.to_dict()
        flask.session['id_token_jwt'] = id_token.to_jwt()

        end_session_redirect = authn._logout()
        for stale_key in ['access_token', 'userinfo', 'id_token', 'id_token_jwt']:
            assert stale_key not in flask.session

        assert end_session_redirect.status_code == 303
        location = end_session_redirect.headers['Location']
        assert location.startswith(end_session_endpoint)

        parsed_request = dict(parse_qsl(urlparse(location).query))
        assert parsed_request['state'] == flask.session['end_session_state']
        assert parsed_request['id_token_hint'] == id_token.to_jwt()
        assert parsed_request['post_logout_redirect_uri'] == post_logout_uri
项目:deb-oslo.utils    作者:openstack    | 项目源码 | 文件源码
def params(self, collapse=True):
    """Return the query parameters of the split url as a dictionary.

    :param collapse: Boolean controlling what happens when the same
    parameter name occurs more than once. When True the last value given
    for a name wins. When False a repeated name maps to a list holding all
    of its values (in order), while a name that occurs once still maps to
    its single (non-list) value.
    """
    if not self.query:
        return {}

    pairs = parse.parse_qsl(self.query)
    if collapse:
        # dict() keeps the last value seen for a duplicated key.
        return dict(pairs)

    gathered = {}
    for name, value in pairs:
        if name not in gathered:
            gathered[name] = value
            continue
        seen = gathered[name]
        if isinstance(seen, list):
            seen.append(value)
        else:
            # Second occurrence: promote the scalar to a list.
            gathered[name] = [seen, value]
    return gathered
项目:fanfou-py    作者:akgnah    | 项目源码 | 文件源码
def request_token(self):
    """Fetch a request token and fill in the authorize URL.

    Issues a GET to ``self.request_token_url`` through ``oauth_request``,
    parses the form-encoded reply, interpolates the token and
    ``self.callback`` into ``self.authorize_url`` and stores/returns the
    token as ``{'key': ..., 'secret': ...}``.
    """
    reply = self.oauth_request(self.request_token_url, 'GET').read().decode()
    fields = dict(parse.parse_qsl(reply))
    token = fields['oauth_token']
    # NOTE(review): authorize_url is overwritten by its own interpolation,
    # so the %-placeholders are gone after the first call - confirm intended.
    self.authorize_url = self.authorize_url % (token, self.callback)
    self.oauth_token = {'key': token, 'secret': fields['oauth_token_secret']}
    return self.oauth_token
项目:fanfou-py    作者:akgnah    | 项目源码 | 文件源码
def access_token(self, oauth_token=None, oauth_verifier=None):
    """Fetch the access token and remember it.

    ``oauth_token`` (when truthy) replaces the currently stored token
    before the request is made. ``oauth_verifier`` is only sent when
    truthy. The parsed reply is stored in ``self.oauth_token`` and
    returned as ``{'key': ..., 'secret': ...}``.
    """
    if not oauth_token:
        oauth_token = self.oauth_token
    self.oauth_token = oauth_token

    extra = {}
    if oauth_verifier:  # absent if callback is oob
        extra['oauth_verifier'] = oauth_verifier

    reply = self.oauth_request(self.access_token_url, 'GET', extra).read().decode()
    fields = dict(parse.parse_qsl(reply))
    self.oauth_token = {'key': fields['oauth_token'], 'secret': fields['oauth_token_secret']}
    return self.oauth_token
项目:fanfou-py    作者:akgnah    | 项目源码 | 文件源码
def xauth(self, username, password):
    """Exchange a username/password for a token via the access-token URL.

    Sends the x_auth_* arguments with mode 'client_auth' and returns the
    parsed reply as ``{'key': ..., 'secret': ...}``. Nothing is stored on
    ``self``.
    """
    credentials = {
        'x_auth_username': username,
        'x_auth_password': password,
        'x_auth_mode': 'client_auth'
    }
    reply = self.oauth_request(self.access_token_url, 'GET', credentials).read().decode()
    fields = dict(parse.parse_qsl(reply))
    return {'key': fields['oauth_token'], 'secret': fields['oauth_token_secret']}
项目:mixmatch    作者:openstack    | 项目源码 | 文件源码
def __init__(self, url):
    """Parse ``url`` and keep a normalised copy of its parts in ``self.parts``.

    The query string is replaced by a frozenset of its (name, value)
    pairs and the path is percent/plus-decoded with ``unquote_plus``.
    """
    split = parse.urlparse(url)
    self.parts = split._replace(
        query=frozenset(parse.parse_qsl(split.query)),
        path=parse.unquote_plus(split.path),
    )
项目:mixmatch    作者:openstack    | 项目源码 | 文件源码
def __init__(self, url):
    """Store the parsed form of ``url`` in ``self.parts``.

    Two components are normalised: ``query`` becomes a frozenset of
    (name, value) pairs, and ``path`` is decoded with ``unquote_plus``.
    """
    components = parse.urlparse(url)
    query_pairs = frozenset(parse.parse_qsl(components.query))
    decoded_path = parse.unquote_plus(components.path)
    self.parts = components._replace(query=query_pairs, path=decoded_path)
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def _cs_request_with_retries(self, url, method, **kwargs):
    """Fake request entry point that dispatches to a ``<verb>_<path>`` method.

    The URL's query parameters are merged into ``kwargs``; the path (with
    '/', '.' and '-' turned into '_') and the lower-cased verb name the
    method of this object that produces ``(status, headers, body)``. The
    call is recorded in ``self.callstack`` as ``(method, url, body)``.

    :returns: ``(response, body)`` where response is a utils.TestResponse
    :raises AssertionError: on a GET/DELETE with a body, a PUT without
        one, or when no matching fake method exists.
    """
    # Check that certain things are called correctly
    if method in ('GET', 'DELETE'):
        assert 'body' not in kwargs
    elif method == 'PUT':
        assert 'body' in kwargs

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    # Note the call
    self.callstack.append((method, url, kwargs.get('body', None)))
    status, headers, body = getattr(self, callback)(**kwargs)
    response = utils.TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
    return response, body

    #
    # Quotas
    #
项目:dnsknife    作者:Gandi    | 项目源码 | 文件源码
def _qsl_get_one(qstring, param):
    """Return the value of `param`, which must occur exactly once in `qstring`.

    Raises exceptions.InvalidArgument(param) when the parameter is
    missing or given more than once.
    """
    matches = []
    for name, value in parse.parse_qsl(qstring):
        if name == param:
            matches.append(value)

    if len(matches) == 1:
        return matches[0]

    raise exceptions.InvalidArgument(param)



# Uses RDAP
项目:cloud-custodian    作者:capitalone    | 项目源码 | 文件源码
def get_s3_uri(self, uri):
    """Read and return the body of the S3 object addressed by ``uri``.

    The uri's netloc is used as the bucket and its path (without the
    leading slash) as the key. Query parameters, if any, are passed on as
    additional ``get_object`` keyword arguments.
    """
    location = urlparse(uri)
    s3 = self.session_factory().client('s3')
    request = {'Bucket': location.netloc, 'Key': location.path[1:]}
    if location.query:
        request.update(parse_qsl(location.query))
    return s3.get_object(**request)['Body'].read()
项目:github2gitlab    作者:chauffer    | 项目源码 | 文件源码
def get(self, url, query, cache):
    """Fetch every page of a paginated JSON listing, optionally disk-cached.

    The cache file lives in ``self.tmpdir`` and is named after the SHA-1
    of ``url``. When ``cache`` is true and that file exists and is at most
    24 hours old, its content is returned without any HTTP request.
    Otherwise pages are requested with ``requests.get`` and concatenated,
    following the ``rel="next"`` entry of each response's ``Link`` header,
    and (when ``cache`` is true) the result is written to the cache file.

    :param url: endpoint to query
    :param query: dict of request parameters for the first page; it is
        not modified. NOTE: a falsy ``query`` means no request is made
        and an empty list is returned.
    :param cache: whether to read from / write to the on-disk cache
    :returns: list of the JSON payloads of all pages
    """
    payloads_file = (self.tmpdir + "/" +
                     hashlib.sha1(url.encode('utf-8')).hexdigest() +
                     ".json")
    cache_is_fresh = (
        cache and os.access(payloads_file, os.F_OK) and
        time.time() - os.stat(payloads_file).st_mtime <= 24 * 60 * 60)
    if cache_is_fresh:
        with open(payloads_file, 'r') as f:
            return json.load(f)

    payloads = []
    next_query = query
    while next_query:
        log.debug(str(next_query))
        result = requests.get(url, params=next_query)
        payloads += result.json()
        next_query = None
        for link in result.headers.get('Link', '').split(','):
            if 'rel="next"' not in link:
                continue
            m = re.search('<(.*)>', link)
            if m:
                parsed_url = parse.urlparse(m.group(1))
                # append query in case it was not preserved
                # (gitlab has that problem)
                # Work on a copy: updating `query` itself would leak the
                # pagination parameters into the caller's dict.
                next_query = dict(query)
                next_query.update(parse.parse_qsl(parsed_url.query))
    if cache:
        with open(payloads_file, 'w') as f:
            json.dump(payloads, f)
    return payloads
项目:Flask-pyoidc    作者:zamzterz    | 项目源码 | 文件源码
def test_authenticatate_with_extra_request_parameters(self):
    """Configured extra_request_args appear in the authentication redirect's query."""
    extra_params = {"foo": "bar", "abc": "xyz"}
    authn = OIDCAuthentication(
        self.app,
        provider_configuration_info={'issuer': ISSUER},
        client_registration_info={'client_id': 'foo'},
        extra_request_args=extra_params)

    with self.app.test_request_context('/'):
        redirect = authn._authenticate()

    sent = dict(parse_qsl(urlparse(redirect.location).query))
    # Every extra (name, value) pair must be present among the sent ones.
    assert set(extra_params.items()) <= set(sent.items())
项目:ansible-kong-module    作者:toast38coza    | 项目源码 | 文件源码
def test_configure_for_plugin(self):
    """configure_for_plugin POSTs the given data to the consumer's plugin URL.

    The registered POST endpoint answers 201; the body of the first
    recorded call is parsed as a query string and must carry key=123.
    """
    expected_url = "{}/consumers/joe/auth-key".format(mock_kong_admin_url)
    responses.add(responses.POST, expected_url, status=201)

    response = self.api.configure_for_plugin("joe", "auth-key", {"key": "123"})

    assert response.status_code == 201

    sent_body = responses.calls[0].request.body
    body = parse_qs(sent_body)
    body_exactly = parse_qsl(sent_body)  # only used in the failure message
    assert body['key'][0] == "123", \
        "Expect correct. data to be sent. Got: {}".format(body_exactly)
项目:django-allauth-cas    作者:aureplop    | 项目源码 | 文件源码
def get_auth_params(self, request, action):
    """Return the auth parameters for this request.

    Starts from a copy of the ``AUTH_PARAMS`` setting and overlays any
    parameters supplied as a query string in the request's
    ``auth_params`` GET argument. ``action`` is not used here.
    """
    auth_params = dict(self.get_settings().get('AUTH_PARAMS', {}))
    dynamic = request.GET.get('auth_params')
    if dynamic:
        auth_params.update(parse_qsl(dynamic))
    return auth_params
项目:localstack    作者:localstack    | 项目源码 | 文件源码
def expand_redirect_url(starting_url, key, bucket):
    """ Return starting_url with 'key' and 'bucket' set in its query string.

    Existing query parameters keep their order; an existing 'key' or
    'bucket' is overwritten in place. The fragment is not carried over.
    """
    (scheme, netloc, path, path_params,
     raw_query, _fragment) = urlparse.urlparse(starting_url)

    query = collections.OrderedDict(urlparse.parse_qsl(raw_query))
    query['key'] = key
    query['bucket'] = bucket

    # None in the fragment slot: the original fragment is dropped.
    return urlparse.urlunparse(
        (scheme, netloc, path, path_params, urlparse.urlencode(query), None))
项目:social-core    作者:python-social-auth    | 项目源码 | 文件源码
def request_access_token(self, *args, **kwargs):
    """Run ``self.request`` and return its query-string encoded text as a dict."""
    body = self.request(*args, **kwargs).text
    return dict(parse_qsl(body))
项目:wsgiprox    作者:webrecorder    | 项目源码 | 文件源码
def __call__(self, env, start_response):
    """WSGI app that echoes details of the request back to the caller.

    Behaviour is steered by query parameters:
      - ignore_ws: do not take the websocket path even if one is present
      - addproxyhost=true: append env['wsgiprox.proxy_host'] to the reply
      - chunked=true: send no Content-Length header
      - write=true: deliver the body through the write() callable returned
        by start_response instead of the returned iterable

    With a websocket in env['wsgi.websocket'] (and no ignore_ws) one
    message is received, echoed back and an empty list is returned.
    """
    status = '200 OK'
    query = dict(parse_qsl(env.get('QUERY_STRING')))

    websocket = env.get('wsgi.websocket')
    if websocket and not query.get('ignore_ws'):
        reply = 'WS Request Url: ' + env.get('REQUEST_URI', '')
        reply += ' Echo: ' + websocket.receive()
        websocket.send(reply)
        return []

    pieces = ['Requested Url: ' + env.get('REQUEST_URI', '')]
    if env['REQUEST_METHOD'] == 'POST':
        posted = env['wsgi.input'].read(int(env['CONTENT_LENGTH']))
        pieces.append(' Post Data: ' + posted.decode('utf-8'))
    if query.get('addproxyhost') == 'true':
        pieces.append(' Proxy Host: ' + env.get('wsgiprox.proxy_host', ''))
    payload = ''.join(pieces).encode('iso-8859-1')

    if query.get('chunked') == 'true':
        headers = []
    else:
        headers = [('Content-Length', str(len(payload)))]

    write = start_response(status, headers)
    if query.get('write') != 'true':
        return iter([payload])

    write(payload)
    return iter([])


# ============================================================================
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``. The request id from the
    fake's headers (default 'req-test') is kept in ``self.last_request_id``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    self.last_request_id = headers.get('x-openstack-request-id',
                                       'req-test')
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def temp_url(self, lifetime=300, method='GET', inline=True, filename=None):
    """Build a temporary URL for this object.

    The signature and expiry come from ``generate_temp_url``; the final
    URL is assembled here on top of the configured swift ``auth_url`` so
    that the object path is properly escaped.

    Args:
        lifetime (int): The time (in seconds) the temporary
            URL will be valid
        method (str): The HTTP method that can be used on
            the temporary URL
        inline (bool, default True): If False, URL will have a
            Content-Disposition header that causes browser to download as
            attachment.
        filename (str, optional): A urlencoded filename to use for
            attachment, otherwise defaults to object name

    Raises:
        ValueError: if this path has no resource, or if the swift
            ``temp_url_key`` or ``auth_url`` setting is missing.
    """
    swift_options = settings.get()['swift']
    auth_url = swift_options.get('auth_url')
    temp_url_key = swift_options.get('temp_url_key')

    if not self.resource:
        raise ValueError('can only create temporary URL on object')
    if not temp_url_key:
        raise ValueError(
            'a temporary url key must be set with settings.update '
            'or by setting the OS_TEMP_URL_KEY environment variable')
    if not auth_url:
        raise ValueError(
            'an auth url must be set with settings.update '
            'or by setting the OS_AUTH_URL environment variable')

    obj_path = '/v1/%s' % self[len(self.drive):]
    # The helper's URL is only mined for its temp_url_sig and
    # temp_url_expires parameters; everything from the last
    # 'temp_url_sig' onwards is treated as the query string.
    signed = generate_temp_url(obj_path, lifetime, temp_url_key, method)
    signed_params = dict(
        parse.parse_qsl(signed[signed.rfind('temp_url_sig'):]))

    query_parts = [
        'temp_url_sig=%s' % signed_params['temp_url_sig'],
        'temp_url_expires=%s' % signed_params['temp_url_expires'],
    ]
    if inline:
        query_parts.append('inline')
    if filename:
        query_parts.append('filename=%s' % parse.quote(filename))

    base = parse.urlparse(auth_url)
    return parse.urlunparse(base._replace(path=parse.quote(obj_path),
                                          query='&'.join(query_parts)))
项目:chalktalk_docs    作者:loremIpsum1771    | 项目源码 | 文件源码
def encode_uri(uri):
    """Return ``uri`` with its host, path and query encoded.

    - the network location is IDNA-encoded;
    - the path is UTF-8 encoded and quoted with ``quote_plus`` ('/' kept);
    - the query is parsed with ``parse_qsl`` and re-serialised with
      ``urlencode`` (parameters with blank values are dropped by
      ``parse_qsl``'s default).

    :param uri: the URI as text
    :returns: the encoded URI as text
    """
    split = list(urlsplit(uri))
    split[1] = split[1].encode('idna').decode('ascii')
    # quote_plus() already returns text on Python 3; calling .decode() on
    # its result raised AttributeError there.
    split[2] = quote_plus(split[2].encode('utf-8'), '/')
    # urlencode() percent-encodes the values itself, so hand it the raw
    # UTF-8 bytes; quoting them first would encode every '%' a second time.
    query = [(q, v.encode('utf-8')) for (q, v) in parse_qsl(split[3])]
    split[3] = urlencode(query)
    return urlunsplit(split)
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``. The request id from the
    fake's headers (default 'req-test') is kept in ``self.last_request_id``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    self.last_request_id = headers.get('x-openstack-request-id',
                                       'req-test')
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def client_request(self, client, method, url, **kwargs):
    """Fake client transport used by the tests.

    Every call is appended to ``self.callstack`` as
    ``(method, url, headers, payload)``. If ``self.fixtures[url][method]``
    exists it is returned as a canned ``TestResponse``. Otherwise the
    request is dispatched to a method of this object named
    ``<verb>_<path>`` ('/', '.' and '-' in the path become '_'), with the
    URL's query parameters merged into ``kwargs``.

    :raises AssertionError: if a GET/DELETE carries a ``json`` argument,
        or if no matching fake method exists.
    """
    # GET and DELETE must not be given a json argument.
    if method in ("GET", "DELETE"):
        assert "json" not in kwargs

    # Note the call
    sent_headers = kwargs.get("headers") or {}
    sent_payload = kwargs.get("json") or kwargs.get("data")
    self.callstack.append((method, url, sent_headers, sent_payload))

    try:
        fixture = self.fixtures[url][method]
    except KeyError:
        pass  # no canned answer: fall through to the fake method
    else:
        return TestResponse({"headers": fixture[0],
                             "text": fixture[1]})

    # Query-string parameters become keyword arguments of the fake.
    kwargs.update(parse.parse_qsl(parse.urlparse(url).query))

    # Build the fake's method name from the verb and the sanitised path.
    path = url.rsplit('?', 1)[0].strip('/')
    for separator in ('/', '.', '-'):
        path = path.replace(separator, '_')
    callback = "%s_%s" % (method.lower(), path)

    if not hasattr(self, callback):
        raise AssertionError('Called unknown API method: %s %s, '
                             'expected fakes method name: %s' %
                             (method, url, callback))

    reply = getattr(self, callback)(**kwargs)
    # Fakes may return (status, headers, body) or just (status, body).
    if len(reply) == 3:
        status, headers, body = reply
    else:
        status, body = reply
        headers = {}
    return TestResponse({
        "status_code": status,
        "text": body,
        "headers": headers,
    })