Python six.moves.urllib.parse 模块,urlencode() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.moves.urllib.parse.urlencode()

项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def view_autocomplete(self, request, group, **kwargs):
        """Return autocomplete suggestions for the ``issue_id`` field.

        Any other field, or an empty query, yields an empty ``issue_id``
        list.  Request and JSON decoding failures are handed to
        ``self.handle_api_error``.
        """
        params = request.GET
        field = params.get('autocomplete_field')
        query = params.get('autocomplete_query')
        if not (field == 'issue_id' and query):
            return Response({'issue_id': []})

        raw_query = query.encode('utf-8')
        search_url = self.build_api_url(group, 'search')
        _url = '%s?%s' % (search_url, urlencode({'query': raw_query}))
        try:
            body = safe_urlread(self.make_api_request(group.project, _url))
        except (requests.RequestException, PluginError) as e:
            return self.handle_api_error(e)

        try:
            payload = json.loads(body)
        except ValueError as e:
            return self.handle_api_error(e)

        # The story list is nested under two 'stories' keys in the reply.
        issues = []
        for story in payload.get('stories', {}).get('stories', []):
            label = '(#%s) %s' % (story['id'], story['name'])
            issues.append({'text': label, 'id': story['id']})

        return Response({field: issues})
项目:umapi-client.py    作者:adobe-apiplatform    | 项目源码 | 文件源码
def __call__(self):
        """Exchange the JWT for an access token and return it.

        Posts the API key, client secret and JWT, form-encoded, to
        ``self.endpoint``.  On success the ``expires_in`` value of the reply
        is passed to ``self.set_expiry`` and ``access_token`` is returned.

        :raises RuntimeError: if the endpoint answers with a status code
            other than 200.
        """
        headers = {
            "Content-Type": "application/x-www-form-urlencoded",
            "Cache-Control": "no-cache",
        }
        body = urlparse.urlencode({
            "client_id": self.api_key,
            "client_secret": self.client_secret,
            "jwt_token": self.jwt_token
        })

        r = requests.post(self.endpoint, headers=headers, data=body)
        if r.status_code != 200:
            raise RuntimeError("Unable to authorize against {}:\n"
                               "Response Code: {:d}, Response Text: {}\n"
                               "Response Headers: {}".format(self.endpoint, r.status_code, r.text, r.headers))

        # Decode the JSON body once rather than once per field.
        token_info = r.json()
        self.set_expiry(token_info['expires_in'])

        return token_info['access_token']
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _build_query(self, params):
    """Builds a query string.

    Args:
      params: dict, the query parameters. The dict is not modified.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      prefixed with '?'. A list value yields one key=value pair per element.
    """
    # Work on a copy so the caller's dict does not silently gain 'alt'.
    query = dict(params)
    if self.alt_param is not None:
      query['alt'] = self.alt_param

    def as_utf8(item):
      # Encode text only; bytes, numbers and other objects are left for
      # urlencode() to stringify instead of failing on a missing .encode.
      if not isinstance(item, bytes) and callable(getattr(item, 'encode', None)):
        return item.encode('utf-8')
      return item

    astuples = []
    for key, value in query.items():
      if isinstance(value, list):
        astuples.extend((key, as_utf8(x)) for x in value)
      else:
        astuples.append((key, as_utf8(value)))
    return '?' + urlencode(astuples)
项目:watcher-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _list_request(self, resource, permanent=False, **kwargs):
        """Fetch the collection of objects for a REST resource.

        :param resource: The name of the REST resource, e.g., 'audits'.
        :param permanent: Forwarded to ``self._get_uri``.
        :param **kwargs: Parameters for the request, sent as the query string.
        :return: A tuple with the server response and deserialized JSON list
                 of objects
        """
        base_uri = self._get_uri(resource, permanent=permanent)
        suffix = "?%s" % urlparse.urlencode(kwargs) if kwargs else ""

        response, payload = self.get(base_uri + suffix)
        # The status arrives under the 'status' key and is compared as an int.
        self.expected_success(200, int(response['status']))

        return response, self.deserialize(payload)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Build an ``otpauth://`` URI for a one-time-password object.

    The query carries the digit count, the base32-encoded key and the
    upper-cased algorithm name, then ``issuer`` (when not ``None``) and
    finally ``extra_parameters``.  The label is the URL-quoted account name,
    prefixed by the quoted issuer and a colon when ``issuer`` is truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items.append(("issuer", issuer))
    query_items.extend(extra_parameters)

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=urlencode(query_items))
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Return the ``otpauth://`` URI describing ``hotp`` for an account.

    Query parameters are emitted in this order: digits, secret (base32 of
    the key), algorithm (upper-cased name), issuer (only when it is not
    ``None``), then everything in ``extra_parameters``.
    """
    # The label only carries the issuer prefix when the issuer is truthy.
    label_parts = [quote(issuer), quote(account_name)] if issuer else [quote(account_name)]
    label = "%s:%s" % tuple(label_parts) if issuer else label_parts[0]

    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    parameters += [] if issuer is None else [("issuer", issuer)]
    parameters += extra_parameters

    encoded = urlencode(parameters)
    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=encoded)
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def build_login_url(self, redirect_uri, state=None, scope=None):
        """Return ``(login_url, state)`` for starting the OAuth dialog.

        A falsy ``state`` is replaced by a fresh ``generate_token()`` value.
        ``scope`` is added to the query only when it is truthy.
        """
        state = state or generate_token()

        query = dict(
            client_id=self.client_id,
            redirect_uri=redirect_uri,
            state=state,
        )
        if scope:
            query['scope'] = scope

        return OAUTH_DIALOG_URL.format(urlencode(query)), state
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def build_login_url(self, redirect_uri, state=None, scope=None):
        """Build the OAuth dialog URL and return it together with the state.

        When ``state`` is falsy a new one is obtained from
        ``generate_token()``.  A truthy ``scope`` is appended to the query.
        """
        if not state:
            state = generate_token()

        pairs = [
            ('client_id', self.client_id),
            ('redirect_uri', redirect_uri),
            ('state', state),
        ]
        if scope:
            pairs.append(('scope', scope))

        # urlencode() receives a dict, as before, built from the ordered pairs.
        login_url = OAUTH_DIALOG_URL.format(urlencode(dict(pairs)))
        return login_url, state
项目:flask-api-skeleton    作者:ianunruh    | 项目源码 | 文件源码
def build_login_url(self, redirect_uri, state=None, scope='email'):
        """Return ``(login_url, state)`` for an authorization-code login.

        The query always asks for ``response_type=code``.  A falsy ``state``
        is replaced by ``generate_token()``; ``scope`` (default ``'email'``)
        is included only when truthy.
        """
        state = state or generate_token()

        query = dict(
            response_type='code',
            client_id=self.client_id,
            redirect_uri=redirect_uri,
            state=state,
        )
        if scope:
            query['scope'] = scope

        return OAUTH_DIALOG_URL.format(urlencode(query)), state
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _redirect_with_params(url_name, *args, **kwargs):
    """Build a redirect target consisting of a reversed URL plus a query.

    Args:
        url_name: The name of the url to redirect to.
        args: positional arguments passed to the URL reversal.
        kwargs: the query string params and their values; sequence values
            are expanded into repeated keys.

    Returns:
        The reversed URL followed by '?' and the encoded query string.
    """
    target = urlresolvers.reverse(url_name, args=args)
    query_string = parse.urlencode(kwargs, doseq=True)
    return "{0}?{1}".format(target, query_string)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _build_query(self, params):
    """Builds a query string.

    Args:
      params: dict, the query parameters. It is left unmodified.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      starting with '?'. Each element of a list value becomes its own pair.
    """
    # Copy first: adding 'alt' must not leak into the caller's dict.
    merged = dict(params)
    if self.alt_param is not None:
      merged['alt'] = self.alt_param

    def encode_text(item):
      # Only text is encoded; bytes and non-string values (e.g. ints) are
      # passed through so urlencode() can stringify them.
      if isinstance(item, bytes) or not callable(getattr(item, 'encode', None)):
        return item
      return item.encode('utf-8')

    astuples = []
    for key, value in merged.items():
      values = value if isinstance(value, list) else [value]
      astuples.extend((key, encode_text(x)) for x in values)
    return '?' + urlencode(astuples)
项目:plugin.audio.spotify    作者:marcelveldt    | 项目源码 | 文件源码
def get_authorize_url(self, state=None):
        """Return the URL to use to authorize this app.

        ``state`` falls back to ``self.state``.  The scope and the state are
        only added to the query when they are set.
        """
        payload = dict(
            client_id=self.client_id,
            response_type='code',
            redirect_uri=self.redirect_uri,
        )
        if self.scope:
            payload['scope'] = self.scope

        effective_state = self.state if state is None else state
        if effective_state is not None:
            payload['state'] = effective_state

        urlparams = urllibparse.urlencode(payload)
        return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def sort_url_by_query_keys(url):
    """Return ``url`` with its query parameters ordered by key.

       For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
       returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
       prevent non-deterministic ordering of the query string causing
       problems with unit tests.
    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    scheme, netloc, path, params, query, fragment = urlparse.urlparse(url)

    # Blank values are kept; the sort is stable, so repeated keys keep
    # their relative order.
    pairs = urlparse.parse_qsl(query, True)
    pairs.sort(key=lambda pair: pair[0])

    return urlparse.urlunparse(
        (scheme, netloc, path, params, urlparse.urlencode(pairs, True), fragment))
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Return the only item whose attributes match ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when no item matches
        :raises exceptions.NoUniqueMatch: when more than one item matches
        """
        kwargs = self._filter_kwargs(kwargs)

        base = self.build_url(base_url=base_url, **kwargs)
        query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
        matches = self._list(
            '%(base_url)s%(query)s' % {'base_url': base, 'query': query},
            self.collection_key)

        count = len(matches)
        if count == 1:
            return matches[0]
        if count > 1:
            raise exceptions.NoUniqueMatch

        msg = _("No %(name)s matching %(args)s.") % {
            'name': self.resource_class.__name__,
            'args': kwargs
        }
        raise exceptions.NotFound(msg)
项目:pynetbox    作者:digitalocean    | 项目源码 | 文件源码
def get_session_key(self):
        """Requests session key

        Issues a POST request carrying the private key to the
        `get-session-key` endpoint; the returned key is for subsequent use
        in requests from the `secrets` endpoint.

        :Returns: String containing session key.
        :Raises: RequestError if the response is not OK.
        """
        url = '{}/secrets/get-session-key/?preserve_key=True'.format(self.base)
        headers = {
            'accept': 'application/json',
            'authorization': 'Token {}'.format(self.token),
            'Content-Type': 'application/x-www-form-urlencoded',
        }
        # Leading/trailing newlines are stripped from the key before sending.
        form = urlencode({'private_key': self.private_key.strip('\n')})

        req = requests.post(url, headers=headers, data=form)
        if not req.ok:
            raise RequestError(req)
        return json.loads(req.text)['session_key']
项目:ironic-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def _list_request(self, resource, permanent=False, headers=None,
                      extra_headers=False, **kwargs):
        """Fetch the collection of objects for a REST resource.

        :param resource: The name of the REST resource, e.g., 'nodes'.
        :param permanent: Forwarded to ``self._get_uri``.
        :param headers: List of headers to use in request.
        :param extra_headers: Specify whether to use headers.
        :param **kwargs: Parameters for the request, sent as the query string.
        :returns: A tuple with the server response and deserialized JSON list
                 of objects

        """
        base_uri = self._get_uri(resource, permanent=permanent)
        suffix = "?%s" % urllib.urlencode(kwargs) if kwargs else ""

        response, payload = self.get(base_uri + suffix, headers=headers,
                                     extra_headers=extra_headers)
        self.expected_success(http_client.OK, response.status)

        return response, self.deserialize(payload)
项目:python-kingbirdclient    作者:openstack    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Look up exactly one item with attributes matching ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when the listing is empty
        :raises exceptions.NoUniqueMatch: when the listing has several items
        """
        kwargs = self._filter_kwargs(kwargs)

        url_parts = {
            'base_url': self.build_url(base_url=base_url, **kwargs),
            'query': '',
        }
        if kwargs:
            url_parts['query'] = '?%s' % parse.urlencode(kwargs)
        found = self._list('%(base_url)s%(query)s' % url_parts,
                           self.collection_key)

        total = len(found)
        if total > 1:
            raise exceptions.NoUniqueMatch
        if total == 0:
            msg = _("No %(name)s matching %(args)s.") % {
                'name': self.resource_class.__name__,
                'args': kwargs
            }
            raise exceptions.NotFound(404, msg)
        return found[0]
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Build an ``otpauth://`` URI for a one-time-password object.

    The query lists digits, the base32-encoded key and the upper-cased
    algorithm name, followed by ``issuer`` when it is not ``None`` and then
    ``extra_parameters``.  The label is the quoted account name, preceded by
    the quoted issuer and a colon when ``issuer`` is truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items.append(("issuer", issuer))
    query_items.extend(extra_parameters)

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=urlencode(query_items))
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Return the ``otpauth://`` URI describing ``hotp`` for an account.

    Query parameters appear in this order: digits, secret (base32 of the
    key), algorithm (upper-cased name), issuer (only when not ``None``),
    then the contents of ``extra_parameters``.
    """
    # Issuer prefix in the label requires a truthy issuer.
    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    parameters += [] if issuer is None else [("issuer", issuer)]
    parameters += extra_parameters

    encoded = urlencode(parameters)
    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=encoded)
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Compose the ``otpauth://`` URI for ``hotp`` and ``account_name``.

    ``issuer`` is added to the query whenever it is not ``None`` but only
    prefixes the label when it is truthy.  ``extra_parameters`` are
    appended after the digits, secret, algorithm and issuer parameters.
    """
    base_items = (
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    )
    parameters = list(base_items)
    if issuer is not None:
        parameters.append(("issuer", issuer))
    for extra in extra_parameters:
        parameters.append(extra)

    label = quote(account_name) if not issuer else "%s:%s" % (
        quote(issuer), quote(account_name))

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=urlencode(parameters))
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Build an ``otpauth://`` URI for a one-time-password object.

    The query carries the digit count, the base32-encoded key and the
    upper-cased algorithm name, then ``issuer`` (when not ``None``) and
    finally ``extra_parameters``.  The label is the URL-quoted account name,
    prefixed by the quoted issuer and a colon when ``issuer`` is truthy.
    """
    query_items = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    if issuer is not None:
        query_items.append(("issuer", issuer))
    query_items.extend(extra_parameters)

    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=urlencode(query_items))
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Return the ``otpauth://`` URI describing ``hotp`` for an account.

    Query parameters appear in this order: digits, secret (base32 of the
    key), algorithm (upper-cased name), issuer (only when not ``None``),
    then the contents of ``extra_parameters``.
    """
    # Issuer prefix in the label requires a truthy issuer.
    if issuer:
        label = "%s:%s" % (quote(issuer), quote(account_name))
    else:
        label = quote(account_name)

    parameters = [
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    ]
    parameters += [] if issuer is None else [("issuer", issuer)]
    parameters += extra_parameters

    encoded = urlencode(parameters)
    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=encoded)
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def _redirect_with_params(url_name, *args, **kwargs):
    """Return a reversed URL with ``kwargs`` appended as its query string.

    Args:
        url_name: The name of the url to redirect to.
        args: positional arguments passed to the URL reversal.
        kwargs: the query string params and their values; sequence values
            are expanded into repeated keys.

    Returns:
        A redirect string of the form ``<url>?<encoded query>``.
    """
    resolved = urlresolvers.reverse(url_name, args=args)
    encoded = parse.urlencode(kwargs, doseq=True)
    return "{0}?{1}".format(resolved, encoded)
项目:plugin.audio.connectcontrol    作者:NicolasHaeffner    | 项目源码 | 文件源码
def get_authorize_url(self, state=None):
        """Build the authorization URL for this app.

        The query always holds the client id, ``response_type=code`` and the
        redirect URI.  ``self.scope`` is added when truthy, and the state
        (the argument, else ``self.state``) when it is not ``None``.
        """
        if state is None:
            state = self.state

        optional = []
        if self.scope:
            optional.append(('scope', self.scope))
        if state is not None:
            optional.append(('state', state))

        payload = {'client_id': self.client_id,
                   'response_type': 'code',
                   'redirect_uri': self.redirect_uri}
        payload.update(optional)

        urlparams = urllibparse.urlencode(payload)
        return "%s?%s" % (self.OAUTH_AUTHORIZE_URL, urlparams)
项目:boartty    作者:openstack    | 项目源码 | 文件源码
def syncStories(sync, priority, **kw):
    """Submit a sync task for every remote story matching the filters.

    :param sync: object providing ``get(path)`` and ``submitTask(task)``.
    :param priority: priority given to each created ``SyncStoryTask``.
    :param kw: query filters; entries whose value is ``None`` are dropped.
    :returns: list of the submitted ``SyncStoryTask`` objects.
    """
    # Only filters that were actually supplied go into the query string.
    params = {k: v for k, v in kw.items() if v is not None}
    remote = sync.get('v1/stories?%s' % (urlparse.urlencode(params),))

    tasks = []
    for remote_story in remote:
        t = SyncStoryTask(remote_story['id'], remote_story,
                          priority=priority)
        sync.submitTask(t)
        tasks.append(t)
    return tasks
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def _format_metadata_url(self, api_key, page_number):
        """Build the query URL for the quandl WIKI metadata.

        ``api_key`` leads the query when it is not ``None``; the remaining
        parameters request page ``page_number`` at 100 rows per page,
        sorted by id.
        """
        query_params = []
        if api_key is not None:
            query_params.append(('api_key', api_key))
        query_params.extend([
            ('per_page', '100'),
            ('sort_by', 'id'),
            ('page', str(page_number)),
            ('database_code', 'WIKI'),
        ])

        base = 'https://www.quandl.com/api/v3/datasets.csv?'
        return base + urlencode(query_params)
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def _format_wiki_url(self,
                         api_key,
                         symbol,
                         start_date,
                         end_date,
                         data_frequency):
        """
        Build a query URL for a quandl WIKI dataset.

        The dates are rendered as ``YYYY-MM-DD`` and rows are requested in
        ascending order.  ``api_key`` leads the query when it is not
        ``None``.  ``data_frequency`` is accepted but not used here.
        """
        date_format = '%Y-%m-%d'
        query_params = [('api_key', api_key)] if api_key is not None else []
        query_params += [
            ('start_date', start_date.strftime(date_format)),
            ('end_date', end_date.strftime(date_format)),
            ('order', 'asc'),
        ]
        query = urlencode(query_params)

        return (
            "https://www.quandl.com/api/v3/datasets/WIKI/"
            "{symbol}.csv?{query}".format(symbol=symbol, query=query)
        )
项目:script.module.python.twitch    作者:MrSprigster    | 项目源码 | 文件源码
def live_request(channel):
    """Build the request dict (url and headers) for a channel's HLS playlist.

    When the channel token contains an error entry, the token itself is
    returned instead.
    """
    token = channel_token(channel)
    if keys.ERROR in token:
        return token

    q = UsherQuery('api/channel/hls/{channel}.m3u8')
    q.add_urlkw(keys.CHANNEL, channel)
    query_params = (
        (keys.SIG, token[keys.SIG]),
        (keys.TOKEN, token[keys.TOKEN]),
        (keys.ALLOW_SOURCE, Boolean.TRUE),
        (keys.ALLOW_SPECTRE, Boolean.TRUE),
        (keys.ALLOW_AUDIO_ONLY, Boolean.TRUE),
    )
    for name, value in query_params:
        q.add_param(name, value)

    request_dict = {
        'url': '?'.join([q.url, urlencode(q.params)]),
        'headers': q.headers,
    }
    log.debug('live_request: |{0}|'.format(str(request_dict)))
    return request_dict
项目:script.module.python.twitch    作者:MrSprigster    | 项目源码 | 文件源码
def video_request(video_id):
    """Build the request dict (url and headers) for a VOD.

    Raises NotImplementedError when ``valid_video_id`` gives a falsy result.
    When the VOD token contains an error entry, the token itself is
    returned instead.
    """
    video_id = valid_video_id(video_id)
    if not video_id:
        raise NotImplementedError('Unknown Video Type')

    token = vod_token(video_id)
    if keys.ERROR in token:
        return token

    q = UsherQuery('vod/{id}')
    q.add_urlkw(keys.ID, video_id)
    query_params = (
        (keys.NAUTHSIG, token[keys.SIG]),
        (keys.NAUTH, token[keys.TOKEN]),
        (keys.ALLOW_SOURCE, Boolean.TRUE),
        (keys.ALLOW_AUDIO_ONLY, Boolean.TRUE),
    )
    for name, value in query_params:
        q.add_param(name, value)

    request_dict = {
        'url': '?'.join([q.url, urlencode(q.params)]),
        'headers': q.headers,
    }
    log.debug('video_request: |{0}|'.format(str(request_dict)))
    return request_dict
项目:deb-oslo.vmware    作者:openstack    | 项目源码 | 文件源码
def test_get_transfer_ticket(self):
        """The transfer ticket is the CGI cookie key paired with the ticket id."""
        class Ticket(object):
            id = 'fake_id'

        query = urlparse.urlencode({'dcPath': 'datacenter-1',
                                    'dsName': 'datastore-1'})
        url = 'https://13.37.73.31/folder/images/aa.vmdk?%s' % query

        # The session's API call hands back the fake ticket.
        session = mock.Mock()
        session.invoke_api = mock.Mock(return_value=Ticket())

        ds_url = datastore.DatastoreURL.urlparse(url)
        ticket = ds_url.get_transfer_ticket(session, 'PUT')
        expected = '%s="%s"' % (constants.CGI_COOKIE_KEY, 'fake_id')
        self.assertEqual(expected, ticket)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def _redirect_with_params(url_name, *args, **kwargs):
    """Create a redirect string from a URL name and query parameters.

    The URL is reversed from ``url_name`` and ``args``; ``kwargs`` become
    the query string.

    Args:
        url_name: The name of the url to redirect to.
        args: positional arguments passed to the URL reversal.
        kwargs: the query string params and their values; sequence values
            are expanded into repeated keys.

    Returns:
        A properly formatted redirect string.
    """
    location = urlresolvers.reverse(url_name, args=args)
    query = parse.urlencode(kwargs, doseq=True)
    return "{0}?{1}".format(location, query)
项目:REMAP    作者:REMAPApp    | 项目源码 | 文件源码
def _build_query(self, params):
    """Builds a query string.

    Args:
      params: dict, the query parameters. The dict is not modified.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      prefixed with '?'. A list value yields one key=value pair per element.
    """
    # Work on a copy so the caller's dict does not silently gain 'alt'.
    query = dict(params)
    if self.alt_param is not None:
      query['alt'] = self.alt_param

    def as_utf8(item):
      # Encode text only; bytes, numbers and other objects are left for
      # urlencode() to stringify instead of failing on a missing .encode.
      if not isinstance(item, bytes) and callable(getattr(item, 'encode', None)):
        return item.encode('utf-8')
      return item

    astuples = []
    for key, value in query.items():
      if isinstance(value, list):
        astuples.extend((key, as_utf8(x)) for x in value)
      else:
        astuples.append((key, as_utf8(value)))
    return '?' + urlencode(astuples)
项目:python-adjutantclient    作者:openstack    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Return the only item whose attributes match ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when no item matches
        :raises exceptions.NoUniqueMatch: when more than one item matches
        """
        kwargs = self._filter_kwargs(kwargs)

        base = self.build_url(base_url=base_url, **kwargs)
        query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
        matches = self._list(
            '%(base_url)s%(query)s' % {'base_url': base, 'query': query},
            self.collection_key)

        count = len(matches)
        if count == 1:
            return matches[0]
        if count > 1:
            raise exceptions.NoUniqueMatch

        msg = _("No %(name)s matching %(args)s.") % {
            'name': self.resource_class.__name__,
            'args': kwargs
        }
        raise exceptions.NotFound(msg)
项目:python-group-proj    作者:Sharcee    | 项目源码 | 文件源码
def _data_for_exchange(self, code=None, username=None, password=None, scope=None, user_id=None):
        """Return the form-encoded body for a token exchange request.

        The client id, secret and redirect URI are always sent.  The first
        matching credential wins: ``code``; else ``username`` with
        ``password`` (which switches the grant type to ``password`` and adds
        the space-joined ``scope`` when given); else ``user_id``.
        """
        api = self.api
        client_params = {
            "client_id": api.client_id,
            "client_secret": api.client_secret,
            "redirect_uri": api.redirect_uri,
            "grant_type": "authorization_code"
        }
        if code:
            client_params["code"] = code
        elif username and password:
            client_params["username"] = username
            client_params["password"] = password
            client_params["grant_type"] = "password"
            if scope:
                client_params["scope"] = ' '.join(scope)
        elif user_id:
            client_params["user_id"] = user_id
        return urlencode(client_params)
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def get_instance(self, type, id, search_opts=None, session_id=None):
        """Fetch one instance of a protectable type.

        Truthy ``search_opts`` entries are sent as a query string sorted by
        key.  ``session_id``, when truthy, is sent in the
        ``X-Configuration-Session`` header.
        """
        headers = {'X-Configuration-Session': session_id} if session_id else {}

        if search_opts is None:
            search_opts = {}
        query_params = {key: val for key, val in search_opts.items() if val}

        if query_params:
            ordered = sorted(query_params.items(), key=lambda x: x[0])
            query_string = "?%s" % parse.urlencode(ordered)
        else:
            query_string = ""

        url = ("/protectables/{protectable_type}/instances/"
               "{protectable_id}{query_string}").format(
            protectable_type=type, protectable_id=id,
            query_string=query_string)
        return self._get(url, response_key="instance", headers=headers)
项目:python-karborclient    作者:openstack    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Look up exactly one item with attributes matching ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when the listing is empty
        :raises exceptions.NoUniqueMatch: when the listing has several items
        """
        kwargs = self._filter_kwargs(kwargs)

        url_parts = {
            'base_url': self.build_url(base_url=base_url, **kwargs),
            'query': '',
        }
        if kwargs:
            url_parts['query'] = '?%s' % parse.urlencode(kwargs)
        found = self._list('%(base_url)s%(query)s' % url_parts,
                           self.collection_key)

        total = len(found)
        if total > 1:
            raise exceptions.NoUniqueMatch
        if total == 0:
            msg = _("No %(name)s matching %(args)s.") % {
                'name': self.resource_class.__name__,
                'args': kwargs
            }
            raise exceptions.NotFound(404, msg)
        return found[0]
项目:python-distilclient    作者:openstack    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Return the only item whose attributes match ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when no item matches
        :raises exceptions.NoUniqueMatch: when more than one item matches
        """
        kwargs = self._filter_kwargs(kwargs)

        base = self.build_url(base_url=base_url, **kwargs)
        query = '?%s' % parse.urlencode(kwargs) if kwargs else ''
        matches = self._list(
            '%(base_url)s%(query)s' % {'base_url': base, 'query': query},
            self.collection_key)

        count = len(matches)
        if count == 1:
            return matches[0]
        if count > 1:
            raise exceptions.NoUniqueMatch

        msg = _("No %(name)s matching %(args)s.") % {
            'name': self.resource_class.__name__,
            'args': kwargs
        }
        raise exceptions.NotFound(404, msg)
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _generate_uri(hotp, type_name, account_name, issuer, extra_parameters):
    """Compose the ``otpauth://`` URI for ``hotp`` and ``account_name``.

    ``issuer`` is added to the query whenever it is not ``None`` but only
    prefixes the label when it is truthy.  ``extra_parameters`` are
    appended after the digits, secret, algorithm and issuer parameters.
    """
    base_items = (
        ("digits", hotp._length),
        ("secret", base64.b32encode(hotp._key)),
        ("algorithm", hotp._algorithm.name.upper()),
    )
    parameters = list(base_items)
    if issuer is not None:
        parameters.append(("issuer", issuer))
    for extra in extra_parameters:
        parameters.append(extra)

    label = quote(account_name) if not issuer else "%s:%s" % (
        quote(issuer), quote(account_name))

    return "otpauth://{type}/{label}?{parameters}".format(
        type=type_name, label=label, parameters=urlencode(parameters))
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def _build_query(self, params):
    """Builds a query string.

    Args:
      params: dict, the query parameters. It is left unmodified.

    Returns:
      The query parameters properly encoded into an HTTP URI query string,
      starting with '?'. Each element of a list value becomes its own pair.
    """
    # Copy first: adding 'alt' must not leak into the caller's dict.
    merged = dict(params)
    if self.alt_param is not None:
      merged['alt'] = self.alt_param

    def encode_text(item):
      # Only text is encoded; bytes and non-string values (e.g. ints) are
      # passed through so urlencode() can stringify them.
      if isinstance(item, bytes) or not callable(getattr(item, 'encode', None)):
        return item
      return item.encode('utf-8')

    astuples = []
    for key, value in merged.items():
      values = value if isinstance(value, list) else [value]
      astuples.extend((key, encode_text(x)) for x in values)
    return '?' + urlencode(astuples)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def get_histories(self, usage, search_opts=None):
        """Fetch the histories of one usage.

        :param usage: identifier interpolated into the request path.
        :param search_opts: optional dict of filters; entries with a falsy
            value are dropped.
        :returns: whatever ``self._get_histories`` returns for the URL and
            the ``"histories"`` response key.
        """
        if search_opts is None:
            search_opts = {}

        qparams = {opt: val for opt, val in search_opts.items() if val}

        # Encode the parameters as key-sorted pairs so the resulting string
        # is consistent in Python 2&3.
        if qparams:
            new_qparams = sorted(qparams.items(), key=lambda x: x[0])
            query_string = "?%s" % parse.urlencode(new_qparams)
        else:
            query_string = ""

        return self._get_histories(
            "/usages/%s/histories%s" % (usage, query_string), "histories")
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def list(self, search_opts=None):
        """Fetch the list of usages.

        :param search_opts: optional dict of filters; entries with a falsy
            value are dropped.
        :returns: whatever ``self._list`` returns for the URL and the
            ``"usages"`` response key.
        """
        if search_opts is None:
            search_opts = {}

        qparams = {opt: val for opt, val in search_opts.items() if val}

        # Encode the parameters as key-sorted pairs so the resulting string
        # is consistent in Python 2&3.  sorted() takes the items view
        # directly; no intermediate copy is needed.
        if qparams:
            new_qparams = sorted(qparams.items(), key=lambda x: x[0])
            query_string = "?%s" % parse.urlencode(new_qparams)
        else:
            query_string = ""

        return self._list("/usages%s" % (query_string), "usages")
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Look up exactly one item with attributes matching ``**kwargs``.

        :param base_url: if provided, the generated URL will be appended to it
        :raises exceptions.NotFound: when the listing is empty
        :raises exceptions.NoUniqueMatch: when the listing has several items
        """
        kwargs = self._filter_kwargs(kwargs)

        url_parts = {
            'base_url': self.build_url(base_url=base_url, **kwargs),
            'query': '',
        }
        if kwargs:
            url_parts['query'] = '?%s' % parse.urlencode(kwargs)
        found = self._list('%(base_url)s%(query)s' % url_parts,
                           self.collection_key)

        total = len(found)
        if total > 1:
            raise exceptions.NoUniqueMatch
        if total == 0:
            msg = _("No %(name)s matching %(args)s.") % {
                'name': self.resource_class.__name__,
                'args': kwargs
            }
            raise exceptions.NotFound(404, msg)
        return found[0]
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def list(self, **kwargs):
        """Get a list of software configs.

        Keyword arguments with a truthy value are sent as query parameters,
        sorted by key.

        :rtype: list of :class:`SoftwareConfig`
        """
        qparams = {opt: val for opt, val in kwargs.items() if val}

        # Encode the parameters as key-sorted pairs so the resulting string
        # is consistent in Python 2&3.
        if qparams:
            new_qparams = sorted(qparams.items(), key=lambda x: x[0])
            query_string = "?%s" % parse.urlencode(new_qparams)
        else:
            query_string = ""
        url = '/software_configs%s' % query_string
        return self._list(url, "software_configs")
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def get(self, stack_id, resource_name, with_attr=None):
        """Fetch one resource of a stack and wrap it in ``Resource``.

        :param stack_id: ID of stack containing the resource; it is passed
                         through ``self._resolve_stack_id`` before use
        :param resource_name: ID of resource to get the details for
        :param with_attr: Attributes to show; sent as the ``with_attr``
                          query parameter when truthy
        :returns: ``Resource`` built from the ``resource`` entry of the
                  response body
        """
        stack_id = self._resolve_stack_id(stack_id)

        # Both path segments are quoted with no "safe" characters, so a
        # '/' inside either value is percent-encoded as well.
        quoted_stack = parse.quote(stack_id, '')
        quoted_name = parse.quote(encodeutils.safe_encode(resource_name), '')
        url_str = '/stacks/%s/resources/%s' % (quoted_stack, quoted_name)

        if with_attr:
            # doseq=True: a sequence value yields one pair per element.
            encoded = parse.urlencode({'with_attr': with_attr}, True)
            url_str += '?%s' % encoded

        body = utils.get_response_body(self.client.get(url_str))
        return Resource(self, body.get('resource'))
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Look up exactly one item matching the given ``**kwargs``.

        ``self._filter_kwargs`` is applied to the keyword arguments first;
        the result is used both to build the URL and as the query string.

        :param base_url: forwarded to ``self.build_url`` as ``base_url``
        :returns: the only element of the list returned by ``self._list``
        :raises exceptions.NotFound: if nothing matches
        :raises exceptions.NoUniqueMatch: if several items match
        """
        kwargs = self._filter_kwargs(kwargs)

        url_parts = {
            'base_url': self.build_url(base_url=base_url, **kwargs),
            'query': '',
        }
        if kwargs:
            url_parts['query'] = '?%s' % parse.urlencode(kwargs)

        found = self._list('%(base_url)s%(query)s' % url_parts,
                           self.collection_key)
        total = len(found)

        if total == 1:
            return found[0]
        if total > 1:
            raise exceptions.NoUniqueMatch

        # Nothing came back: report what was searched for.
        msg = _("No %(name)s matching %(args)s.") % {
            'name': self.resource_class.__name__,
            'args': kwargs
        }
        raise exceptions.NotFound(msg)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Find the single item whose attributes match ``**kwargs``.

        The keyword arguments are filtered by ``self._filter_kwargs`` and
        then appended, URL-encoded, to the URL from ``self.build_url``.

        :param base_url: forwarded to ``self.build_url`` as ``base_url``
        :returns: the one matching item
        :raises exceptions.NotFound: when there is no match
        :raises exceptions.NoUniqueMatch: when there is more than one match
        """
        kwargs = self._filter_kwargs(kwargs)

        prefix = self.build_url(base_url=base_url, **kwargs)
        suffix = '?%s' % parse.urlencode(kwargs) if kwargs else ''
        candidates = self._list('%s%s' % (prefix, suffix),
                                self.collection_key)

        hits = len(candidates)
        if hits == 0:
            msg = _("No %(name)s matching %(args)s.") % {
                'name': self.resource_class.__name__,
                'args': kwargs
            }
            raise exceptions.NotFound(404, msg)
        if hits > 1:
            raise exceptions.NoUniqueMatch

        return candidates[0]
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def list(self, detailed=True, is_public=True):
        """
        Get a list of all flavors.

        :param detailed: when truthy the ``/detail`` variant of the
                         endpoint is requested
        :param is_public: when falsy (``False`` or ``None``) the value is
                          sent as the ``is_public`` query parameter
        :rtype: list of :class:`Flavor`.
        """
        # is_public is ternary - None means give all flavors.
        # By default Nova assumes True and gives admins public flavors
        # and flavors from their own projects only, so the parameter is
        # only sent when it deviates from that default.
        query_string = ""
        if not is_public:
            query_string = "?%s" % parse.urlencode({'is_public': is_public})

        detail = "/detail" if detailed else ""

        return self._list("/flavors%s%s" % (detail, query_string), "flavors")
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def do_request(self, method, action, body=None, headers=None, params=None):
        """Issue a request through ``self.httpclient`` and decode the reply.

        :param method: HTTP method handed to ``self.httpclient.do_request``
        :param action: path that is appended to ``self.action_prefix``
        :param body: optional payload; serialized with ``self.serialize``
                     when truthy
        :param headers: accepted but not forwarded to the HTTP client by
                        this method
        :param params: optional non-empty dict (or dict subclass) of query
                       parameters; anything else is ignored
        :returns: ``self.deserialize(replybody, status_code)`` when the
                  status is ok, created, accepted or no_content; any other
                  status is handed to ``self._handle_fault_response``
        """
        action = self.action_prefix + action
        # isinstance() rather than an exact type comparison, so that dict
        # subclasses such as OrderedDict are encoded into the query string
        # instead of being silently dropped.
        if isinstance(params, dict) and params:
            params = utils.safe_encode_dict(params)
            # doseq: list values expand to one key=value pair per element.
            action += '?' + urlparse.urlencode(params, doseq=True)

        if body:
            body = self.serialize(body)

        resp, replybody = self.httpclient.do_request(
            action, method, body=body,
            content_type=self.content_type())

        status_code = resp.status_code
        if status_code in (requests.codes.ok,
                           requests.codes.created,
                           requests.codes.accepted,
                           requests.codes.no_content):
            return self.deserialize(replybody, status_code)
        else:
            # Fall back to the HTTP reason phrase when the body is empty.
            if not replybody:
                replybody = resp.reason
            self._handle_fault_response(status_code, replybody)
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Return the single item matching the given ``**kwargs``.

        ``self._filter_kwargs`` is applied to the keyword arguments; the
        filtered mapping is URL-encoded and appended to the URL produced
        by ``self.build_url``.

        :param base_url: forwarded to ``self.build_url`` as ``base_url``
        :returns: the only item in the listing
        :raises exceptions.NotFound: if the listing is empty
        :raises exceptions.NoUniqueMatch: if the listing has several items
        """
        kwargs = self._filter_kwargs(kwargs)

        location = self.build_url(base_url=base_url, **kwargs)
        if kwargs:
            query = '?%s' % parse.urlencode(kwargs)
        else:
            query = ''

        results = self._list('%s%s' % (location, query), self.collection_key)
        size = len(results)

        if size > 1:
            raise exceptions.NoUniqueMatch
        if size == 0:
            resource_name = self.resource_class.__name__
            msg = "No %s matching %s." % (resource_name, kwargs)
            raise exceptions.NotFound(404, msg)
        return results[0]
项目:eclcli    作者:nttcom    | 项目源码 | 文件源码
def find(self, base_url=None, **kwargs):
        """Locate exactly one item with attributes matching ``**kwargs``.

        After ``self._filter_kwargs`` has processed the keyword arguments,
        they are used to build the request URL and, URL-encoded, its
        query string.

        :param base_url: forwarded to ``self.build_url`` as ``base_url``
        :returns: the sole item returned by ``self._list``
        :raises exceptions.NotFound: when no item matches
        :raises exceptions.NoUniqueMatch: when several items match
        """
        kwargs = self._filter_kwargs(kwargs)

        endpoint = self.build_url(base_url=base_url, **kwargs)
        encoded = '?%s' % parse.urlencode(kwargs) if kwargs else ''
        items = self._list(
            '%(base_url)s%(query)s' % {'base_url': endpoint,
                                       'query': encoded},
            self.collection_key)

        n_items = len(items)
        if n_items == 1:
            return items[0]
        if n_items == 0:
            msg = _("No %(name)s matching %(args)s.") % {
                'name': self.resource_class.__name__,
                'args': kwargs
            }
            raise exceptions.NotFound(404, msg)
        # More than one hit: the lookup was ambiguous.
        raise exceptions.NoUniqueMatch